Skip to content

Commit

Permalink
Forked s3 files, before modification.
Browse files Browse the repository at this point in the history
  • Loading branch information
kshakir committed Jan 29, 2019
1 parent fd5bdf8 commit f346c5e
Show file tree
Hide file tree
Showing 22 changed files with 2,976 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.lerch.s3fs;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;

/**
 * Default {@link AmazonS3Factory} implementation: builds the {@link S3Client}
 * directly from the pre-configured builder without any further customisation.
 */
public class AmazonS3ClientFactory extends AmazonS3Factory {

    @Override
    protected S3Client createS3Client(S3ClientBuilder builder) {
        final S3Client client = builder.build();
        return client;
    }
}
99 changes: 99 additions & 0 deletions filesystems/s3/src/main/java/org/lerch/s3fs/AmazonS3Factory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.lerch.s3fs;

import software.amazon.awssdk.core.auth.AwsCredentialsProvider;
import software.amazon.awssdk.core.auth.StaticCredentialsProvider;
import software.amazon.awssdk.core.auth.AwsCredentials;
import software.amazon.awssdk.core.auth.DefaultCredentialsProvider;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3AdvancedConfiguration;
import software.amazon.awssdk.core.client.builder.ClientHttpConfiguration;
import software.amazon.awssdk.core.config.ClientOverrideConfiguration;

import java.net.URI;
import java.util.Properties;


/**
* Factory base class to create a new AmazonS3 instance.
*/
/**
 * Factory base class to create a new AmazonS3 instance.
 *
 * Assembles credentials, HTTP, advanced and override configuration from a
 * {@link Properties} object whose recognised keys are the s3fs_* constants
 * below; subclasses perform the final build step via
 * {@link #createS3Client(S3ClientBuilder)}.
 */
public abstract class AmazonS3Factory {

    // Property keys recognised in the Properties object passed to getS3Client.
    // NOTE(review): only ACCESS_KEY and SECRET_KEY are read by the code below;
    // the remaining keys are declared but not yet wired up (see the TODOs).
    public static final String ACCESS_KEY = "s3fs_access_key";
    public static final String SECRET_KEY = "s3fs_secret_key";
    public static final String REQUEST_METRIC_COLLECTOR_CLASS = "s3fs_request_metric_collector_class";
    public static final String CONNECTION_TIMEOUT = "s3fs_connection_timeout";
    public static final String MAX_CONNECTIONS = "s3fs_max_connections";
    public static final String MAX_ERROR_RETRY = "s3fs_max_retry_error";
    public static final String PROTOCOL = "s3fs_protocol";
    public static final String PROXY_DOMAIN = "s3fs_proxy_domain";
    public static final String PROXY_HOST = "s3fs_proxy_host";
    public static final String PROXY_PASSWORD = "s3fs_proxy_password";
    public static final String PROXY_PORT = "s3fs_proxy_port";
    public static final String PROXY_USERNAME = "s3fs_proxy_username";
    public static final String PROXY_WORKSTATION = "s3fs_proxy_workstation";
    public static final String SOCKET_SEND_BUFFER_SIZE_HINT = "s3fs_socket_send_buffer_size_hint";
    public static final String SOCKET_RECEIVE_BUFFER_SIZE_HINT = "s3fs_socket_receive_buffer_size_hint";
    public static final String SOCKET_TIMEOUT = "s3fs_socket_timeout";
    public static final String USER_AGENT = "s3fs_user_agent";
    public static final String SIGNER_OVERRIDE = "s3fs_signer_override";
    public static final String PATH_STYLE_ACCESS = "s3fs_path_style_access";

    /**
     * Build a new Amazon S3 instance with the URI and the properties provided.
     *
     * @param uri   URI mandatory; when it carries a host it is installed as an
     *              endpoint override on the client
     * @param props Properties with the credentials and others options
     * @return S3Client
     */
    public S3Client getS3Client(URI uri, Properties props) {
        S3ClientBuilder builder = S3Client.builder();
        // Only override the endpoint when the URI actually names a host.
        if (uri != null && uri.getHost() != null)
            builder.endpointOverride(uri);

        builder.credentialsProvider(getCredentialsProvider(props))
                .httpConfiguration(getHttpConfiguration(props))
                .advancedConfiguration(getAdvancedConfiguration(props))
                .overrideConfiguration(getOverrideConfiguration(props));
        // Region is currently left to the SDK's default resolution chain;
        // property-driven region selection is not implemented yet:
        //.region(getRegion(props));

        return createS3Client(builder);
    }

    /**
     * Should return a new {@link S3Client} built from the supplied builder.
     *
     * @param builder S3ClientBuilder already configured with credentials, HTTP,
     *                advanced and override configuration; mandatory
     * @return {@link software.amazon.awssdk.services.s3.S3Client}
     */
    protected abstract S3Client createS3Client(S3ClientBuilder builder);

