Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19232: [ABFS][FNSOverBlob] Implementing Ingress Support with various Fallback Handling #7272

Open
wants to merge 14 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,19 @@ void setIsNamespaceEnabledAccount(String isNamespaceEnabledAccount) {
this.isNamespaceEnabledAccount = isNamespaceEnabledAccount;
}

/**
* Checks if the FixedSASTokenProvider is configured for the current account.
*
* @return true if the FixedSASTokenProvider is configured, false otherwise.
*/
public boolean isFixedSASTokenProviderConfigured() {
try {
return getSASTokenProvider() instanceof FixedSASTokenProvider;
} catch (AzureBlobFileSystemException e) {
return false;
}
}

private String getTrimmedPasswordString(String key, String defaultValue) throws IOException {
String value = getPasswordString(key);
if (StringUtils.isBlank(value)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.Preconditions;
Expand Down Expand Up @@ -122,11 +123,14 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_FIXED_TOKEN;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.FSOperationType.CREATE_FILESYSTEM;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_ON_ROOT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.UNAUTHORIZED_SAS;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
Expand Down Expand Up @@ -234,6 +238,29 @@ public void initialize(URI uri, Configuration configuration)
throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}

/*
* Validates if the correct SAS Token provider is configured for non-HNS accounts.
* For non-HNS accounts, if the authentication type is set to SAS, only a fixed SAS Token is supported as of now.
* A custom SAS Token Provider should not be configured in such cases, as it will override the FixedSASTokenProvider and render it unused.
* If the namespace is not enabled and the FixedSASTokenProvider is not configured,
* an InvalidConfigurationValueException will be thrown.
*
* @throws InvalidConfigurationValueException if account is not namespace enabled and FixedSASTokenProvider is not configured.
*/
try {
if (abfsConfiguration.getAuthType(abfsConfiguration.getAccountName()) == AuthType.SAS && // Auth type is SAS
!tryGetIsNamespaceEnabled(new TracingContext(initFSTracingContext)) && // Account is FNS
!abfsConfiguration.isFixedSASTokenProviderConfigured()) { // Fixed SAS Token Provider is not configured
throw new InvalidConfigurationValueException(FS_AZURE_SAS_FIXED_TOKEN, UNAUTHORIZED_SAS);
}
} catch (InvalidConfigurationValueException ex) {
LOG.error("File system configured with Invalid SAS Token Provider for FNS Accounts", ex);
throw ex;
} catch (AzureBlobFileSystemException ex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need better exception handling here.
We should first catch SASTokenProviderException and throw InvalidConfigurationValueException() with original exception included as it will tell why getSASTokenProvder() failed.

Then we should catch the AzureBlobFileSystemException that can be thrown while determining account type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

taken

LOG.error("Failed to determine account type for auth type validation", ex);
throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex);
}

/*
* Non-hierarchical-namespace account can not have a customer-provided-key(CPK).
* Fail initialization of filesystem if the configs are provided. CPK is of
Expand Down Expand Up @@ -266,6 +293,7 @@ public void initialize(URI uri, Configuration configuration)
}
}
}
getAbfsStore().updateClientWithNamespaceInfo(new TracingContext(initFSTracingContext));

LOG.trace("Initiate check for delegation token manager");
if (UserGroupInformation.isSecurityEnabled()) {
Expand Down Expand Up @@ -797,7 +825,7 @@ private FileStatus getFileStatus(final Path path,
Path qualifiedPath = makeQualified(path);

try {
return abfsStore.getFileStatus(qualifiedPath, tracingContext);
return getAbfsStore().getFileStatus(qualifiedPath, tracingContext);
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,18 @@ public AzureBlobFileSystemStore(
"abfs-bounded");
}

/**
* Updates the client with the namespace information.
*
* @param tracingContext the tracing context to be used for the operation
* @throws AzureBlobFileSystemException if an error occurs while updating the client
*/
public void updateClientWithNamespaceInfo(TracingContext tracingContext)
throws AzureBlobFileSystemException {
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
AbfsClient.setIsNamespaceEnabled(isNamespaceEnabled);
}

