Skip to content

Commit 7e2cbb9

Browse files
committed
upload multipart in parallel
1 parent 8699ce8 commit 7e2cbb9

File tree

2 files changed

+25
-32
lines changed

2 files changed

+25
-32
lines changed

src/storage/object_storage.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -844,7 +844,11 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
844844
let stream_relative_path = format!("{stream_name}/{file_suffix}");
845845

846846
// Try uploading the file, handle potential errors without breaking the loop
847-
if let Err(e) = self.upload_file(&stream_relative_path, &path).await {
847+
// if let Err(e) = self.upload_multipart(key, path)
848+
if let Err(e) = self
849+
.upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path)
850+
.await
851+
{
848852
error!("Failed to upload file {filename:?}: {e}");
849853
continue; // Skip to the next file
850854
}

src/storage/s3.rs

+20-31
Original file line numberDiff line numberDiff line change
@@ -521,53 +521,42 @@ impl S3 {
521521

522522
let mut async_writer = self.client.put_multipart(location).await?;
523523

524-
// /* `abort_multipart()` has been removed */
525-
// let close_multipart = |err| async move {
526-
// error!("multipart upload failed. {:?}", err);
527-
// self.client
528-
// .abort_multipart(&key.into(), &multipart_id)
529-
// .await
530-
// };
531-
532524
let meta = file.metadata().await?;
533525
let total_size = meta.len() as usize;
534526
if total_size < MIN_MULTIPART_UPLOAD_SIZE {
535527
let mut data = Vec::new();
536528
file.read_to_end(&mut data).await?;
537529
self.client.put(location, data.into()).await?;
538-
// async_writer.put_part(data.into()).await?;
539-
// async_writer.complete().await?;
530+
540531
return Ok(());
541532
} else {
542-
let mut data = Vec::new();
543-
file.read_to_end(&mut data).await?;
544-
545-
// let mut upload_parts = Vec::new();
533+
let mut buf = [0; MIN_MULTIPART_UPLOAD_SIZE];
546534

547-
let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
548-
let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
549-
let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 };
535+
let mut tasks = FuturesUnordered::new();
550536

551-
// Upload each part
552-
for part_number in 0..(total_parts) {
553-
let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE;
554-
let end_pos = if part_number == num_full_parts && has_final_partial_part {
555-
// Last part might be smaller than 5MB (which is allowed)
556-
total_size
557-
} else {
558-
// All other parts must be at least 5MB
559-
start_pos + MIN_MULTIPART_UPLOAD_SIZE
560-
};
537+
while let Ok(b) = file.read(&mut buf).await {
538+
if b == 0 {
539+
break;
540+
}
561541

562542
// Extract this part's data
563-
let part_data = data[start_pos..end_pos].to_vec();
543+
let part_data = buf.to_vec();
564544

565545
// Upload the part
566-
async_writer.put_part(part_data.into()).await?;
546+
tasks.push(tokio::spawn(async_writer.put_part(part_data.into())));
547+
}
567548

568-
// upload_parts.push(part_number as u64 + 1);
549+
while let Some(res) = tasks.next().await {
550+
res??;
569551
}
570-
async_writer.complete().await?;
552+
553+
match async_writer.complete().await {
554+
Ok(_) => {}
555+
Err(e) => {
556+
error!("Failed to complete multipart upload: {:?}", e);
557+
async_writer.abort().await?;
558+
}
559+
};
571560
}
572561
Ok(())
573562
}

0 commit comments

Comments
 (0)