diff --git a/aws-s3-transfer-manager/src/middleware/hedge.rs b/aws-s3-transfer-manager/src/middleware/hedge.rs
index ab145de6..f2e1cf0d 100644
--- a/aws-s3-transfer-manager/src/middleware/hedge.rs
+++ b/aws-s3-transfer-manager/src/middleware/hedge.rs
@@ -20,13 +20,13 @@ const MIN_DATA_POINTS: u64 = 20;
 const PERIOD: Duration = Duration::from_secs(2);
 
 /*
-* During uploads, S3 recommends retrying the slowest 5% of requests for latency-sensitive applications,
-* as some requests can experience high time to first byte. If a slow part is hit near the end of the request,
-* the application may spend the last few seconds waiting for those final parts to complete, which can reduce overall
-* throughput. This layer is used to retry the slowest 5% of requests to improve performance.
-* Based on our experiments, this makes a significant difference for multipart upload use-cases and
-* does not have a noticeable impact for the Download.
-*/
+ * During uploads, S3 recommends retrying the slowest 5% of requests for latency-sensitive applications,
+ * as some requests can experience high time to first byte. If a slow part is hit near the end of the request,
+ * the application may spend the last few seconds waiting for those final parts to complete, which can reduce overall
+ * throughput. This layer is used to retry the slowest 5% of requests to improve performance.
+ * Based on our experiments, this makes a significant difference for multipart upload use-cases and
+ * does not have a noticeable impact for the Download.
+ */
 pub(crate) struct Builder<P> {
     policy: P,
     latency_percentile: f32,
@@ -34,31 +34,16 @@ pub(crate) struct Builder<P> {
     period: Duration,
 }
 
-#[derive(Debug, Clone, Default)]
-pub(crate) struct DefaultPolicy;
-
-impl<T: Clone> Policy<T> for DefaultPolicy {
-    fn clone_request(&self, req: &T) -> Option<T> {
-        Some(req.clone())
-    }
-
-    fn can_retry(&self, _req: &T) -> bool {
-        true
-    }
-}
-
-impl Default for Builder<DefaultPolicy> {
-    fn default() -> Self {
+impl<P> Builder<P> {
+    pub(crate) fn new(policy: P) -> Self {
         Self {
-            policy: DefaultPolicy,
+            policy,
            latency_percentile: LATENCY_PERCENTILE,
             min_data_points: MIN_DATA_POINTS,
             period: PERIOD,
         }
     }
-}
 
-impl<P> Builder<P> {
     /// Converts the `Hedge` into a `Layer` that can be used in a service stack.
     pub(crate) fn into_layer<Request, S>(self) -> impl Layer<S, Service = Hedge<S, P>> + Clone
     where
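Reviewer note: with `DefaultPolicy` gone, the builder now takes an explicit `tower::hedge::Policy`, so each call site decides which requests are safe to hedge under the "retry the slowest 5%" scheme described in the comment above. A minimal sketch of a caller-supplied policy, assuming only tower's `hedge::Policy` trait as used in this file; the `NoHedgePolicy` name is hypothetical and not part of this change:

    use tower::hedge::Policy;

    /// Hypothetical policy that opts every request out of hedging.
    #[derive(Debug, Clone)]
    struct NoHedgePolicy;

    impl<T> Policy<T> for NoHedgePolicy {
        // Returning None means the hedge layer never issues a second attempt.
        fn clone_request(&self, _req: &T) -> Option<T> {
            None
        }

        // Whether an in-flight request may still be retried.
        fn can_retry(&self, _req: &T) -> bool {
            true
        }
    }

    // Wiring mirrors the new call site in service.rs below:
    // let layer = hedge::Builder::new(NoHedgePolicy).into_layer();
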
diff --git a/aws-s3-transfer-manager/src/operation/upload/service.rs b/aws-s3-transfer-manager/src/operation/upload/service.rs
index dcbd79c2..640eff06 100644
--- a/aws-s3-transfer-manager/src/operation/upload/service.rs
+++ b/aws-s3-transfer-manager/src/operation/upload/service.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use super::MultipartUploadData;
 use crate::{
     error,
     io::{
@@ -12,11 +13,9 @@ use crate::{
 use aws_sdk_s3::{primitives::ByteStream, types::CompletedPart};
 use bytes::Buf;
 use tokio::{sync::Mutex, task};
-use tower::{service_fn, Service, ServiceBuilder, ServiceExt};
+use tower::{hedge::Policy, service_fn, Service, ServiceBuilder, ServiceExt};
 use tracing::Instrument;
 
-use super::MultipartUploadData;
-
 /// Request/input type for our "upload_part" service.
 #[derive(Debug, Clone)]
 pub(super) struct UploadPartRequest {
@@ -25,6 +24,22 @@ pub(super) struct UploadPartRequest {
     pub(super) upload_id: String,
 }
 
+#[derive(Debug, Clone)]
+pub(crate) struct UploadHedgePolicy;
+
+impl Policy<UploadPartRequest> for UploadHedgePolicy {
+    fn clone_request(&self, req: &UploadPartRequest) -> Option<UploadPartRequest> {
+        if req.ctx.request.bucket().unwrap_or("").ends_with("--x-s3") {
+            None
+        } else {
+            Some(req.clone())
+        }
+    }
+    fn can_retry(&self, _req: &UploadPartRequest) -> bool {
+        true
+    }
+}
+
 /// handler (service fn) for a single part
 async fn upload_part_handler(request: UploadPartRequest) -> Result<CompletedPart, error::Error> {
     let ctx = request.ctx;
@@ -80,7 +95,7 @@ pub(super) fn upload_part_service(
         .buffer(ctx.handle.num_workers())
         // FIXME - Hedged request should also get a permit. Currently, it can bypass the
         // concurrency_limit layer.
-        .layer(hedge::Builder::default().into_layer())
+        .layer(hedge::Builder::new(UploadHedgePolicy).into_layer())
         .service(svc);
     svc.map_err(|err| {
         let e = err
@@ -128,7 +143,6 @@ pub(super) fn distribute_work(
         parent: parent_span_for_all_tasks,
         "upload-net-tasks"
     );
-
     let svc = upload_part_service(&ctx);
     let n_workers = ctx.handle.num_workers();
     for _ in 0..n_workers {
@@ -180,3 +194,44 @@ pub(super) async fn read_body(
     }
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::client::Handle;
+    use crate::operation::upload::UploadInput;
+    use crate::runtime::scheduler::Scheduler;
+    use crate::Config;
+    use test_common::mock_client_with_stubbed_http_client;
+
+    fn _mock_upload_part_request_with_bucket_name(bucket_name: &str) -> UploadPartRequest {
+        let s3_client = mock_client_with_stubbed_http_client!(aws_sdk_s3, []);
+        UploadPartRequest {
+            ctx: UploadContext {
+                handle: Arc::new(Handle {
+                    config: Config::builder().client(s3_client).build(),
+                    scheduler: Scheduler::new(0),
+                }),
+                request: Arc::new(UploadInput::builder().bucket(bucket_name).build().unwrap()),
+            },
+            part_data: PartData {
+                part_number: 0,
+                data: Default::default(),
+            },
+            upload_id: "test-id".to_string(),
+        }
+    }
+
+    #[test]
+    fn test_upload_hedge_policy_operation() {
+        let policy = UploadHedgePolicy;
+
+        // Test S3 Express bucket
+        let express_req = _mock_upload_part_request_with_bucket_name("test--x-s3");
+        assert!(policy.clone_request(&express_req).is_none());
+
+        // Test regular bucket
+        let regular_req = _mock_upload_part_request_with_bucket_name("test");
+        assert!(policy.clone_request(&regular_req).is_some());
+    }
+}
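Reviewer note on the bucket check in `UploadHedgePolicy::clone_request`: S3 Express One Zone "directory" bucket names always end with the `--x-s3` suffix (full form `<base-name>--<zone-id>--x-s3`), so the suffix test is sufficient to recognize them and skip hedging for those uploads. A self-contained sketch of the same check; the `is_directory_bucket` helper and example bucket names are illustrative, not part of this change:

    /// Returns true for S3 Express One Zone (directory) buckets, whose
    /// names always carry the "--x-s3" suffix.
    fn is_directory_bucket(bucket: &str) -> bool {
        bucket.ends_with("--x-s3")
    }

    #[test]
    fn directory_bucket_suffix() {
        assert!(is_directory_bucket("amzn-s3-demo-bucket--usw2-az1--x-s3"));
        assert!(!is_directory_bucket("amzn-s3-demo-bucket"));
    }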