/**
* Checks if the given key in Azure Storage should be stored as a page
* blob instead of block blob.
Expand Down Expand Up @@ -635,14 +647,15 @@ public OutputStream createFile(final Path path,
final FsPermission permission, final FsPermission umask,
TracingContext tracingContext) throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
AbfsClient createClient = getClientHandler().getIngressClient();
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
getClient().getFileSystem(),
path,
overwrite,
permission,
umask,
isNamespaceEnabled);
createClient.getFileSystem(),
path,
overwrite,
permission,
umask,
isNamespaceEnabled);

String relativePath = getRelativePath(path);
boolean isAppendBlob = false;
Expand All @@ -660,9 +673,9 @@ public OutputStream createFile(final Path path,
}

final ContextEncryptionAdapter contextEncryptionAdapter;
if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
if (createClient.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
getClient().getEncryptionContextProvider(), getRelativePath(path));
createClient.getEncryptionContextProvider(), getRelativePath(path));
} else {
contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
}
Expand All @@ -677,7 +690,7 @@ public OutputStream createFile(final Path path,
);

} else {
op = getClient().createPath(relativePath, true,
op = createClient.createPath(relativePath, true,
overwrite,
new Permissions(isNamespaceEnabled, permission, umask),
isAppendBlob,
Expand All @@ -689,15 +702,16 @@ public OutputStream createFile(final Path path,
perfInfo.registerResult(op.getResult()).registerSuccess(true);

AbfsLease lease = maybeCreateLease(relativePath, tracingContext);

String eTag = extractEtagHeader(op.getResult());
return new AbfsOutputStream(
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
getClient(),
getClientHandler(),
statistics,
relativePath,
0,
eTag,
contextEncryptionAdapter,
tracingContext));
}
Expand All @@ -720,12 +734,12 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
final ContextEncryptionAdapter contextEncryptionAdapter,
final TracingContext tracingContext) throws IOException {
AbfsRestOperation op;

AbfsClient createClient = getClientHandler().getIngressClient();
try {
// Trigger a create with overwrite=false first so that eTag fetch can be
// avoided for cases when no pre-existing file is present (major portion
// of create file traffic falls into the case of no pre-existing file).
op = getClient().createPath(relativePath, true, false, permissions,
op = createClient.createPath(relativePath, true, false, permissions,
isAppendBlob, null, contextEncryptionAdapter, tracingContext);

} catch (AbfsRestOperationException e) {
Expand All @@ -745,12 +759,11 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
}
}

String eTag = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.ETAG);
String eTag = extractEtagHeader(op.getResult());

try {
// overwrite only if eTag matches with the file properties fetched befpre
op = getClient().createPath(relativePath, true, true, permissions,
op = createClient.createPath(relativePath, true, true, permissions,
isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
Expand Down Expand Up @@ -778,22 +791,24 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
*
* @param isAppendBlob is Append blob support enabled?
* @param lease instance of AbfsLease for this AbfsOutputStream.
* @param client AbfsClient.
* @param clientHandler AbfsClientHandler.
* @param statistics FileSystem statistics.
* @param path Path for AbfsOutputStream.
* @param position Position or offset of the file being opened, set to 0
* when creating a new file, but needs to be set for APPEND
* calls on the same file.
* @param eTag eTag of the file.
* @param tracingContext instance of TracingContext for this AbfsOutputStream.
* @return AbfsOutputStreamContext instance with the desired parameters.
*/
private AbfsOutputStreamContext populateAbfsOutputStreamContext(
boolean isAppendBlob,
AbfsLease lease,
AbfsClient client,
AbfsClientHandler clientHandler,
FileSystem.Statistics statistics,
String path,
long position,
String eTag,
ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
Expand All @@ -814,24 +829,38 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withEncryptionAdapter(contextEncryptionAdapter)
.withBlockFactory(getBlockFactory())
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
.withClient(client)
.withClientHandler(clientHandler)
.withPosition(position)
.withFsStatistics(statistics)
.withPath(path)
.withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true))
.withTracingContext(tracingContext)
.withAbfsBackRef(fsBackRef)
.withIngressServiceType(abfsConfiguration.getIngressServiceType())
.withDFSToBlobFallbackEnabled(abfsConfiguration.isDfsToBlobFallbackEnabled())
.withETag(eTag)
.build();
}

