HADOOP-19444: ABFS: [FnsOverBlob][Tests] Add Tests For Negative Scenarios Identified for Ingress Operations #7424

Merged: 12 commits, Mar 11, 2025
@@ -556,6 +556,10 @@ public void validateConfiguredServiceType(boolean isHNSEnabled)
} else if (isHNSEnabled && fsConfiguredServiceType == AbfsServiceType.BLOB) {
throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY,
"Blob Endpoint Url Cannot be used to initialize filesystem for HNS Account");
} else if (getFsConfiguredServiceType() == AbfsServiceType.BLOB
&& getIngressServiceType() == AbfsServiceType.DFS) {
throw new InvalidConfigurationValueException(
FS_AZURE_INGRESS_SERVICE_TYPE, "Ingress Type Cannot be DFS for Blob endpoint configured filesystem");
}
}
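
For illustration, a minimal repro sketch of the combination this new check rejects (the endpoint URL, account, and container names are placeholders; a non-HNS account and the standard ABFS configuration keys are assumed):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    // Filesystem pointed at the Blob endpoint of an FNS account...
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "abfs://container@account.blob.core.windows.net");
    // ...while ingress is explicitly pinned to DFS: this combination now
    // fails fast at initialization with InvalidConfigurationValueException.
    conf.set("fs.azure.ingress.service.type", "DFS");
    FileSystem fs = FileSystem.get(conf);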

@@ -2010,18 +2010,24 @@ private boolean isEmptyListResults(AbfsHttpOperation result) {
}

/**
* Generates an XML string representing the block list.
* Generates the XML block list from a comma-separated string of block IDs.
*
* @param blockIds the set of block IDs
* @return the generated XML string
* @param blockIdString The comma-separated block IDs.
* @return the XML representation of the block list.
*/
public static String generateBlockListXml(List<String> blockIds) {
public static String generateBlockListXml(String blockIdString) {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(String.format(XML_VERSION));
stringBuilder.append(String.format(BLOCK_LIST_START_TAG));
for (String blockId : blockIds) {
stringBuilder.append(String.format(LATEST_BLOCK_FORMAT, blockId));

// Split the block ID string by commas and generate XML for each block ID
if (!blockIdString.isEmpty()) {
String[] blockIds = blockIdString.split(",");
for (String blockId : blockIds) {
stringBuilder.append(String.format(LATEST_BLOCK_FORMAT, blockId));
}
}

stringBuilder.append(String.format(BLOCK_LIST_END_TAG));
return stringBuilder.toString();
}
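
As an illustrative sketch of the new contract (assuming XML_VERSION, BLOCK_LIST_START_TAG, LATEST_BLOCK_FORMAT, and BLOCK_LIST_END_TAG hold the standard Azure PutBlockList markup):

    // Hypothetical usage; exact formatting depends on the constants above.
    String xml = generateBlockListXml("blockA,blockB");
    // -> <?xml version="1.0" encoding="utf-8"?>
    //    <BlockList><Latest>blockA</Latest><Latest>blockB</Latest></BlockList>

    // An empty input yields an empty <BlockList></BlockList> body, which the
    // encryption test below relies on to flush a file with no blocks.
    String emptyXml = generateBlockListXml("");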
@@ -708,9 +708,7 @@ public synchronized void close() throws IOException {
bufferIndex = 0;
closed = true;
writeOperations.clear();
if (getBlockManager().hasActiveBlock()) {
getBlockManager().clearActiveBlock();
}
getBlockManager().close();
}
LOG.debug("Closing AbfsOutputStream : {}", this);
}
@@ -30,6 +30,8 @@
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.store.DataBlocks;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA;

/**
* Manages Azure Blob blocks for append operations.
*/
@@ -38,9 +40,8 @@ public class AzureBlobBlockManager extends AzureBlockManager {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsOutputStream.class);


/** The list of already committed blocks is stored in this list. */
private List<String> committedBlockEntries = new ArrayList<>();
/** Cached comma-separated string of committed block IDs. */
private final StringBuilder committedBlockEntries = new StringBuilder();

/** The list to store blockId, position, and status. */
private final LinkedList<BlockEntry> blockEntryList = new LinkedList<>();
@@ -60,7 +61,10 @@ public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
throws AzureBlobFileSystemException {
super(abfsOutputStream, blockFactory, bufferSize);
if (abfsOutputStream.getPosition() > 0 && !abfsOutputStream.isAppendBlob()) {
this.committedBlockEntries = getBlockList(abfsOutputStream.getTracingContext());
List<String> committedBlocks = getBlockList(abfsOutputStream.getTracingContext());
if (!committedBlocks.isEmpty()) {
committedBlockEntries.append(String.join(COMMA, committedBlocks));
}
}
LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance {} for path {}",
abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
@@ -146,11 +150,12 @@ protected synchronized void updateEntry(AbfsBlock block) {
* @return whether we have some data to commit or not.
* @throws IOException if an I/O error occurs
*/
protected synchronized boolean hasListToCommit() throws IOException {
protected synchronized boolean hasBlocksToCommit() throws IOException {
// Moves successfully completed blocks from the queue into the committed-ID string used by putBlockList.
if (blockEntryList.isEmpty()) {
return false; // No entries to commit
}

while (!blockEntryList.isEmpty()) {
BlockEntry current = blockEntryList.poll();
if (current.getStatus() != AbfsBlockStatus.SUCCESS) {
@@ -177,7 +182,11 @@ protected synchronized boolean hasListToCommit() throws IOException {
throw new IOException(errorMessage);
}
}
committedBlockEntries.add(current.getBlockId());
// Append the current block's ID to the committed block entries
if (committedBlockEntries.length() > 0) {
committedBlockEntries.append(COMMA);
}
committedBlockEntries.append(current.getBlockId());
LOG.debug("Block {} added to committed entries.", current.getBlockId());
}
return true;
@@ -188,7 +197,13 @@ protected synchronized boolean hasListToCommit() throws IOException {
*
* @return the block ID list
*/
protected List<String> getBlockIdList() {
return committedBlockEntries;
protected String getBlockIdToCommit() {
return committedBlockEntries.toString();
}

@Override
public void close() {
super.close();
committedBlockEntries.setLength(0);
}
}
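
Taken together, a paraphrased sketch of the new commit path (illustrative, not literal source; generateBlockListXml is the static helper shown earlier):

    // Block IDs now accumulate as one comma-separated string instead of a
    // List<String>, so a flush stitches them straight into PutBlockList XML.
    if (blobBlockManager.hasBlocksToCommit()) {
      String ids = blobBlockManager.getBlockIdToCommit();   // "id1,id2,..."
      String body = generateBlockListXml(ids);              // request payload
      // flush() then issues PutBlockList with `body` as its content.
    }

This avoids materializing a fresh List per flush and lets close() discard the cache with a single setLength(0).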
@@ -168,13 +168,13 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
if (getAbfsOutputStream().isAppendBlob()) {
return null;
}
if (!blobBlockManager.hasListToCommit()) {
if (!blobBlockManager.hasBlocksToCommit()) {
return null;
}
try {
// Generate the xml with the list of blockId's to generate putBlockList call.
String blockListXml = generateBlockListXml(
blobBlockManager.getBlockIdList());
blobBlockManager.getBlockIdToCommit());
TracingContext tracingContextFlush = new TracingContext(tracingContext);
tracingContextFlush.setIngressHandler(BLOB_FLUSH);
tracingContextFlush.setPosition(String.valueOf(offset));
@@ -297,7 +297,8 @@ protected void writeAppendBlobCurrentBufferToService() throws IOException {
activeBlock, reqParams,
new TracingContext(getAbfsOutputStream().getTracingContext()));
} catch (InvalidIngressServiceException ex) {
LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.", getAbfsOutputStream().getPath());
LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.",
getAbfsOutputStream().getPath());
getAbfsOutputStream().switchHandler();
op = getAbfsOutputStream().getIngressHandler()
.remoteAppendBlobWrite(getAbfsOutputStream().getPath(), uploadData,
@@ -167,4 +167,12 @@ void clearActiveBlock() {
activeBlock = null;
}
}

// Clears any resources held by the block manager.
void close() {
if (hasActiveBlock()) {
clearActiveBlock();
}
LOG.debug("AzureBlockManager closed.");
}
}
@@ -86,4 +86,10 @@ protected synchronized AbfsBlock getActiveBlock() {
protected synchronized boolean hasActiveBlock() {
return super.hasActiveBlock();
}

@Override
public void close() {
super.close();
LOG.debug("AzureDFSBlockManager closed.");
}
}
@@ -147,7 +147,7 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset,
final String leaseId,
TracingContext tracingContext) throws IOException {
AbfsRestOperation op;
if (!blobBlockManager.hasListToCommit()) {
if (!blobBlockManager.hasBlocksToCommit()) {
return null;
}
try {
@@ -21,9 +21,6 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

import com.fasterxml.jackson.core.JsonProcessingException;
@@ -221,8 +218,7 @@ void createRenamePendingJson(Path path, byte[] bytes)
abfsClient.append(path.toUri().getPath(), bytes,
appendRequestParameters, null, null, tracingContext);

List<String> blockIdList = new ArrayList<>(Collections.singleton(blockId));
String blockList = generateBlockListXml(blockIdList);
String blockList = generateBlockListXml(blockId);
// PutBlockList on the path.
abfsClient.flush(blockList.getBytes(StandardCharsets.UTF_8),
path.toUri().getPath(), true, null, null, eTag, null, tracingContext);
@@ -64,6 +64,7 @@
import org.apache.hadoop.util.Lists;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA;
@@ -320,7 +321,7 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs,
return ingressClient.flush(path, 3, false, false, null,
null, encryptionAdapter, getTestTracingContext(fs, false));
} else {
byte[] buffer = generateBlockListXml(new ArrayList<>()).getBytes(StandardCharsets.UTF_8);
byte[] buffer = generateBlockListXml(EMPTY_STRING).getBytes(StandardCharsets.UTF_8);
return ingressClient.flush(buffer, path, false, null,
null, null, encryptionAdapter, getTestTracingContext(fs, false));
}
@@ -95,6 +95,7 @@ public class ITestAzureBlobFileSystemAppend extends
AbstractAbfsIntegrationTest {

private static final String TEST_FILE_PATH = "testfile";
private static final String TEST_FILE_PATH1 = "testfile1";

private static final String TEST_FOLDER_PATH = "testFolder";

@@ -243,6 +244,125 @@ public void testCreateOverDfsAppendOverBlob() throws IOException {
.isInstanceOf(AbfsDfsClient.class);
}

/**
* This test verifies that when multiple appends qualify for an ingress handler switch, none of the appends fail.
*/
@Test
public void testMultipleAppendsQualifyForSwitch() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
final AzureBlobFileSystem fs = getFileSystem();
Path testPath = path(TEST_FILE_PATH);
AzureBlobFileSystemStore.Permissions permissions
= new AzureBlobFileSystemStore.Permissions(false,
FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
fs.getAbfsStore().getClientHandler().getDfsClient().
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, false, null,
null, getTestTracingContext(fs, true));
fs.getAbfsStore()
.getAbfsConfiguration()
.set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future<?>> futures = new ArrayList<>();

// Create three output streams on the same file; each task below writes a different number of bytes
final byte[] b1 = new byte[8 * ONE_MB];
new Random().nextBytes(b1);

FSDataOutputStream out1 = fs.append(testPath);
FSDataOutputStream out2 = fs.append(testPath);
FSDataOutputStream out3 = fs.append(testPath);

// Submit tasks to write to each output stream
futures.add(executorService.submit(() -> {
try {
out1.write(TEN);
out1.hsync();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));

futures.add(executorService.submit(() -> {
try {
out2.write(TWENTY);
out2.hsync();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));

futures.add(executorService.submit(() -> {
try {
out3.write(THIRTY);
out3.hsync();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
checkFuturesForExceptions(futures, 0);
AzureIngressHandler ingressHandlerFallback
= ((AbfsOutputStream) out1.getWrappedStream()).getIngressHandler();
AbfsClient clientFallback = ingressHandlerFallback.getClient();
Assertions.assertThat(clientFallback)
.as("DFS client was not used after fallback")
.isInstanceOf(AbfsDfsClient.class);
}

/**
* This test verifies that parallel writes over the DFS and Blob endpoints do not fail.
*/
@Test
public void testParallelWritesOnDfsAndBlob() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
final AzureBlobFileSystem fs = getFileSystem();
Path testPath = path(TEST_FILE_PATH);
Path testPath1 = path(TEST_FILE_PATH1);
AzureBlobFileSystemStore.Permissions permissions
= new AzureBlobFileSystemStore.Permissions(false,
FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
fs.getAbfsStore().getClientHandler().getDfsClient().
createPath(makeQualified(testPath).toUri().getPath(), true, false,
permissions, false, null,
null, getTestTracingContext(fs, true));
fs.getAbfsStore()
.getAbfsConfiguration()
.set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name());
FSDataOutputStream out1 = fs.create(testPath);
fs.getAbfsStore().getClientHandler().getDfsClient().
createPath(makeQualified(testPath1).toUri().getPath(), true, false,
permissions, false, null,
null, getTestTracingContext(fs, true));
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Future<?>> futures = new ArrayList<>();

// Create the second output stream on the second test path
final byte[] b1 = new byte[8 * ONE_MB];
new Random().nextBytes(b1);
FSDataOutputStream out2 = fs.append(testPath1);

// Submit tasks to write to each output stream
futures.add(executorService.submit(() -> {
try {
out1.write(TEN);
out1.hsync();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));

futures.add(executorService.submit(() -> {
try {
out2.write(TWENTY);
out2.hsync();
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
checkFuturesForExceptions(futures, 0);
}


/**
* Creates a file over Blob and attempts to append over DFS.
* It should fallback to Blob when appending to the file fails.
@@ -252,6 +372,7 @@ public void testCreateOverDfsAppendOverBlob() throws IOException {
@Test
public void testCreateOverBlobAppendOverDfs() throws IOException {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
assumeDfsServiceType();
Configuration conf = getRawConfiguration();
conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
@@ -292,6 +413,7 @@ public void testCreateOverBlobAppendOverDfs() throws IOException {
@Test
public void testCreateAppendBlobOverBlobEndpointAppendOverDfs()
throws IOException, NoSuchFieldException, IllegalAccessException {
assumeDfsServiceType();
Configuration conf = getRawConfiguration();
conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
@@ -971,9 +1093,8 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep
new Random().nextBytes(bytes);
// Write some bytes and attempt to flush, which should retry
out.write(bytes);
List<String> list = new ArrayList<>();
list.add(generateBlockId(out, 0));
String blockListXml = generateBlockListXml(list);
String blockId = generateBlockId(out, 0);
String blockListXml = generateBlockListXml(blockId);

Mockito.doAnswer(answer -> {
// Set up the mock for the flush operation
@@ -1069,9 +1190,8 @@ public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exc
new Random().nextBytes(bytes);
// Write some bytes and attempt to flush, which should retry
out.write(bytes);
List<String> list = new ArrayList<>();
list.add(generateBlockId(out, 0));
String blockListXml = generateBlockListXml(list);
String blockId = generateBlockId(out, 0);
String blockListXml = generateBlockListXml(blockId);

Mockito.doAnswer(answer -> {
// Set up the mock for the flush operation