Skip hedge for s3 express (#89)
*Issue #, if available:*

#87

*Description of changes:*

Allow the hedge builder to take a policy, and customize the upload policy so hedged retries are skipped for S3 Express buckets.
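
In short, the hedge layer's builder now takes an explicit `tower::hedge::Policy` instead of hard-wiring a default. A minimal sketch of the call-site change (identifiers taken from this diff; not a complete program):

```rust
// Before this commit: the builder always used DefaultPolicy, which hedges
// every request by cloning it unconditionally.
let layer = hedge::Builder::default().into_layer();

// After this commit: the caller supplies the policy, so the upload path can
// decline to hedge requests that target S3 Express (directory) buckets.
let layer = hedge::Builder::new(UploadHedgePolicy).into_layer();
```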
TingDaoK authored Jan 16, 2025
1 parent dca33b4 commit ec06719
Showing 2 changed files with 70 additions and 30 deletions.
35 changes: 10 additions & 25 deletions aws-s3-transfer-manager/src/middleware/hedge.rs
@@ -20,45 +20,30 @@
 const MIN_DATA_POINTS: u64 = 20;
 const PERIOD: Duration = Duration::from_secs(2);

 /*
  * During uploads, S3 recommends retrying the slowest 5% of requests for latency-sensitive applications,
  * as some requests can experience high time to first byte. If a slow part is hit near the end of the request,
  * the application may spend the last few seconds waiting for those final parts to complete, which can reduce overall
  * throughput. This layer is used to retry the slowest 5% of requests to improve performance.
  * Based on our experiments, this makes a significant difference for multipart upload use-cases and
  * does not have a noticeable impact for the Download.
  */
 pub(crate) struct Builder<P> {
     policy: P,
     latency_percentile: f32,
     min_data_points: u64,
     period: Duration,
 }

-#[derive(Debug, Clone, Default)]
-pub(crate) struct DefaultPolicy;
-
-impl<T: Clone> Policy<T> for DefaultPolicy {
-    fn clone_request(&self, req: &T) -> Option<T> {
-        Some(req.clone())
-    }
-
-    fn can_retry(&self, _req: &T) -> bool {
-        true
-    }
-}
-
-impl Default for Builder<DefaultPolicy> {
-    fn default() -> Self {
+impl<P> Builder<P> {
+    pub(crate) fn new(policy: P) -> Self {
         Self {
-            policy: DefaultPolicy,
+            policy,
             latency_percentile: LATENCY_PERCENTILE,
             min_data_points: MIN_DATA_POINTS,
             period: PERIOD,
         }
     }
-}

-impl<P> Builder<P> {
     /// Converts the `Hedge` into a `Layer` that can be used in a service stack.
     pub(crate) fn into_layer<Request, S>(self) -> impl Layer<S, Service = Hedge<S, P>> + Clone
     where
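
Callers that still want the old hedge-everything behavior can recreate the removed `DefaultPolicy` under the new API. A minimal sketch, mirroring the deleted impl above (`HedgeEverything` is an illustrative name, not part of this commit):

```rust
use tower::hedge::Policy;

#[derive(Debug, Clone, Default)]
struct HedgeEverything;

// Behaviorally identical to the removed DefaultPolicy: clone (and therefore
// hedge) every request, and always allow a hedged retry.
impl<T: Clone> Policy<T> for HedgeEverything {
    fn clone_request(&self, req: &T) -> Option<T> {
        Some(req.clone())
    }

    fn can_retry(&self, _req: &T) -> bool {
        true
    }
}

// Wired in the same way as before: hedge::Builder::new(HedgeEverything).into_layer()
```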
65 changes: 60 additions & 5 deletions aws-s3-transfer-manager/src/operation/upload/service.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;

+use super::MultipartUploadData;
 use crate::{
     error,
     io::{
@@ -12,11 +13,9 @@ use crate::{
 use aws_sdk_s3::{primitives::ByteStream, types::CompletedPart};
 use bytes::Buf;
 use tokio::{sync::Mutex, task};
-use tower::{service_fn, Service, ServiceBuilder, ServiceExt};
+use tower::{hedge::Policy, service_fn, Service, ServiceBuilder, ServiceExt};
 use tracing::Instrument;

-use super::MultipartUploadData;
-
 /// Request/input type for our "upload_part" service.
 #[derive(Debug, Clone)]
 pub(super) struct UploadPartRequest {
@@ -25,6 +24,22 @@ pub(super) struct UploadPartRequest {
     pub(super) upload_id: String,
 }

+#[derive(Debug, Clone)]
+pub(crate) struct UploadHedgePolicy;
+
+impl Policy<UploadPartRequest> for UploadHedgePolicy {
+    fn clone_request(&self, req: &UploadPartRequest) -> Option<UploadPartRequest> {
+        if req.ctx.request.bucket().unwrap_or("").ends_with("--x-s3") {
+            None
+        } else {
+            Some(req.clone())
+        }
+    }
+    fn can_retry(&self, _req: &UploadPartRequest) -> bool {
+        true
+    }
+}
+
 /// handler (service fn) for a single part
 async fn upload_part_handler(request: UploadPartRequest) -> Result<CompletedPart, error::Error> {
     let ctx = request.ctx;
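
The suffix check above works because S3 Express One Zone directory bucket names always end in `--x-s3` (the full format is `bucket-base-name--zone-id--x-s3`), so a plain `ends_with` is enough to tell the two bucket flavors apart. Returning `None` from `clone_request` tells the hedge layer not to issue a duplicate attempt for that request. A small illustration (bucket names made up):

```rust
// The zone id here is illustrative; the fixed "--x-s3" suffix is what the
// policy keys on.
assert!("my-data--usw2-az1--x-s3".ends_with("--x-s3"));
assert!(!"my-regular-bucket".ends_with("--x-s3"));
```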
@@ -80,7 +95,7 @@ pub(super) fn upload_part_service(
         .buffer(ctx.handle.num_workers())
         // FIXME - Hedged request should also get a permit. Currently, it can bypass the
         // concurrency_limit layer.
-        .layer(hedge::Builder::default().into_layer())
+        .layer(hedge::Builder::new(UploadHedgePolicy).into_layer())
         .service(svc);
     svc.map_err(|err| {
         let e = err
@@ -128,7 +143,6 @@ pub(super) fn distribute_work(
         parent: parent_span_for_all_tasks,
         "upload-net-tasks"
     );
-
     let svc = upload_part_service(&ctx);
     let n_workers = ctx.handle.num_workers();
     for _ in 0..n_workers {
@@ -180,3 +194,44 @@ pub(super) async fn read_body(
     }
     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::client::Handle;
+    use crate::operation::upload::UploadInput;
+    use crate::runtime::scheduler::Scheduler;
+    use crate::Config;
+    use test_common::mock_client_with_stubbed_http_client;
+
+    fn _mock_upload_part_request_with_bucket_name(bucket_name: &str) -> UploadPartRequest {
+        let s3_client = mock_client_with_stubbed_http_client!(aws_sdk_s3, []);
+        UploadPartRequest {
+            ctx: UploadContext {
+                handle: Arc::new(Handle {
+                    config: Config::builder().client(s3_client).build(),
+                    scheduler: Scheduler::new(0),
+                }),
+                request: Arc::new(UploadInput::builder().bucket(bucket_name).build().unwrap()),
+            },
+            part_data: PartData {
+                part_number: 0,
+                data: Default::default(),
+            },
+            upload_id: "test-id".to_string(),
+        }
+    }
+
+    #[test]
+    fn test_upload_hedge_policy_operation() {
+        let policy = UploadHedgePolicy;
+
+        // Test S3 Express bucket
+        let express_req = _mock_upload_part_request_with_bucket_name("test--x-s3");
+        assert!(policy.clone_request(&express_req).is_none());
+
+        // Test regular bucket
+        let regular_req = _mock_upload_part_request_with_bucket_name("test");
+        assert!(policy.clone_request(&regular_req).is_some());
+    }
+}
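
The new test exercises the policy in isolation rather than through the full service stack: an S3 Express bucket name makes `clone_request` return `None` (no hedged attempt), while a regular bucket name yields a cloned request. Assuming the package name matches its directory, something like `cargo test -p aws-s3-transfer-manager test_upload_hedge_policy_operation` runs just this test.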
