Commit 21f1789

HADOOP-19295. S3A: large uploads can timeout over slow links (#7089) (#7100)
This sets a different timeout for data upload PUT/POST calls than for all
other requests, so that slow block uploads do not trigger timeouts as
rapidly as normal requests do. This was always the behavior in the V1 AWS
SDK; with V2 the longer timeout has to be set explicitly on the operations
which need it.

Option: fs.s3a.connection.part.upload.timeout
Default: 15m

Contributed by Steve Loughran
Parent: 8b39378

13 files changed: +365 −23
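
As a usage sketch (not part of the commit): a deployment uploading over a
very slow link could raise the timeout above the default before creating
the filesystem. The 30m value and bucket name below are illustrative only.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    Configuration conf = new Configuration();
    // extend the part upload timeout beyond the 15m default (illustrative value)
    conf.set("fs.s3a.connection.part.upload.timeout", "30m");
    FileSystem fs = FileSystem.newInstance(new URI("s3a://example-bucket/"), conf);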

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java (+15)

@@ -398,6 +398,21 @@ private Constants() {
   public static final Duration DEFAULT_CONNECTION_ACQUISITION_TIMEOUT_DURATION =
       Duration.ofSeconds(60);
 
+  /**
+   * Timeout for uploading all of a small object or a single part
+   * of a larger one.
+   * {@value}.
+   * Default unit is milliseconds for consistency with other options.
+   */
+  public static final String PART_UPLOAD_TIMEOUT =
+      "fs.s3a.connection.part.upload.timeout";
+
+  /**
+   * Default part upload timeout: 15 minutes.
+   */
+  public static final Duration DEFAULT_PART_UPLOAD_TIMEOUT =
+      Duration.ofMinutes(15);
+
   /**
    * Should TCP Keepalive be enabled on the socket?
    * This adds some network IO, but finds failures faster.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java (+8)

@@ -1280,6 +1280,13 @@ protected RequestFactory createRequestFactory() {
           STORAGE_CLASS);
     }
 
+    // optional custom timeout for bulk uploads
+    Duration partUploadTimeout = ConfigurationHelper.getDuration(getConf(),
+        PART_UPLOAD_TIMEOUT,
+        DEFAULT_PART_UPLOAD_TIMEOUT,
+        TimeUnit.MILLISECONDS,
+        Duration.ZERO);
+
     return RequestFactoryImpl.builder()
         .withBucket(requireNonNull(bucket))
         .withCannedACL(getCannedACL())
@@ -1289,6 +1296,7 @@ protected RequestFactory createRequestFactory() {
         .withContentEncoding(contentEncoding)
         .withStorageClass(storageClass)
         .withMultipartUploadEnabled(isMultipartUploadEnabled)
+        .withPartUploadTimeout(partUploadTimeout)
         .build();
   }

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSClientConfig.java (+22)

@@ -26,6 +26,8 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.AwsRequest;
+import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
 import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
 import software.amazon.awssdk.core.retry.RetryMode;
@@ -623,4 +625,24 @@ static ConnectionSettings createConnectionSettings(Configuration conf) {
         socketTimeout);
   }
 
+  /**
+   * Set a custom ApiCallTimeout for a single request.
+   * This allows for a longer timeout to be used in data upload
+   * requests than that for all other S3 interactions;
+   * this does not happen by default in the V2 SDK
+   * (see HADOOP-19295).
+   * <p>
+   * If the timeout is zero, the request is not patched.
+   * @param builder builder to patch.
+   * @param timeout timeout
+   */
+  public static void setRequestTimeout(AwsRequest.Builder builder, Duration timeout) {
+    if (!timeout.isZero()) {
+      builder.overrideConfiguration(
+          AwsRequestOverrideConfiguration.builder()
+              .apiCallTimeout(timeout)
+              .apiCallAttemptTimeout(timeout)
+              .build());
+    }
+  }
 }
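
A sketch of the helper in use (the bucket and key are made up): the
override bounds both the API call as a whole and each attempt within it.

    import java.time.Duration;
    import software.amazon.awssdk.services.s3.model.PutObjectRequest;

    PutObjectRequest.Builder put = PutObjectRequest.builder()
        .bucket("example-bucket")   // made-up bucket
        .key("data/block-0001");    // made-up key
    // a zero Duration would leave the request untouched
    AWSClientConfig.setRequestTimeout(put, Duration.ofMinutes(15));
    PutObjectRequest request = put.build();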

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java (+37)

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a.impl;
 
+import java.time.Duration;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.List;
@@ -59,7 +60,9 @@
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
 import static org.apache.hadoop.util.Preconditions.checkArgument;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -128,6 +131,12 @@ public class RequestFactoryImpl implements RequestFactory {
    */
   private final boolean isMultipartUploadEnabled;
 
+  /**
+   * Timeout for uploading objects/parts.
+   * This will be set on data put/post operations only.
+   */
+  private final Duration partUploadTimeout;
+
   /**
    * Constructor.
    * @param builder builder with all the configuration.
@@ -142,6 +151,7 @@ protected RequestFactoryImpl(
     this.contentEncoding = builder.contentEncoding;
     this.storageClass = builder.storageClass;
     this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
+    this.partUploadTimeout = builder.partUploadTimeout;
   }
 
   /**
@@ -338,6 +348,11 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
       putObjectRequestBuilder.storageClass(storageClass);
     }
 
+    // Set the timeout for object uploads but not directory markers.
+    if (!isDirectoryMarker) {
+      setRequestTimeout(putObjectRequestBuilder, partUploadTimeout);
+    }
+
     return prepareRequest(putObjectRequestBuilder);
   }
 
@@ -581,6 +596,9 @@ public UploadPartRequest.Builder newUploadPartRequestBuilder(
         .partNumber(partNumber)
         .contentLength(size);
     uploadPartEncryptionParameters(builder);
+
+    // Set the request timeout for the part upload
+    setRequestTimeout(builder, partUploadTimeout);
     return prepareRequest(builder);
   }
 
@@ -688,6 +706,13 @@ public static final class RequestFactoryBuilder {
      */
     private boolean isMultipartUploadEnabled = true;
 
+    /**
+     * Timeout for uploading objects/parts.
+     * This will be set on data put/post operations only.
+     * A zero value means "no custom timeout".
+     */
+    private Duration partUploadTimeout = DEFAULT_PART_UPLOAD_TIMEOUT;
+
     private RequestFactoryBuilder() {
     }
 
@@ -785,6 +810,18 @@ public RequestFactoryBuilder withMultipartUploadEnabled(
       this.isMultipartUploadEnabled = value;
      return this;
     }
+
+    /**
+     * Timeout for uploading objects/parts.
+     * This will be set on data put/post operations only.
+     * A zero value means "no custom timeout".
+     * @param value new value
+     * @return the builder
+     */
+    public RequestFactoryBuilder withPartUploadTimeout(final Duration value) {
+      partUploadTimeout = value;
+      return this;
+    }
   }
 
   /**
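
Callers wire the timeout in through the builder; a minimal sketch (the
bucket name is made up), matching how the tests below construct a factory:

    import java.time.Duration;
    import org.apache.hadoop.fs.s3a.api.RequestFactory;

    RequestFactory factory = RequestFactoryImpl.builder()
        .withBucket("example-bucket")                   // made-up bucket
        .withPartUploadTimeout(Duration.ofMinutes(15))  // Duration.ZERO: no override
        .build();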

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java (+22 −2)

@@ -26,6 +26,7 @@
 import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 
@@ -224,6 +225,12 @@ public static abstract class BaseContentProvider<T extends InputStream>
    */
   private T currentStream;
 
+  /**
+   * When did this upload start?
+   * Use in error messages.
+   */
+  private final LocalDateTime startTime;
+
   /**
    * Constructor.
    * @param size size of the data. Must be non-negative.
@@ -241,6 +248,7 @@ protected BaseContentProvider(int size, @Nullable Supplier<Boolean> isOpen) {
     checkArgument(size >= 0, "size is negative: %s", size);
     this.size = size;
     this.isOpen = isOpen;
+    this.startTime = LocalDateTime.now();
   }
 
   /**
@@ -274,8 +282,11 @@ public final InputStream newStream() {
     close();
     checkOpen();
     streamCreationCount++;
-    if (streamCreationCount > 1) {
-      LOG.info("Stream created more than once: {}", this);
+    if (streamCreationCount == 2) {
+      // the stream has been recreated for the first time.
+      // notify only once for this stream, so as not to flood
+      // the logs.
+      LOG.info("Stream recreated: {}", this);
     }
     return setCurrentStream(createNewStream());
   }
@@ -302,6 +313,14 @@ public int getSize() {
     return size;
   }
 
+  /**
+   * When did this upload start?
+   * @return start time
+   */
+  public LocalDateTime getStartTime() {
+    return startTime;
+  }
+
   /**
    * Current stream.
    * When {@link #newStream()} is called, this is set to the new value,
@@ -330,6 +349,7 @@ protected T setCurrentStream(T stream) {
   public String toString() {
     return "BaseContentProvider{" +
         "size=" + size +
+        ", initiated at " + startTime +
         ", streamCreationCount=" + streamCreationCount +
        ", currentStream=" + currentStream +
         '}';
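
For background: the V2 SDK asks a ContentStreamProvider for a fresh stream
each time it retries a request, which is why BaseContentProvider counts
stream creations and stamps a start time. An illustrative stand-alone
provider (not the class above) with the same log-once-on-recreation logic:

    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    import software.amazon.awssdk.http.ContentStreamProvider;

    final class ByteArrayProvider implements ContentStreamProvider {
      private final byte[] data;
      private int streamCreationCount;

      ByteArrayProvider(byte[] data) {
        this.data = data;
      }

      @Override
      public InputStream newStream() {
        streamCreationCount++;
        if (streamCreationCount == 2) {
          // first recreation: the request is being retried; note it once only
          System.out.println("stream recreated");
        }
        return new ByteArrayInputStream(data);
      }
    }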

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMiscOperations.java (+5 −1)

@@ -41,6 +41,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertLacksPathCapabilities;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
@@ -100,7 +101,10 @@ public void testCreateNonRecursiveSuccess() throws IOException {
   public void testPutObjectDirect() throws Throwable {
     final S3AFileSystem fs = getFileSystem();
     try (AuditSpan span = span()) {
-      RequestFactory factory = RequestFactoryImpl.builder().withBucket(fs.getBucket()).build();
+      RequestFactory factory = RequestFactoryImpl.builder()
+          .withBucket(fs.getBucket())
+          .withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
+          .build();
       Path path = path("putDirect");
       PutObjectRequest.Builder putObjectRequestBuilder =
           factory.newPutObjectRequestBuilder(path.toUri().getPath(), null, -1, false);

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java (+2)

@@ -54,6 +54,7 @@
 import org.apache.hadoop.util.Progressable;
 
 
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.audit.AuditTestSupport.noopAuditor;
 import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
 import static org.apache.hadoop.util.Preconditions.checkNotNull;
@@ -99,6 +100,7 @@ public class MockS3AFileSystem extends S3AFileSystem {
       .withRequestPreparer(MockS3AFileSystem::prepareRequest)
       .withBucket(BUCKET)
       .withEncryptionSecrets(new EncryptionSecrets())
+      .withPartUploadTimeout(DEFAULT_PART_UPLOAD_TIMEOUT)
       .build();
 
   /**

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestUploadRecovery.java (+2 −2)

@@ -153,15 +153,15 @@ public Configuration createConfiguration() {
    */
   @Override
   public void setup() throws Exception {
-    SdkFaultInjector.resetEvaluator();
+    SdkFaultInjector.resetFaultInjector();
     super.setup();
   }
 
   @Override
   public void teardown() throws Exception {
     // safety check in case the evaluation is failing any
     // request needed in cleanup.
-    SdkFaultInjector.resetEvaluator();
+    SdkFaultInjector.resetFaultInjector();
 
     super.teardown();
   }
