Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-19232: [ABFS][FNSOverBlob] Implementing Ingress Support with various Fallback Handling #7272

Open
wants to merge 14 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,11 @@ public boolean isSmallWriteOptimizationEnabled() {
return this.enableSmallWriteOptimization;
}

public void setSmallWriteOptimization(final boolean enableSmallWriteOptimization) {
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
this.enableSmallWriteOptimization = enableSmallWriteOptimization;
}


public boolean readSmallFilesCompletely() {
return this.readSmallFilesCompletely;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ public void initialize(URI uri, Configuration configuration)
}
}
}
getAbfsStore().updateClientWithNamespaceInfo(new TracingContext(initFSTracingContext));

LOG.trace("Initiate check for delegation token manager");
if (UserGroupInformation.isSecurityEnabled()) {
Expand Down Expand Up @@ -797,7 +798,7 @@ private FileStatus getFileStatus(final Path path,
Path qualifiedPath = makeQualified(path);

try {
return abfsStore.getFileStatus(qualifiedPath, tracingContext);
return getAbfsStore().getFileStatus(qualifiedPath, tracingContext);
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,18 @@ public AzureBlobFileSystemStore(
"abfs-bounded");
}

/**
* Updates the client with the namespace information.
*
* @param tracingContext the tracing context to be used for the operation
* @throws AzureBlobFileSystemException if an error occurs while updating the client
*/
public void updateClientWithNamespaceInfo(TracingContext tracingContext)
throws AzureBlobFileSystemException {
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
getClient().setIsNamespaceEnabled(isNamespaceEnabled);
}

/**
* Checks if the given key in Azure Storage should be stored as a page
* blob instead of block blob.
Expand Down Expand Up @@ -635,14 +647,15 @@ public OutputStream createFile(final Path path,
final FsPermission permission, final FsPermission umask,
TracingContext tracingContext) throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createFile", "createPath")) {
AbfsClient createClient = getClientHandler().getIngressClient();
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createFile filesystem: {} path: {} overwrite: {} permission: {} umask: {} isNamespaceEnabled: {}",
getClient().getFileSystem(),
path,
overwrite,
permission,
umask,
isNamespaceEnabled);
createClient.getFileSystem(),
path,
overwrite,
permission,
umask,
isNamespaceEnabled);

String relativePath = getRelativePath(path);
boolean isAppendBlob = false;
Expand All @@ -660,9 +673,9 @@ public OutputStream createFile(final Path path,
}

final ContextEncryptionAdapter contextEncryptionAdapter;
if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
if (createClient.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
getClient().getEncryptionContextProvider(), getRelativePath(path));
createClient.getEncryptionContextProvider(), getRelativePath(path));
} else {
contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
}
Expand All @@ -677,7 +690,7 @@ public OutputStream createFile(final Path path,
);

} else {
op = getClient().createPath(relativePath, true,
op = createClient.createPath(relativePath, true,
overwrite,
new Permissions(isNamespaceEnabled, permission, umask),
isAppendBlob,
Expand All @@ -689,15 +702,16 @@ public OutputStream createFile(final Path path,
perfInfo.registerResult(op.getResult()).registerSuccess(true);

AbfsLease lease = maybeCreateLease(relativePath, tracingContext);

String eTag = extractEtagHeader(op.getResult());
return new AbfsOutputStream(
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
getClient(),
getClientHandler(),
statistics,
relativePath,
0,
eTag,
contextEncryptionAdapter,
tracingContext));
}
Expand All @@ -720,12 +734,12 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
final ContextEncryptionAdapter contextEncryptionAdapter,
final TracingContext tracingContext) throws IOException {
AbfsRestOperation op;

AbfsClient createClient = getClientHandler().getIngressClient();
try {
// Trigger a create with overwrite=false first so that eTag fetch can be
// avoided for cases when no pre-existing file is present (major portion
// of create file traffic falls into the case of no pre-existing file).
op = getClient().createPath(relativePath, true, false, permissions,
op = createClient.createPath(relativePath, true, false, permissions,
isAppendBlob, null, contextEncryptionAdapter, tracingContext);

} catch (AbfsRestOperationException e) {
Expand All @@ -745,12 +759,11 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
}
}

String eTag = op.getResult()
.getResponseHeader(HttpHeaderConfigurations.ETAG);
String eTag = extractEtagHeader(op.getResult());

try {
// overwrite only if eTag matches with the file properties fetched befpre
op = getClient().createPath(relativePath, true, true, permissions,
op = createClient.createPath(relativePath, true, true, permissions,
isAppendBlob, eTag, contextEncryptionAdapter, tracingContext);
} catch (AbfsRestOperationException ex) {
if (ex.getStatusCode() == HttpURLConnection.HTTP_PRECON_FAILED) {
Expand Down Expand Up @@ -778,22 +791,24 @@ private AbfsRestOperation conditionalCreateOverwriteFile(final String relativePa
*
* @param isAppendBlob is Append blob support enabled?
* @param lease instance of AbfsLease for this AbfsOutputStream.
* @param client AbfsClient.
* @param clientHandler AbfsClientHandler.
* @param statistics FileSystem statistics.
* @param path Path for AbfsOutputStream.
* @param position Position or offset of the file being opened, set to 0
* when creating a new file, but needs to be set for APPEND
* calls on the same file.
* @param eTag eTag of the file.
* @param tracingContext instance of TracingContext for this AbfsOutputStream.
* @return AbfsOutputStreamContext instance with the desired parameters.
*/
private AbfsOutputStreamContext populateAbfsOutputStreamContext(
boolean isAppendBlob,
AbfsLease lease,
AbfsClient client,
AbfsClientHandler clientHandler,
FileSystem.Statistics statistics,
String path,
long position,
String eTag,
ContextEncryptionAdapter contextEncryptionAdapter,
TracingContext tracingContext) {
int bufferSize = abfsConfiguration.getWriteBufferSize();
Expand All @@ -814,24 +829,38 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withEncryptionAdapter(contextEncryptionAdapter)
.withBlockFactory(getBlockFactory())
.withBlockOutputActiveBlocks(blockOutputActiveBlocks)
.withClient(client)
.withClientHandler(clientHandler)
.withPosition(position)
.withFsStatistics(statistics)
.withPath(path)
.withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true))
.withTracingContext(tracingContext)
.withAbfsBackRef(fsBackRef)
.withIngressServiceType(abfsConfiguration.getIngressServiceType())
.withDFSToBlobFallbackEnabled(abfsConfiguration.isDfsToBlobFallbackEnabled())
.withETag(eTag)
.build();
}