    /**
     * Chooses the credentials provider: the SDK default chain when neither key
     * property is present, otherwise static credentials from the properties.
     *
     * NOTE(review): when only ONE of ACCESS_KEY/SECRET_KEY is set this still
     * builds a static provider with a null component — confirm this is intended
     * rather than falling back to the default chain.
     */
    protected AwsCredentialsProvider getCredentialsProvider(Properties props) {
        AwsCredentialsProvider credentialsProvider;
        if (props.getProperty(ACCESS_KEY) == null && props.getProperty(SECRET_KEY) == null)
            credentialsProvider = DefaultCredentialsProvider.create();
        else
            credentialsProvider = StaticCredentialsProvider.create(getAWSCredentials(props));
        return credentialsProvider;
    }

    /**
     * Builds static credentials from the ACCESS_KEY and SECRET_KEY properties.
     */
    protected AwsCredentials getAWSCredentials(Properties props) {
        return AwsCredentials.create(props.getProperty(ACCESS_KEY), props.getProperty(SECRET_KEY));
    }

    /**
     * HTTP configuration for the client; currently the SDK defaults.
     */
    protected ClientHttpConfiguration getHttpConfiguration(Properties props) {
        // TODO: custom http configuration based on properties
        return ClientHttpConfiguration.builder().build();
    }

    /**
     * S3-specific advanced configuration; currently the SDK defaults.
     */
    protected S3AdvancedConfiguration getAdvancedConfiguration(Properties props) {
        // TODO: custom configuration based on properties
        return S3AdvancedConfiguration.builder().build();
    }

    /**
     * Client override configuration; currently the SDK defaults.
     */
    protected ClientOverrideConfiguration getOverrideConfiguration(Properties props) {
        // TODO: custom configuration based on properties
        return ClientOverrideConfiguration.builder().build();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package org.lerch.s3fs;

import static java.lang.String.format;

import java.nio.file.AccessDeniedException;
import java.nio.file.AccessMode;
import java.util.EnumSet;

import software.amazon.awssdk.services.s3.model.Grant;
import software.amazon.awssdk.services.s3.model.Owner;
import software.amazon.awssdk.services.s3.model.Permission;

/**
 * Wraps the ACL (owner plus grants) of a single S3 object and answers
 * access-control questions about it.
 */
public class S3AccessControlList {
    private String fileStoreName;
    private String key;
    private Iterable<Grant> grants;
    private Owner owner;

    public S3AccessControlList(String fileStoreName, String key, Iterable<Grant> grants, Owner owner) {
        this.fileStoreName = fileStoreName;
        this.grants = grants;
        this.key = key;
        this.owner = owner;
    }

    public String getKey() {
        return key;
    }

    /**
     * Tells whether the owner holds at least one of the given permissions.
     *
     * @param candidates the permissions to look for
     * @return true when some grant made to the owner carries one of them
     */
    private boolean ownerHasAnyOf(EnumSet<Permission> candidates) {
        boolean granted = false;
        for (Grant grant : grants) {
            boolean grantedToOwner = grant.grantee().id().equals(owner.id());
            if (grantedToOwner && candidates.contains(grant.permission())) {
                granted = true;
                break;
            }
        }
        return granted;
    }

    /**
     * Verifies every requested access mode, failing fast on the first one the
     * ACL does not cover.
     *
     * @param modes the access modes the caller requires
     * @throws AccessDeniedException when any requested mode is not granted
     */
    public void checkAccess(AccessMode[] modes) throws AccessDeniedException {
        for (AccessMode mode : modes) {
            if (mode == AccessMode.EXECUTE) {
                // S3 objects are never executable.
                throw new AccessDeniedException(fileName(), null, "file is not executable");
            } else if (mode == AccessMode.READ) {
                if (!ownerHasAnyOf(EnumSet.of(Permission.FULL_CONTROL, Permission.READ)))
                    throw new AccessDeniedException(fileName(), null, "file is not readable");
            } else if (mode == AccessMode.WRITE) {
                if (!ownerHasAnyOf(EnumSet.of(Permission.FULL_CONTROL, Permission.WRITE)))
                    throw new AccessDeniedException(fileName(), null, format("bucket '%s' is not writable", fileStoreName));
            }
        }
    }

    /** Bucket name plus key, used in error messages. */
    private String fileName() {
        return fileStoreName + S3Path.PATH_SEPARATOR + key;
    }
}
177 changes: 177 additions & 0 deletions filesystems/s3/src/main/java/org/lerch/s3fs/S3FileChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package org.lerch.s3fs;

import org.apache.tika.Tika;

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseInputStream;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;

import static java.lang.String.format;

/**
 * A {@link FileChannel} backed by a local temporary copy of an S3 object.
 *
 * On construction the object (if it exists) is downloaded to a temp file and a
 * regular {@link FileChannel} is opened on it; all channel operations delegate
 * to that local channel. On close, channels opened for writing upload the temp
 * file back to S3 ({@link #sync()}); the temp file is always deleted.
 */
public class S3FileChannel extends FileChannel {

    private S3Path path;
    // Immutable snapshot of the open options requested by the caller.
    private Set<? extends OpenOption> options;
    // Channel over the local temp copy; every operation delegates to it.
    private FileChannel filechannel;
    private Path tempFile;

    /**
     * Opens a channel for the given S3 path.
     *
     * @param path    the S3 object to expose as a channel
     * @param options open options; CREATE_NEW / CREATE semantics are checked
     *                against the object's existence in S3
     * @throws FileAlreadyExistsException when CREATE_NEW is given and the
     *                                    object already exists
     * @throws NoSuchFileException        when the object is missing and neither
     *                                    CREATE nor CREATE_NEW was given
     * @throws IOException                on download or temp-file failures
     */
    public S3FileChannel(S3Path path, Set<? extends OpenOption> options) throws IOException {
        this.path = path;
        this.options = Collections.unmodifiableSet(new HashSet<>(options));
        String key = path.getKey();
        boolean exists = path.getFileSystem().provider().exists(path);

        if (exists && this.options.contains(StandardOpenOption.CREATE_NEW))
            throw new FileAlreadyExistsException(format("target already exists: %s", path));
        else if (!exists && !this.options.contains(StandardOpenOption.CREATE_NEW) &&
                !this.options.contains(StandardOpenOption.CREATE))
            throw new NoSuchFileException(format("target not exists: %s", path));

        tempFile = Files.createTempFile("temp-s3-", key.replaceAll("/", "_"));
        boolean removeTempFile = true;
        try {
            if (exists) {
                // Download the current object contents into the temp file.
                try (ResponseInputStream<GetObjectResponse> byteStream = path.getFileSystem()
                        .getClient()
                        .getObject(GetObjectRequest
                                .builder()
                                .bucket(path.getFileStore().name())
                                .key(key).build())) {
                    Files.copy(byteStream, tempFile, StandardCopyOption.REPLACE_EXISTING);
                }
            }

            // CREATE_NEW has already been honoured against S3 above; the local
            // temp file always exists at this point, so drop it before opening.
            Set<OpenOption> fileChannelOptions = new HashSet<>(this.options);
            fileChannelOptions.remove(StandardOpenOption.CREATE_NEW);
            filechannel = FileChannel.open(tempFile, fileChannelOptions);
            removeTempFile = false;
        } finally {
            // Don't leak the temp file when download or open fails.
            if (removeTempFile) {
                Files.deleteIfExists(tempFile);
            }
        }
    }

    @Override
    public int read(ByteBuffer dst) throws IOException {
        return filechannel.read(dst);
    }

    @Override
    public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
        return filechannel.read(dsts, offset, length);
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
        return filechannel.write(src);
    }

    @Override
    public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
        return filechannel.write(srcs, offset, length);
    }