/**
* Creates a directory.
*
* @param path Path of the directory to create.
* @param permission Permission of the directory.
* @param umask Umask of the directory.
* @param tracingContext tracing context
*
* @throws AzureBlobFileSystemException server error.
*/
public void createDirectory(final Path path, final FsPermission permission,
final FsPermission umask, TracingContext tracingContext)
throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) {
AbfsClient createClient = getClientHandler().getIngressClient();
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}",
getClient().getFileSystem(),
createClient.getFileSystem(),
path,
permission,
umask,
Expand All @@ -841,7 +870,7 @@ public void createDirectory(final Path path, final FsPermission permission,
!isNamespaceEnabled || abfsConfiguration.isEnabledMkdirOverwrite();
Permissions permissions = new Permissions(isNamespaceEnabled,
permission, umask);
final AbfsRestOperation op = getClient().createPath(getRelativePath(path),
final AbfsRestOperation op = createClient.createPath(getRelativePath(path),
false, overwrite, permissions, false, null, null, tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
Expand Down Expand Up @@ -976,6 +1005,7 @@ public OutputStream openFileForWrite(final Path path,
overwrite);

String relativePath = getRelativePath(path);
AbfsClient writeClient = getClientHandler().getIngressClient();

final AbfsRestOperation op = getClient()
.getPathStatus(relativePath, false, tracingContext, null);
Expand All @@ -1000,8 +1030,9 @@ public OutputStream openFileForWrite(final Path path,
}

AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
final String eTag = extractEtagHeader(op.getResult());
final ContextEncryptionAdapter contextEncryptionAdapter;
if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
if (writeClient.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
final String encryptionContext = op.getResult()
.getResponseHeader(
HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
Expand All @@ -1010,7 +1041,7 @@ public OutputStream openFileForWrite(final Path path,
"File doesn't have encryptionContext.");
}
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
getClient().getEncryptionContextProvider(), getRelativePath(path),
writeClient.getEncryptionContextProvider(), getRelativePath(path),
encryptionContext.getBytes(StandardCharsets.UTF_8));
} else {
contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
Expand All @@ -1020,10 +1051,11 @@ public OutputStream openFileForWrite(final Path path,
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
getClient(),
getClientHandler(),
statistics,
relativePath,
offset,
eTag,
contextEncryptionAdapter,
tracingContext));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public final class AbfsHttpConstants {
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String LIST = "list";
public static final String BLOCK_BLOB_TYPE = "BlockBlob";
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
public static final String APPEND_BLOCK = "appendblock";
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved

//Abfs Http Client Constants for Blob Endpoint APIs.

Expand Down Expand Up @@ -238,7 +240,7 @@ public static ApiVersion getCurrentVersion() {
public static final String PUT_BLOCK_LIST = "PutBlockList";

/**
* Value that differentiates categories of the http_status.
* Value that differentiates categories of the HTTP status.
* <pre>
* 100 - 199 : Informational responses
* 200 - 299 : Successful responses
Expand All @@ -249,6 +251,28 @@ public static ApiVersion getCurrentVersion() {
*/
public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;

/**
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
* XML version declaration for the block list.
*/
public static final String XML_VERSION = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>%n";

/**
* Start tag for the block list XML.
*/
public static final String BLOCK_LIST_START_TAG = "<BlockList>%n";

/**
* End tag for the block list XML.
*/
public static final String BLOCK_LIST_END_TAG = "</BlockList>%n";

/**
* Format string for the latest block in the block list XML.
* The placeholder will be replaced with the block identifier.
*/
public static final String LATEST_BLOCK_FORMAT = "<Latest>%s</Latest>%n";


/**
* List of configurations that are related to Customer-Provided-Keys.
* <ol>
Expand Down Expand Up @@ -289,6 +313,12 @@ public static ApiVersion getCurrentVersion() {
public static final String APACHE_IMPL = "Apache";
public static final String JDK_FALLBACK = "JDK_fallback";
public static final String KEEP_ALIVE_CACHE_CLOSED = "KeepAliveCache is closed";
public static final String DFS_FLUSH = "DFlush";
public static final String DFS_APPEND = "DAppend";
public static final String BLOB_FLUSH = "BFlush";
public static final String BLOB_APPEND = "BAppend";
public static final String FALLBACK_FLUSH = "FBFlush";
public static final String FALLBACK_APPEND = "FBAppend";

private AbfsHttpConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ public final class FileSystemConfigurations {
*/
public static final int BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT = 20;

/**
* Length of the block ID used for appends.
*/
public static final int BLOCK_ID_LENGTH = 60;
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Buffer blocks to disk.
* Capacity is limited to available disk space.
Expand Down
Loading
Loading