/**
* Creates a directory.
*
* @param path Path of the directory to create.
* @param permission Permission of the directory.
* @param umask Umask of the directory.
* @param tracingContext tracing context
*
* @throws AzureBlobFileSystemException server error.
*/
public void createDirectory(final Path path, final FsPermission permission,
final FsPermission umask, TracingContext tracingContext)
throws IOException {
try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) {
AbfsClient createClient = getClientHandler().getIngressClient();
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
LOG.debug("createDirectory filesystem: {} path: {} permission: {} umask: {} isNamespaceEnabled: {}",
getClient().getFileSystem(),
createClient.getFileSystem(),
path,
permission,
umask,
Expand All @@ -841,7 +870,7 @@ public void createDirectory(final Path path, final FsPermission permission,
!isNamespaceEnabled || abfsConfiguration.isEnabledMkdirOverwrite();
Permissions permissions = new Permissions(isNamespaceEnabled,
permission, umask);
final AbfsRestOperation op = getClient().createPath(getRelativePath(path),
final AbfsRestOperation op = createClient.createPath(getRelativePath(path),
false, overwrite, permissions, false, null, null, tracingContext);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
}
Expand Down Expand Up @@ -976,6 +1005,7 @@ public OutputStream openFileForWrite(final Path path,
overwrite);

String relativePath = getRelativePath(path);
AbfsClient writeClient = getClientHandler().getIngressClient();

final AbfsRestOperation op = getClient()
.getPathStatus(relativePath, false, tracingContext, null);
Expand All @@ -1000,8 +1030,9 @@ public OutputStream openFileForWrite(final Path path,
}

AbfsLease lease = maybeCreateLease(relativePath, tracingContext);
final String eTag = extractEtagHeader(op.getResult());
final ContextEncryptionAdapter contextEncryptionAdapter;
if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
if (writeClient.getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
final String encryptionContext = op.getResult()
.getResponseHeader(
HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
Expand All @@ -1010,7 +1041,7 @@ public OutputStream openFileForWrite(final Path path,
"File doesn't have encryptionContext.");
}
contextEncryptionAdapter = new ContextProviderEncryptionAdapter(
getClient().getEncryptionContextProvider(), getRelativePath(path),
writeClient.getEncryptionContextProvider(), getRelativePath(path),
encryptionContext.getBytes(StandardCharsets.UTF_8));
} else {
contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
Expand All @@ -1020,10 +1051,11 @@ public OutputStream openFileForWrite(final Path path,
populateAbfsOutputStreamContext(
isAppendBlob,
lease,
getClient(),
getClientHandler(),
statistics,
relativePath,
offset,
eTag,
contextEncryptionAdapter,
tracingContext));
}
Expand Down Expand Up @@ -1085,8 +1117,7 @@ public boolean rename(final Path source,
boolean isNamespaceEnabled = getIsNamespaceEnabled(tracingContext);
final AbfsClientRenameResult abfsClientRenameResult =
getClient().renamePath(sourceRelativePath, destinationRelativePath,
continuation, tracingContext, sourceEtag, false,
isNamespaceEnabled);
continuation, tracingContext, sourceEtag, false);

AbfsRestOperation op = abfsClientRenameResult.getOp();
perfInfo.registerResult(op.getResult());
Expand Down Expand Up @@ -1123,7 +1154,7 @@ public void delete(final Path path, final boolean recursive,
do {
try (AbfsPerfInfo perfInfo = startTracking("delete", "deletePath")) {
AbfsRestOperation op = getClient().deletePath(relativePath, recursive,
continuation, tracingContext, getIsNamespaceEnabled(tracingContext));
continuation, tracingContext);
perfInfo.registerResult(op.getResult());
continuation = op.getResult().getResponseHeader(HttpHeaderConfigurations.X_MS_CONTINUATION);
perfInfo.registerSuccess(true);
Expand Down Expand Up @@ -1835,6 +1866,15 @@ private AbfsServiceType getAbfsServiceTypeFromUrl() {
return AbfsServiceType.DFS;
}

/**
* Retrieves the configured service type for the Azure Blob File System.
*
* @return the configured {@link AbfsServiceType} for the file system.
*/
public AbfsServiceType getConfiguredServiceType() {
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
return abfsConfiguration.getFsConfiguredServiceType();
}

/**
* Populate a new AbfsClientContext instance with the desired properties.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public final class AbfsHttpConstants {
public static final String DEFAULT_TIMEOUT = "90";
public static final String APPEND_BLOB_TYPE = "appendblob";
public static final String LIST = "list";
public static final String BLOCK_BLOB_TYPE = "BlockBlob";
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
public static final String APPEND_BLOCK = "appendblock";
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved

//Abfs Http Client Constants for Blob Endpoint APIs.

Expand Down Expand Up @@ -249,6 +251,27 @@ public static ApiVersion getCurrentVersion() {
*/
public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;

/**
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
* XML version declaration for the block list.
*/
public static final String XML_VERSION = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n";

/**
* Start tag for the block list XML.
*/
public static final String BLOCK_LIST_START_TAG = "<BlockList>\n";

/**
* End tag for the block list XML.
*/
public static final String BLOCK_LIST_END_TAG = "</BlockList>\n";

/**
* Format string for the latest block in the block list XML.
* The placeholder will be replaced with the block identifier.
*/
public static final String LATEST_BLOCK_FORMAT = "<Latest>%s</Latest>\n";

/**
* List of configurations that are related to Customer-Provided-Keys.
* <ol>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,11 @@ public final class FileSystemConfigurations {
*/
public static final int BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT = 20;

/**
* Length of the block ID used for appends.
*/
public static final int BLOCK_ID_LENGTH = 60;
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Buffer blocks to disk.
* Capacity is limited to available disk space.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,5 +104,11 @@ public final class HttpHeaderConfigurations {
*/
public static final String X_MS_BLOB_CONTENT_MD5 = "x-ms-blob-content-md5";

/**
* Http Request Header for denoting blob type.
* {@value}
*/
public static final String X_MS_BLOB_TYPE = "x-ms-blob-type";

private HttpHeaderConfigurations() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.contracts.exceptions;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class InvalidIngressServiceException
anujmodi2021 marked this conversation as resolved.
Show resolved Hide resolved
extends AbfsRestOperationException {
public InvalidIngressServiceException(final int statusCode,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also try to include erver request Id for the failed request here??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is already included in the exception message

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be I am missing something here, but I can see there are 2 constructors for base exception and the one that excepts AbfsHttpOperation as a separate parameter calls the formatMessage() which adds the req Id to exception message.

Refering to:

private static String formatMessage(final AbfsHttpOperation abfsHttpOperation) {

Copy link
Contributor Author

@anmolanmol1234 anmolanmol1234 Jan 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inner exception that gets caught has the request Id in the message already

final String errorCode,
final String errorMessage,
final Exception innerException) {
super(statusCode, errorCode, errorMessage, innerException);
}
}
Loading
Loading