Skip to content

Commit

Permalink
Refactor integration for s3aseekablestream
Browse files Browse the repository at this point in the history
Dummy
  • Loading branch information
rajdchak committed Jan 17, 2025
1 parent 22d0267 commit 98bc8f4
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
Expand Down Expand Up @@ -85,11 +84,6 @@
import software.amazon.awssdk.transfer.s3.model.Copy;
import software.amazon.awssdk.transfer.s3.model.CopyRequest;

import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;

import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -522,11 +516,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean s3AccessGrantsEnabled;

/**
* Factory to create S3SeekableInputStream if {@link this#analyticsAcceleratorEnabled} is true.
*/
private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;

/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
Expand Down Expand Up @@ -812,27 +801,6 @@ public void initialize(URI name, Configuration originalConf)
// thread pool init requires store to be created
initThreadPools();


if (this.analyticsAcceleratorEnabled) {
LOG.info("Using S3SeekableInputStream");
if(this.analyticsAcceleratorCRTEnabled) {
LOG.info("Using S3 CRT client for analytics accelerator S3");
this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
} else {
LOG.info("Using S3 async client for analytics accelerator S3");
this.s3AsyncClient = store.getOrCreateAsyncClient();
}

ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
this.s3SeekableInputStreamFactory =
new S3SeekableInputStreamFactory(
new S3SdkObjectClient(this.s3AsyncClient),
seekableInputStreamConfiguration);
}

// The filesystem is now ready to perform operations against
// S3
// This initiates a probe against S3 for the bucket existing.
Expand Down Expand Up @@ -1934,14 +1902,6 @@ private FSDataInputStream executeOpen(
true,
inputStreamStats);

if (this.analyticsAcceleratorEnabled) {
return new FSDataInputStream(
new S3ASeekableStream(
this.bucket,
pathToKey(path),
s3SeekableInputStreamFactory));
}

// do not validate() the parameters as the store
// completes this.
ObjectReadParameters parameters = new ObjectReadParameters()
Expand Down Expand Up @@ -4349,7 +4309,6 @@ protected synchronized void stopAllServices() {
store = null;
s3Client = null;
s3AsyncClient = null;
s3SeekableInputStreamFactory = null;

// At this point the S3A client is shut down,
// now the executor pools are closed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,29 @@

import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.fs.FSInputStream;

import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.util.S3URI;

public class S3ASeekableStream extends FSInputStream implements StreamCapabilities {
public class S3ASeekableStream extends ObjectInputStream implements StreamCapabilities {

private S3SeekableInputStream inputStream;
private long lastReadCurrentPos = 0;
private final String key;
private volatile boolean closed;

public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);

public S3ASeekableStream(String bucket, String key,
S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
this.key = key;
public S3ASeekableStream(final ObjectReadParameters parameters, S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
super(parameters);
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()));
}

/**
Expand Down Expand Up @@ -139,6 +140,24 @@ public int available() throws IOException {
return super.available();
}

@Override
protected boolean isStreamOpen() {
return !isClosed();
}

protected boolean isClosed() {
return inputStream == null;
}

@Override
protected void abortInFinalizer() {
try {
close();
} catch (IOException ignored) {

}
}

@Override
public synchronized void close() throws IOException {
if(!closed) {
Expand All @@ -148,7 +167,7 @@ public synchronized void close() throws IOException {
inputStream = null;
super.close();
} catch (IOException ioe) {
LOG.debug("Failure closing stream {}: ", key);
LOG.debug("Failure closing stream {}: ", getKey());
throw ioe;
}
}
Expand All @@ -165,19 +184,19 @@ private void onReadFailure(IOException ioe) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Got exception while trying to read from stream {}, " +
"not trying to recover:",
key, ioe);
getKey(), ioe);
} else {
LOG.info("Got exception while trying to read from stream {}, " +
"not trying to recover:",
key, ioe);
getKey(), ioe);
}
this.close();
}


protected void throwIfClosed() throws IOException {
if (closed) {
throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
Expand Down Expand Up @@ -88,8 +89,8 @@
import org.apache.hadoop.util.functional.Tuples;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.s3a.S3AUtils.extractException;
import static org.apache.hadoop.fs.s3a.S3AUtils.getPutRequestLength;
import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
Expand All @@ -110,6 +111,7 @@
import static org.apache.hadoop.fs.s3a.Statistic.STORE_IO_THROTTLE_RATE;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.createS3SeekableInputStreamFactory;
import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.createStreamFactory;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
Expand Down Expand Up @@ -230,7 +232,23 @@ public class S3AStoreImpl
@Override
protected void serviceInit(final Configuration conf) throws Exception {

objectInputStreamFactory = createStreamFactory(conf);
if(conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT)) {
boolean analyticsAcceleratorCRTEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
S3AsyncClient s3AsyncClient;
LOG.info("Using S3SeekableInputStream");
if(analyticsAcceleratorCRTEnabled) {
LOG.info("Using S3 CRT client for analytics accelerator S3");
s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
} else {
LOG.info("Using S3 async client for analytics accelerator S3");
s3AsyncClient = getOrCreateAsyncClient();
}
objectInputStreamFactory = createS3SeekableInputStreamFactory(s3AsyncClient);

} else {
objectInputStreamFactory = createStreamFactory(conf);
}
addService(objectInputStreamFactory);

// init all child services
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public enum InputStreamType {
*/
Prefetch("prefetch", c ->
new PrefetchingInputStreamFactory()),

/**
* The analytics input stream.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.apache.hadoop.fs.s3a.impl.streams;

Check failure on line 1 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/S3SeekableInputStreamFactory.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/S3SeekableInputStreamFactory.java#L1

asflicense: Missing Apache License

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3ASeekableStream;
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;

import java.io.IOException;

import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.util.Preconditions.checkState;

public class S3SeekableInputStreamFactory extends AbstractObjectInputStreamFactory {

S3AsyncClient s3AsyncClient;
software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory s3SeekableInputStreamFactory;

public S3SeekableInputStreamFactory(S3AsyncClient s3AsyncClient) {
super("S3SeekableInputStreamFactory");
this.s3AsyncClient = s3AsyncClient;
}

@Override
protected void serviceInit(final Configuration conf) throws Exception {
super.serviceInit(conf);
ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
this.s3SeekableInputStreamFactory =
new software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory(
new S3SdkObjectClient(this.s3AsyncClient),
seekableInputStreamConfiguration);
}

@Override
public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException {
return new S3ASeekableStream(
parameters,
s3SeekableInputStreamFactory);
}

/**
* Get the number of background threads required for this factory.
* @return the count of background threads.
*/
@Override
public StreamThreadOptions threadRequirements() {
throw new UnsupportedOperationException("This method is not supported");
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import software.amazon.awssdk.services.s3.S3AsyncClient;

import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
Expand Down Expand Up @@ -59,4 +60,15 @@ public static ObjectInputStreamFactory createStreamFactory(final Configuration c
.factory()
.apply(conf);
}

/**
* Create the input stream factory the configuration asks for.
* This does not initialize the factory.
* @param s3AsyncClient s3 async client
* @return a stream factory.
*/
public static ObjectInputStreamFactory createS3SeekableInputStreamFactory(final S3AsyncClient s3AsyncClient) {
return new S3SeekableInputStreamFactory(s3AsyncClient);
}

}

0 comments on commit 98bc8f4

Please sign in to comment.