Skip to content

HADOOP-19444: ABFS: [FnsOverBlob][Tests] Add Tests For Negative Scenarios Identified for Ingress Operations #7424

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 11, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.INCORRECT_INGRESS_TYPE;

/**
* Configuration for Azure Blob FileSystem.
Expand Down Expand Up @@ -560,6 +561,10 @@ public void validateConfiguredServiceType(boolean isHNSEnabled)
} else if (isHNSEnabled && fsConfiguredServiceType == AbfsServiceType.BLOB) {
throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
"Blob Endpoint Url Cannot be used to initialize filesystem for HNS Account");
} else if (getFsConfiguredServiceType() == AbfsServiceType.BLOB
&& getIngressServiceType() == AbfsServiceType.DFS) {
throw new InvalidConfigurationValueException(
FS_AZURE_INGRESS_SERVICE_TYPE, INCORRECT_INGRESS_TYPE);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2011,18 +2011,24 @@ private boolean isEmptyListResults(AbfsHttpOperation result) {
}

/**
* Generates an XML string representing the block list.
* Generate the XML block list using a comma-separated string of block IDs.
*
* @param blockIds the set of block IDs
* @return the generated XML string
* @param blockIdString The comma-separated block IDs.
* @return the XML representation of the block list.
*/
public static String generateBlockListXml(List<String> blockIds) {
public static String generateBlockListXml(String blockIdString) {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(String.format(XML_VERSION));
stringBuilder.append(String.format(BLOCK_LIST_START_TAG));
for (String blockId : blockIds) {
stringBuilder.append(String.format(LATEST_BLOCK_FORMAT, blockId));

// Split the block ID string by commas and generate XML for each block ID
if (!blockIdString.isEmpty()) {
String[] blockIds = blockIdString.split(",");
for (String blockId : blockIds) {
stringBuilder.append(String.format(LATEST_BLOCK_FORMAT, blockId));
}
}

stringBuilder.append(String.format(BLOCK_LIST_END_TAG));
return stringBuilder.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,6 @@ public final class AbfsErrors {
"Error while recovering from create failure.";
public static final String ERR_RENAME_RECOVERY =
"Error while recovering from rename failure.";
public static final String INCORRECT_INGRESS_TYPE = "Ingress Type Cannot be DFS for Blob endpoint configured filesystem.";
private AbfsErrors() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -708,9 +708,7 @@ public synchronized void close() throws IOException {
bufferIndex = 0;
closed = true;
writeOperations.clear();
if (getBlockManager().hasActiveBlock()) {
getBlockManager().clearActiveBlock();
}
getBlockManager().close();
}
LOG.debug("Closing AbfsOutputStream : {}", this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.store.DataBlocks;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA;

/**
* Manages Azure Blob blocks for append operations.
*/
Expand All @@ -38,9 +40,8 @@ public class AzureBlobBlockManager extends AzureBlockManager {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsOutputStream.class);


/** The list of already committed blocks is stored in this list. */
private List<String> committedBlockEntries = new ArrayList<>();
/** Comma-separated cache of committed block IDs. */
private final StringBuilder committedBlockEntries = new StringBuilder();

/** The list to store blockId, position, and status. */
private final LinkedList<BlockEntry> blockEntryList = new LinkedList<>();
Expand All @@ -60,7 +61,10 @@ public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
throws AzureBlobFileSystemException {
super(abfsOutputStream, blockFactory, bufferSize);
if (abfsOutputStream.getPosition() > 0 && !abfsOutputStream.isAppendBlob()) {
this.committedBlockEntries = getBlockList(abfsOutputStream.getTracingContext());
List<String> committedBlocks = getBlockList(abfsOutputStream.getTracingContext());
if (!committedBlocks.isEmpty()) {
committedBlockEntries.append(String.join(COMMA, committedBlocks));
}
}
LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance {} for path {}",
abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
Expand Down Expand Up @@ -146,11 +150,12 @@ protected synchronized void updateEntry(AbfsBlock block) {
* @return whether we have some data to commit or not.
* @throws IOException if an I/O error occurs
*/
protected synchronized boolean hasListToCommit() throws IOException {
protected synchronized boolean hasBlocksToCommit() throws IOException {
// Moves successfully completed blocks, if any, into the committed list used by the putBlockList call.
if (blockEntryList.isEmpty()) {
return false; // No entries to commit
}

while (!blockEntryList.isEmpty()) {
BlockEntry current = blockEntryList.poll();
if (current.getStatus() != AbfsBlockStatus.SUCCESS) {
Expand All @@ -177,7 +182,11 @@ protected synchronized boolean hasListToCommit() throws IOException {
throw new IOException(errorMessage);
}
}
committedBlockEntries.add(current.getBlockId());
// Append the current block's ID to the committedBlockBuilder
if (committedBlockEntries.length() > 0) {
committedBlockEntries.append(COMMA);
}
committedBlockEntries.append(current.getBlockId());
LOG.debug("Block {} added to committed entries.", current.getBlockId());
}
return true;
Expand All @@ -188,7 +197,13 @@ protected synchronized boolean hasListToCommit() throws IOException {
*
* @return the block ID list
*/
protected List<String> getBlockIdList() {
return committedBlockEntries;
protected String getBlockIdToCommit() {
  // Materialize the accumulated builder contents as the commit payload.
  return String.valueOf(committedBlockEntries);
}

/**
 * Closes this block manager: delegates active-block cleanup to the
 * superclass and clears the cached comma-separated committed block IDs.
 */
@Override
public void close() {
  super.close();
  // Drop the accumulated block IDs so the builder's buffer is not retained.
  committedBlockEntries.setLength(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,13 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
if (getAbfsOutputStream().isAppendBlob()) {
return null;
}
if (!blobBlockManager.hasListToCommit()) {
if (!blobBlockManager.hasBlocksToCommit()) {
return null;
}
try {
// Generate the xml with the list of blockId's to generate putBlockList call.
String blockListXml = generateBlockListXml(
blobBlockManager.getBlockIdList());
blobBlockManager.getBlockIdToCommit());
TracingContext tracingContextFlush = new TracingContext(tracingContext);
tracingContextFlush.setIngressHandler(BLOB_FLUSH);
tracingContextFlush.setPosition(String.valueOf(offset));
Expand Down Expand Up @@ -297,7 +297,8 @@ protected void writeAppendBlobCurrentBufferToService() throws IOException {
activeBlock, reqParams,
new TracingContext(getAbfsOutputStream().getTracingContext()));
} catch (InvalidIngressServiceException ex) {
LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.", getAbfsOutputStream().getPath());
LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.",
getAbfsOutputStream().getPath());
getAbfsOutputStream().switchHandler();
op = getAbfsOutputStream().getIngressHandler()
.remoteAppendBlobWrite(getAbfsOutputStream().getPath(), uploadData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,12 @@ void clearActiveBlock() {
activeBlock = null;
}
}

// Clears any resources held by the block manager: discards the active
// block, if one exists. Subclasses may override to release extra state,
// calling super.close() first.
void close() {
if (hasActiveBlock()) {
clearActiveBlock();
}
LOG.debug("AzureBlockManager closed.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,10 @@ protected synchronized AbfsBlock getActiveBlock() {
protected synchronized boolean hasActiveBlock() {
return super.hasActiveBlock();
}

/**
 * Closes the DFS block manager, delegating resource cleanup (clearing the
 * active block) to the superclass.
 */
@Override
public void close() {
super.close();
LOG.debug("AzureDFSBlockManager closed.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
final String leaseId,
TracingContext tracingContext) throws IOException {
AbfsRestOperation op;
if (!blobBlockManager.hasListToCommit()) {
if (!blobBlockManager.hasBlocksToCommit()) {
return null;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -221,8 +218,7 @@ void createRenamePendingJson(Path path, byte[] bytes)
abfsClient.append(path.toUri().getPath(), bytes,
appendRequestParameters, null, null, tracingContext);

List<String> blockIdList = new ArrayList<>(Collections.singleton(blockId));
String blockList = generateBlockListXml(blockIdList);
String blockList = generateBlockListXml(blockId);
// PutBlockList on the path.
abfsClient.flush(blockList.getBytes(StandardCharsets.UTF_8),
path.toUri().getPath(), true, null, null, eTag, null, tracingContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.hadoop.util.Lists;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA;
Expand Down Expand Up @@ -320,7 +321,7 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
return ingressClient.flush(path, 3, false, false, null,
null, encryptionAdapter, getTestTracingContext(fs, false));
} else {
byte[] buffer = generateBlockListXml(new ArrayList<>()).getBytes(StandardCharsets.UTF_8);
byte[] buffer = generateBlockListXml(EMPTY_STRING).getBytes(StandardCharsets.UTF_8);
return ingressClient.flush(buffer, path, false, null,
null, null, encryptionAdapter, getTestTracingContext(fs, false));
}
Expand Down
Loading