Commit dca33b4

Make createMPU Call Async (#84)

1 parent d19ea3e commit dca33b4

11 files changed: +265 -327 lines changed

.github/workflows/ci.yml (+3 -3)

@@ -55,7 +55,7 @@ jobs:
       - docs
       - minrust
     steps:
-      - run: exit 0
+      - run: exit 0
 
   test-hll:
     name: Test S3 transfer manager HLL
@@ -133,7 +133,7 @@ jobs:
        run: |
          cargo doc --lib --no-deps --all-features --document-private-items
        env:
-         RUSTFLAGS: --cfg docsrs
+         RUSTFLAGS: --cfg docsrs
          RUSTDOCFLAGS: --cfg docsrs
 
   minrust:
@@ -246,7 +246,7 @@ jobs:
       - name: Install cargo-hack
         uses: taiki-e/install-action@cargo-hack
       - uses: Swatinem/rust-cache@v2
-      - name: check --feature-powerset
+      - name: check --feature-powerset
         run: cargo hack check --all --feature-powerset
 
   # TODO - get cross check working

aws-s3-transfer-manager/examples/cp.rs (+1 -2)

@@ -252,8 +252,7 @@ async fn do_upload(args: Args) -> Result<(), BoxError> {
         .bucket(bucket)
         .key(key)
         .body(stream)
-        .send()
-        .await?;
+        .initiate()?;
 
     let _resp = handle.join().await?;
     let elapsed = start.elapsed();

aws-s3-transfer-manager/src/client.rs (+2 -3)

@@ -100,10 +100,9 @@ impl Client {
     ///     .bucket("my-bucket")
     ///     .key("my-key")
     ///     .body(stream)
-    ///     .send()
-    ///     .await?;
+    ///     .initiate()?;
     ///
-    /// // send() may return before the transfer is complete.
+    /// // initiate() will return before the transfer is complete.
     /// // Call the `join()` method on the returned handle to drive the transfer to completion.
     /// // The handle can also be used to get progress, pause, or cancel the transfer, etc.
     /// let response = handle.join().await?;
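
For reference, the full call site after this change looks roughly like the following. This is a minimal sketch, assuming the crate's public module layout mirrors the `crate::` paths used in this diff (`aws_s3_transfer_manager::io::InputStream`, `aws_s3_transfer_manager::error::Error`, `aws_s3_transfer_manager::Client`); `upload_and_wait` is a hypothetical helper, not part of the crate:

use aws_s3_transfer_manager::error::Error;
use aws_s3_transfer_manager::io::InputStream;

// Hypothetical helper: start an upload and wait for it to finish using
// the new synchronous initiate() entry point.
async fn upload_and_wait(
    client: &aws_s3_transfer_manager::Client,
    stream: InputStream,
) -> Result<(), Error> {
    // initiate() no longer needs .await: it spawns the transfer work
    // (including CreateMultipartUpload) and returns a handle immediately.
    let handle = client
        .upload()
        .bucket("my-bucket")
        .key("my-key")
        .body(stream)
        .initiate()?;

    // join() drives the transfer to completion, as the doc comment notes.
    let _response = handle.join().await?;
    Ok(())
}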

aws-s3-transfer-manager/src/operation/download/builders.rs (+4 -1)

@@ -524,7 +524,10 @@ impl DownloadFluentBuilder {
 
 impl crate::operation::download::input::DownloadInputBuilder {
     /// Initiate a download transfer for a single object with this input using the given client.
-    pub fn send_with(self, client: &crate::Client) -> Result<DownloadHandle, crate::error::Error> {
+    pub fn initiate_with(
+        self,
+        client: &crate::Client,
+    ) -> Result<DownloadHandle, crate::error::Error> {
         let mut fluent_builder = client.download();
         fluent_builder.inner = self;
         fluent_builder.initiate()

aws-s3-transfer-manager/src/operation/upload.rs (+115 -47)

@@ -14,9 +14,9 @@ mod service;
 
 use crate::error;
 use crate::io::InputStream;
-use aws_smithy_types::byte_stream::ByteStream;
 use context::UploadContext;
 pub use handle::UploadHandle;
+use handle::{MultipartUploadData, UploadType};
 /// Request type for uploads to Amazon S3
 pub use input::{UploadInput, UploadInputBuilder};
 /// Response type for uploads to Amazon S3
@@ -36,57 +36,58 @@ pub(crate) struct Upload;
 
 impl Upload {
     /// Execute a single `Upload` transfer operation
-    pub(crate) async fn orchestrate(
+    pub(crate) fn orchestrate(
         handle: Arc<crate::client::Handle>,
         mut input: crate::operation::upload::UploadInput,
     ) -> Result<UploadHandle, error::Error> {
-        let min_mpu_threshold = handle.mpu_threshold_bytes();
-
         let stream = input.take_body();
-        let ctx = new_context(handle, input);
-
-        // MPU has max of 10K parts which requires us to know the upper bound on the content length (today anyway)
-        // While true for file-based workloads, the upper `size_hint` might not be equal to the actual bytes transferred.
-        let content_length = stream
-            .size_hint()
-            .upper()
-            .ok_or_else(crate::io::error::Error::upper_bound_size_hint_required)?;
+        let ctx = new_context(handle.clone(), input);
+        Ok(UploadHandle::new(
+            ctx.clone(),
+            tokio::spawn(try_start_upload(handle.clone(), stream, ctx)),
+        ))
+    }
+}
 
+async fn try_start_upload(
+    handle: Arc<crate::client::Handle>,
+    stream: InputStream,
+    ctx: UploadContext,
+) -> Result<UploadType, crate::error::Error> {
+    let min_mpu_threshold = handle.mpu_threshold_bytes();
+
+    // MPU has max of 10K parts which requires us to know the upper bound on the content length (today anyway)
+    // While true for file-based workloads, the upper `size_hint` might not be equal to the actual bytes transferred.
+    let content_length = stream
+        .size_hint()
+        .upper()
+        .ok_or_else(crate::io::error::Error::upper_bound_size_hint_required)?;
+
+    let upload_type = if content_length < min_mpu_threshold && !stream.is_mpu_only() {
+        tracing::trace!("upload request content size hint ({content_length}) less than min part size threshold ({min_mpu_threshold}); sending as single PutObject request");
+        UploadType::PutObject(tokio::spawn(put_object(
+            ctx.clone(),
+            stream,
+            content_length,
+        )))
+    } else {
+        // TODO - to upload a 0 byte object via MPU you have to send [CreateMultipartUpload, UploadPart(part=1, 0 bytes), CompleteMultipartUpload]
+        // we should add tests for this and hide this edge case from the user (e.g. send an empty part when a custom PartStream returns `None` immediately)
         // FIXME - investigate what it would take to allow non mpu uploads for `PartStream` implementations
-        let handle = if content_length < min_mpu_threshold && !stream.is_mpu_only() {
-            tracing::trace!("upload request content size hint ({content_length}) less than min part size threshold ({min_mpu_threshold}); sending as single PutObject request");
-            try_start_put_object(ctx, stream, content_length).await?
-        } else {
-            // TODO - to upload a 0 byte object via MPU you have to send [CreateMultipartUpload, UploadPart(part=1, 0 bytes), CompleteMultipartUpload]
-            // we should add tests for this and hide this edge case from the user (e.g. send an empty part when a custom PartStream returns `None` immediately)
-            try_start_mpu_upload(ctx, stream, content_length).await?
-        };
-
-        Ok(handle)
-    }
+        try_start_mpu_upload(ctx, stream, content_length).await?
+    };
+    Ok(upload_type)
 }
 
-async fn try_start_put_object(
+async fn put_object(
     ctx: UploadContext,
     stream: InputStream,
     content_length: u64,
-) -> Result<UploadHandle, crate::error::Error> {
-    let byte_stream = stream.into_byte_stream().await?;
+) -> Result<UploadOutput, error::Error> {
+    let body = stream.into_byte_stream().await?;
     let content_length: i64 = content_length.try_into().map_err(|_| {
         error::invalid_input(format!("content_length:{} is invalid.", content_length))
     })?;
-
-    Ok(UploadHandle::new_put_object(
-        ctx.clone(),
-        tokio::spawn(put_object(ctx.clone(), byte_stream, content_length)),
-    ))
-}
-
-async fn put_object(
-    ctx: UploadContext,
-    body: ByteStream,
-    content_length: i64,
-) -> Result<UploadOutput, error::Error> {
     // FIXME - This affects performance in cases with a lot of small files workloads. We need a way to schedule
     // more work for a lot of small files.
     let _permit = ctx.handle.scheduler.acquire_permit().await?;
@@ -149,7 +150,7 @@ async fn try_start_mpu_upload(
     ctx: UploadContext,
     stream: InputStream,
     content_length: u64,
-) -> Result<UploadHandle, crate::error::Error> {
+) -> Result<UploadType, crate::error::Error> {
     let part_size = cmp::max(
         ctx.handle.upload_part_size_bytes(),
         content_length / MAX_PARTS,
@@ -161,18 +162,22 @@ async fn try_start_mpu_upload(
         "multipart upload started with upload id: {:?}",
         mpu.upload_id
     );
-
-    let mut handle = UploadHandle::new_multipart(ctx);
-    handle.set_response(mpu);
-    distribute_work(&mut handle, stream, part_size)?;
-    Ok(handle)
+    let upload_id = mpu.upload_id.clone().expect("upload_id is present");
+    let mut mpu_data = MultipartUploadData {
+        upload_part_tasks: Default::default(),
+        read_body_tasks: Default::default(),
+        response: Some(mpu),
+        upload_id: upload_id.clone(),
+    };
+
+    distribute_work(&mut mpu_data, ctx, stream, part_size)?;
+    Ok(UploadType::MultipartUpload(mpu_data))
 }
 
 fn new_context(handle: Arc<crate::client::Handle>, req: UploadInput) -> UploadContext {
     UploadContext {
         handle,
         request: Arc::new(req),
-        upload_id: None,
     }
 }
 
@@ -225,6 +230,7 @@ mod test {
     use crate::io::InputStream;
     use crate::operation::upload::UploadInput;
     use crate::types::{ConcurrencySetting, PartSize};
+    use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadOutput;
     use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadOutput;
     use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadOutput;
     use aws_sdk_s3::operation::put_object::PutObjectOutput;
@@ -233,6 +239,7 @@ mod test {
     use bytes::Bytes;
     use std::ops::Deref;
     use std::sync::Arc;
+    use std::sync::Barrier;
     use test_common::mock_client_with_stubbed_http_client;
 
     #[tokio::test]
@@ -297,7 +304,7 @@ mod test {
             .key("test-key")
             .body(stream);
 
-        let handle = request.send_with(&tm).await.unwrap();
+        let handle = request.initiate_with(&tm).unwrap();
 
         let resp = handle.join().await.unwrap();
         assert_eq!(expected_upload_id.deref(), resp.upload_id.unwrap().deref());
@@ -331,9 +338,70 @@ mod test {
             .bucket("test-bucket")
             .key("test-key")
            .body(stream);
-        let handle = request.send_with(&tm).await.unwrap();
+        let handle = request.initiate_with(&tm).unwrap();
         let resp = handle.join().await.unwrap();
         assert_eq!(resp.upload_id(), None);
         assert_eq!(expected_e_tag.deref(), resp.e_tag().unwrap());
     }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+    async fn test_abort_multipart_upload() {
+        let expected_upload_id = Arc::new("test-upload".to_owned());
+        let body = Bytes::from_static(b"every adolescent dog goes bonkers early");
+        let stream = InputStream::from(body);
+        let bucket = "test-bucket";
+        let key = "test-key";
+        let wait_till_create_mpu = Arc::new(Barrier::new(2));
+
+        let upload_id = expected_upload_id.clone();
+        let create_mpu =
+            mock!(aws_sdk_s3::Client::create_multipart_upload).then_output(move || {
+                CreateMultipartUploadOutput::builder()
+                    .upload_id(upload_id.as_ref().to_owned())
+                    .build()
+            });
+
+        let upload_part = mock!(aws_sdk_s3::Client::upload_part).then_output({
+            let wait_till_create_mpu = wait_till_create_mpu.clone();
+            move || {
+                wait_till_create_mpu.wait();
+                UploadPartOutput::builder().build()
+            }
+        });
+
+        let abort_mpu = mock!(aws_sdk_s3::Client::abort_multipart_upload)
+            .match_requests({
+                let upload_id: Arc<String> = expected_upload_id.clone();
+                move |input| {
+                    input.upload_id.as_ref() == Some(&upload_id)
+                        && input.bucket() == Some(bucket)
+                        && input.key() == Some(key)
+                }
+            })
+            .then_output(|| AbortMultipartUploadOutput::builder().build());
+
+        let client = mock_client_with_stubbed_http_client!(
+            aws_sdk_s3,
+            RuleMode::Sequential,
+            &[create_mpu, upload_part, abort_mpu]
+        );
+
+        let tm_config = crate::Config::builder()
+            .concurrency(ConcurrencySetting::Explicit(1))
+            .set_multipart_threshold(PartSize::Target(10))
+            .set_target_part_size(PartSize::Target(5 * 1024 * 1024))
+            .client(client)
+            .build();
+
+        let tm = crate::Client::new(tm_config);
+
+        let request = UploadInput::builder()
+            .bucket("test-bucket")
+            .key("test-key")
+            .body(stream);
+        let handle = request.initiate_with(&tm).unwrap();
+        wait_till_create_mpu.wait();
+        let abort = handle.abort().await.unwrap();
+        assert_eq!(abort.upload_id().unwrap(), expected_upload_id.deref());
+    }
 }
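
The structural change above is easiest to see in isolation: orchestrate() used to await CreateMultipartUpload before any UploadHandle existed, whereas now it hands back a handle immediately and the createMPU call runs on a spawned task. The following is a standalone sketch of that shape using plain tokio; `TransferKind` and `Handle` are illustrative stand-ins, not the crate's real `UploadType` and `UploadHandle`:

use tokio::task::JoinHandle;

// Stand-in for the crate's UploadType variants.
enum TransferKind {
    Single,    // put_object path
    Multipart, // multipart upload path
}

// Stand-in for UploadHandle: it owns the spawned start-up task instead of
// awaiting it, so construction returns before any network call is made.
struct Handle {
    startup: JoinHandle<Result<TransferKind, String>>,
}

impl Handle {
    // join() surfaces both task panics and transfer-start errors.
    async fn join(self) -> Result<TransferKind, String> {
        self.startup.await.map_err(|e| e.to_string())?
    }
}

// Synchronous "orchestrate": note there is no .await before returning.
fn orchestrate(content_length: u64, mpu_threshold: u64) -> Handle {
    Handle {
        startup: tokio::spawn(async move {
            if content_length < mpu_threshold {
                Ok(TransferKind::Single)
            } else {
                // In the real code this is where CreateMultipartUpload is
                // sent; it now happens on the spawned task, not the caller.
                Ok(TransferKind::Multipart)
            }
        }),
    }
}

#[tokio::main]
async fn main() {
    let handle = orchestrate(1024, 8 * 1024 * 1024); // returns immediately
    match handle.join().await {
        Ok(TransferKind::Single) => println!("single PutObject"),
        Ok(TransferKind::Multipart) => println!("multipart upload"),
        Err(e) => eprintln!("failed to start: {e}"),
    }
}

This is also what makes the new test above possible: because initiate() returns before CreateMultipartUpload completes, the test can block the first UploadPart on a Barrier and abort the in-flight transfer through the handle.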

aws-s3-transfer-manager/src/operation/upload/builders.rs (+5 -5)

@@ -29,10 +29,10 @@ impl UploadFluentBuilder {
         bucket = self.inner.bucket.as_deref().unwrap_or_default(),
         key = self.inner.key.as_deref().unwrap_or_default(),
     ))]
-    // TODO: Make it consistent with download by renaming it to initiate and making it synchronous
-    pub async fn send(self) -> Result<UploadHandle, crate::error::Error> {
+
+    pub fn initiate(self) -> Result<UploadHandle, crate::error::Error> {
         let input = self.inner.build()?;
-        crate::operation::upload::Upload::orchestrate(self.handle, input).await
+        crate::operation::upload::Upload::orchestrate(self.handle, input)
     }
 
     /// <p>The canned ACL to apply to the object. For more information, see <a href="https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#CannedACL">Canned ACL</a> in the <i>Amazon S3 User Guide</i>.</p>
@@ -911,12 +911,12 @@ impl UploadFluentBuilder {
 
 impl crate::operation::upload::input::UploadInputBuilder {
     /// Initiate an upload transfer for a single object with this input using the given client.
-    pub async fn send_with(
+    pub fn initiate_with(
        self,
        client: &crate::Client,
    ) -> Result<UploadHandle, crate::error::Error> {
         let mut fluent_builder = client.upload();
         fluent_builder.inner = self;
-        fluent_builder.send().await
+        fluent_builder.initiate()
     }
 }
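
The precomposed-input path follows suit: `UploadInputBuilder::send_with` becomes a synchronous `initiate_with`, mirroring `DownloadInputBuilder` above. A short usage sketch, assuming `UploadInput` is re-exported at the path the tests use and that `tm` and `stream` are a configured transfer manager `Client` and an `InputStream`:

use aws_s3_transfer_manager::error::Error;
use aws_s3_transfer_manager::io::InputStream;
use aws_s3_transfer_manager::operation::upload::UploadInput;
use aws_s3_transfer_manager::Client;

// Hypothetical helper: build the input separately, then start the
// transfer with the renamed synchronous entry point.
async fn upload_precomposed(tm: &Client, stream: InputStream) -> Result<(), Error> {
    let handle = UploadInput::builder()
        .bucket("my-bucket")
        .key("my-key")
        .body(stream)
        .initiate_with(tm)?; // previously: .send_with(tm).await?

    let _resp = handle.join().await?;
    Ok(())
}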

aws-s3-transfer-manager/src/operation/upload/context.rs (+0 -12)

@@ -12,8 +12,6 @@ use std::sync::Arc;
 pub(crate) struct UploadContext {
     /// reference to client handle used to do actual work
     pub(crate) handle: Arc<crate::client::Handle>,
-    /// the multipart upload ID
-    pub(crate) upload_id: Option<String>,
     /// the original request (NOTE: the body will have been taken for processing, only the other fields remain)
     pub(crate) request: Arc<UploadInput>,
 }
@@ -28,14 +26,4 @@ impl UploadContext {
     pub(crate) fn request(&self) -> &UploadInput {
         self.request.deref()
     }
-
-    /// Set the upload ID if the transfer will be done using a multipart upload
-    pub(crate) fn set_upload_id(&mut self, upload_id: String) {
-        self.upload_id = Some(upload_id)
-    }
-
-    /// Check if this transfer is using multipart upload
-    pub(crate) fn is_multipart_upload(&self) -> bool {
-        self.upload_id.is_some()
-    }
 }
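
Net effect of this file: the multipart upload ID no longer lives on the shared `UploadContext`; it now travels with the `MultipartUploadData` inside the `UploadType` returned by the spawned start-up task (see upload.rs above), so the `set_upload_id`/`is_multipart_upload` mutators can be deleted outright.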
