Skip to content

Commit cd4f854

Browse files
committed
upload multipart in parallel
1 parent 8699ce8 commit cd4f854

File tree

2 files changed

+19
-23
lines changed

2 files changed

+19
-23
lines changed

src/storage/object_storage.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -844,7 +844,11 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
844844
let stream_relative_path = format!("{stream_name}/{file_suffix}");
845845

846846
// Try uploading the file, handle potential errors without breaking the loop
847-
if let Err(e) = self.upload_file(&stream_relative_path, &path).await {
847+
// if let Err(e) = self.upload_multipart(key, path)
848+
if let Err(e) = self
849+
.upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path)
850+
.await
851+
{
848852
error!("Failed to upload file {filename:?}: {e}");
849853
continue; // Skip to the next file
850854
}

src/storage/s3.rs

+14-22
Original file line numberDiff line numberDiff line change
@@ -535,38 +535,30 @@ impl S3 {
535535
let mut data = Vec::new();
536536
file.read_to_end(&mut data).await?;
537537
self.client.put(location, data.into()).await?;
538-
// async_writer.put_part(data.into()).await?;
539-
// async_writer.complete().await?;
538+
540539
return Ok(());
541540
} else {
542-
let mut data = Vec::new();
543-
file.read_to_end(&mut data).await?;
541+
let mut buf = [0; MIN_MULTIPART_UPLOAD_SIZE];
544542

545-
// let mut upload_parts = Vec::new();
543+
let mut tasks = FuturesUnordered::new();
544+
// let mut tasks = Vec::new();
546545

547-
let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
548-
let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
549-
let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 };
550-
551-
// Upload each part
552-
for part_number in 0..(total_parts) {
553-
let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE;
554-
let end_pos = if part_number == num_full_parts && has_final_partial_part {
555-
// Last part might be smaller than 5MB (which is allowed)
556-
total_size
557-
} else {
558-
// All other parts must be at least 5MB
559-
start_pos + MIN_MULTIPART_UPLOAD_SIZE
560-
};
546+
while let Ok(b) = file.read(&mut buf).await {
547+
if b == 0 {
548+
break;
549+
}
561550

562551
// Extract this part's data
563-
let part_data = data[start_pos..end_pos].to_vec();
552+
let part_data = buf.to_vec();
564553

565554
// Upload the part
566-
async_writer.put_part(part_data.into()).await?;
555+
tasks.push(tokio::spawn(async_writer.put_part(part_data.into())));
556+
}
567557

568-
// upload_parts.push(part_number as u64 + 1);
558+
while let Some(res) = tasks.next().await {
559+
res??;
569560
}
561+
570562
async_writer.complete().await?;
571563
}
572564
Ok(())

0 commit comments

Comments
 (0)