    @Override
    public long position() throws IOException {
        return filechannel.position();
    }

    @Override
    public FileChannel position(long newPosition) throws IOException {
        return filechannel.position(newPosition);
    }

    @Override
    public long size() throws IOException {
        return filechannel.size();
    }

    @Override
    public FileChannel truncate(long size) throws IOException {
        return filechannel.truncate(size);
    }

    @Override
    public void force(boolean metaData) throws IOException {
        filechannel.force(metaData);
    }

    @Override
    public long transferTo(long position, long count, WritableByteChannel target) throws IOException {
        return filechannel.transferTo(position, count, target);
    }

    @Override
    public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException {
        return filechannel.transferFrom(src, position, count);
    }

    @Override
    public int read(ByteBuffer dst, long position) throws IOException {
        return filechannel.read(dst, position);
    }

    @Override
    public int write(ByteBuffer src, long position) throws IOException {
        return filechannel.write(src, position);
    }

    @Override
    public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException {
        return filechannel.map(mode, position, size);
    }

    @Override
    public FileLock lock(long position, long size, boolean shared) throws IOException {
        return filechannel.lock(position, size, shared);
    }

    @Override
    public FileLock tryLock(long position, long size, boolean shared) throws IOException {
        return filechannel.tryLock(position, size, shared);
    }

    @Override
    protected void implCloseChannel() throws IOException {
        // Note: the previous super.close() call was removed — implCloseChannel()
        // is invoked FROM AbstractInterruptibleChannel.close() after the channel
        // has already been marked closed, so the call was a no-op (and the
        // implCloseChannel contract says not to re-enter close()).
        try {
            filechannel.close();
            // Read-only channels never modified the temp copy; everything else
            // is pushed back to S3.
            if (!this.options.contains(StandardOpenOption.READ)) {
                sync();
            }
        } finally {
            // Always reclaim the temp file, even when sync() throws.
            Files.deleteIfExists(tempFile);
        }
    }

    /**
     * try to sync the temp file with the remote s3 path.
     *
     * @throws IOException if the tempFile fails to open a newInputStream
     */
    protected void sync() throws IOException {
        try (InputStream stream = new BufferedInputStream(Files.newInputStream(tempFile))) {
            PutObjectRequest.Builder builder = PutObjectRequest.builder();
            long length = Files.size(tempFile);
            builder.bucket(path.getFileStore().name())
                    .key(path.getKey())
                    .contentLength(length)
                    // Sniff the content type from the stream and file name.
                    .contentType(new Tika().detect(stream, path.getFileName().toString()));

            path.getFileSystem().getClient().putObject(builder.build(), RequestBody.of(stream, length));
        }
    }
}
Loading

0 comments on commit f346c5e

Please sign in to comment.