diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/audit/HttpReferrerAuditHeader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/audit/HttpReferrerAuditHeader.java index 01a36b24fb2f6..fbb607d0a3600 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/audit/HttpReferrerAuditHeader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/audit/HttpReferrerAuditHeader.java @@ -178,9 +178,24 @@ private HttpReferrerAuditHeader( // build the referrer up. so as to find/report problems early initialHeader = buildHttpReferrer(); } - + /** - * Build the referrer string. + * Copy constructor. + * Creates a deep copy of a HttpReferrerAuditHeader object + */ + public HttpReferrerAuditHeader(HttpReferrerAuditHeader httpReferrerAuditHeader) { + this.contextId = requireNonNull(httpReferrerAuditHeader.contextId); + this.evaluated = new ConcurrentHashMap<>(httpReferrerAuditHeader.evaluated); + this.filter = ImmutableSet.copyOf(httpReferrerAuditHeader.filter); + this.operationName = requireNonNull(httpReferrerAuditHeader.operationName); + this.path1 = httpReferrerAuditHeader.path1; + this.path2 = httpReferrerAuditHeader.path2; + this.spanId = requireNonNull(httpReferrerAuditHeader.spanId); + this.attributes = new ConcurrentHashMap<>(httpReferrerAuditHeader.attributes); + this.initialHeader = httpReferrerAuditHeader.initialHeader; + } + + /* Build the referrer string. * This includes dynamically evaluating all of the evaluated * attributes. * If there is an error creating the string it will be logged once diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index e85cae3942b84..e874aceea4957 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -52,6 +52,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; +import org.apache.hadoop.fs.s3a.audit.impl.ActiveAuditManagerS3A; +import org.apache.hadoop.fs.s3a.audit.impl.LoggingAuditor; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -227,6 +229,8 @@ import org.apache.hadoop.util.SemaphoredDelegatingExecutor; import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.functional.CallableRaisingIOE; +import software.amazon.s3.analyticsaccelerator.request.StreamContext; +import software.amazon.s3.analyticsaccelerator.request.ObjectClient; import static java.util.Objects.requireNonNull; import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL; @@ -1953,11 +1957,15 @@ private FSDataInputStream executeOpen( LOG.debug("Opening '{}'", readContext); if (this.analyticsAcceleratorEnabled) { + ActiveAuditManagerS3A.WrappingAuditSpan wrappingAuditSpan = (ActiveAuditManagerS3A.WrappingAuditSpan) auditSpan; + LoggingAuditor.LoggingAuditSpan loggingAuditSpan = (LoggingAuditor.LoggingAuditSpan) wrappingAuditSpan.getSpan(); + StreamContext streamContext = new S3AStreamContext(loggingAuditSpan.getReferrer()); return new FSDataInputStream( new S3ASeekableStream( this.bucket, pathToKey(path), - s3SeekableInputStreamFactory)); + s3SeekableInputStreamFactory, + streamContext)); } if (this.prefetchEnabled) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java index ef6a299081587..f97ced1727ddf 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java @@ -31,6 +31,7 @@ import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; +import software.amazon.s3.analyticsaccelerator.request.StreamContext; import software.amazon.s3.analyticsaccelerator.util.S3URI; public class S3ASeekableStream extends FSInputStream implements StreamCapabilities { @@ -42,9 +43,8 @@ public class S3ASeekableStream extends FSInputStream implements StreamCapabiliti public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class); - public S3ASeekableStream(String bucket, String key, - S3SeekableInputStreamFactory s3SeekableInputStreamFactory) { - this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key)); + public S3ASeekableStream(String bucket, String key, S3SeekableInputStreamFactory s3SeekableInputStreamFactory, StreamContext streamContext) { + this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key), streamContext); this.key = key; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStreamContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStreamContext.java new file mode 100644 index 0000000000000..ab6e8e345aece --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStreamContext.java @@ -0,0 +1,22 @@ +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.fs.audit.AuditConstants; +import org.apache.hadoop.fs.store.audit.HttpReferrerAuditHeader; +import software.amazon.s3.analyticsaccelerator.request.GetRequest; +import software.amazon.s3.analyticsaccelerator.request.StreamContext; + +public class S3AStreamContext implements StreamContext { + + private final HttpReferrerAuditHeader referrer; + + public S3AStreamContext(HttpReferrerAuditHeader referrer) { + this.referrer = referrer; + } + + @Override + public String modifyAndBuildReferrerHeader(GetRequest getRequestContext) { + HttpReferrerAuditHeader copyReferrer = new HttpReferrerAuditHeader(this.referrer); + copyReferrer.set(AuditConstants.PARAM_RANGE, getRequestContext.getRange().toHttpString()); + return copyReferrer.buildHttpReferrer(); + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java index e8e989efaa141..59ef46e20d47c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/ActiveAuditManagerS3A.java @@ -701,7 +701,7 @@ public SdkResponse modifyResponse(Context.ModifyResponse context, * Package-private for testing. */ @VisibleForTesting - final class WrappingAuditSpan extends AbstractAuditSpanImpl { + public final class WrappingAuditSpan extends AbstractAuditSpanImpl { /** * Inner span. @@ -797,8 +797,8 @@ public boolean isValidSpan() { * Get the inner span. * @return the span. */ - @VisibleForTesting - AuditSpanS3A getSpan() { + + public AuditSpanS3A getSpan() { return span; } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java index 16bae4b816457..111fa7d42b625 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java @@ -271,7 +271,7 @@ HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) { * it is private. This is not true, as it is subclassed in * the same file. */ - private class LoggingAuditSpan extends AbstractAuditSpanImpl { + public class LoggingAuditSpan extends AbstractAuditSpanImpl { private final HttpReferrerAuditHeader referrer; @@ -298,7 +298,7 @@ private void attachRangeFromRequest(SdkHttpRequest request, private final String description; - private LoggingAuditSpan( + public LoggingAuditSpan( final String spanId, final String operationName, final CommonAuditContext context, @@ -456,7 +456,7 @@ public String toString() { * Get the referrer. * @return the referrer. */ - private HttpReferrerAuditHeader getReferrer() { + public HttpReferrerAuditHeader getReferrer() { return referrer; }