From ec42027ca4823b2ad355318762c103cf06786b30 Mon Sep 17 00:00:00 2001 From: Anmol Asrani Date: Mon, 17 Feb 2025 05:55:09 -0800 Subject: [PATCH 01/10] make changes in list --- .../fs/azurebfs/services/AbfsBlobClient.java | 12 +++++--- .../services/AzureBlobBlockManager.java | 21 +++++++++----- .../fs/azurebfs/services/RenameAtomicity.java | 4 +-- .../azurebfs/ITestAbfsCustomEncryption.java | 3 +- .../ITestAzureBlobFileSystemAppend.java | 26 +++++++++++++---- ...ITestAzureBlobFileSystemInitAndCreate.java | 28 +++++++++++++++++-- 6 files changed, 71 insertions(+), 23 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index b54ce1a4dac7e..48d88044eb4b3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -2010,18 +2010,22 @@ private boolean isEmptyListResults(AbfsHttpOperation result) { } /** - * Generates an XML string representing the block list. + * Generate the XML block list using a comma-separated string of block IDs. * - * @param blockIds the set of block IDs - * @return the generated XML string + * @param blockIdString The comma-separated block IDs. + * @return the XML representation of the block list. */ - public static String generateBlockListXml(List blockIds) { + public static String generateBlockListXml(String blockIdString) { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(String.format(XML_VERSION)); stringBuilder.append(String.format(BLOCK_LIST_START_TAG)); + + // Split the block ID string by commas and generate XML for each block ID + String[] blockIds = blockIdString.split(","); for (String blockId : blockIds) { stringBuilder.append(String.format(LATEST_BLOCK_FORMAT, blockId)); } + stringBuilder.append(String.format(BLOCK_LIST_END_TAG)); return stringBuilder.toString(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java index 519e5326c7d4c..68b9914e86548 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java @@ -38,9 +38,8 @@ public class AzureBlobBlockManager extends AzureBlockManager { private static final Logger LOG = LoggerFactory.getLogger( AbfsOutputStream.class); - - /** The list of already committed blocks is stored in this list. */ - private List committedBlockEntries = new ArrayList<>(); + /** Cached list of committed block IDs */ + private final StringBuilder committedBlockEntries = new StringBuilder(); /** The list to store blockId, position, and status. 
*/ private final LinkedList blockEntryList = new LinkedList<>(); @@ -60,7 +59,10 @@ public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream, throws AzureBlobFileSystemException { super(abfsOutputStream, blockFactory, bufferSize); if (abfsOutputStream.getPosition() > 0 && !abfsOutputStream.isAppendBlob()) { - this.committedBlockEntries = getBlockList(abfsOutputStream.getTracingContext()); + List committedBlocks = getBlockList(abfsOutputStream.getTracingContext()); + if (!committedBlocks.isEmpty()) { + committedBlockEntries.append(String.join(",", committedBlocks)).append(","); + } } LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance {} for path {}", abfsOutputStream.getStreamID(), abfsOutputStream.getPath()); @@ -151,6 +153,7 @@ protected synchronized boolean hasListToCommit() throws IOException { if (blockEntryList.isEmpty()) { return false; // No entries to commit } + while (!blockEntryList.isEmpty()) { BlockEntry current = blockEntryList.poll(); if (current.getStatus() != AbfsBlockStatus.SUCCESS) { @@ -177,7 +180,11 @@ protected synchronized boolean hasListToCommit() throws IOException { throw new IOException(errorMessage); } } - committedBlockEntries.add(current.getBlockId()); + // Append the current block's ID to the committedBlockBuilder + if (committedBlockEntries.length() > 0) { + committedBlockEntries.append(","); + } + committedBlockEntries.append(current.getBlockId()); LOG.debug("Block {} added to committed entries.", current.getBlockId()); } return true; @@ -188,7 +195,7 @@ protected synchronized boolean hasListToCommit() throws IOException { * * @return the block ID list */ - protected List getBlockIdList() { - return committedBlockEntries; + protected String getBlockIdList() { + return committedBlockEntries.toString(); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java index f8dab188f37eb..326875f24da65 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java @@ -221,8 +221,8 @@ void createRenamePendingJson(Path path, byte[] bytes) abfsClient.append(path.toUri().getPath(), bytes, appendRequestParameters, null, null, tracingContext); - List blockIdList = new ArrayList<>(Collections.singleton(blockId)); - String blockList = generateBlockListXml(blockIdList); + //List blockIdList = new ArrayList<>(Collections.singleton(blockId)); + String blockList = generateBlockListXml(blockId); // PutBlockList on the path. 
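For the rename-pending JSON the comma-separated overload is fed a single block ID, so the body reduces to one Latest entry. Below is a minimal, self-contained sketch of what generateBlockListXml(String) is expected to produce; the inlined string constants are assumptions standing in for XML_VERSION, BLOCK_LIST_START_TAG, LATEST_BLOCK_FORMAT and BLOCK_LIST_END_TAG, chosen to match the usual Put Block List payload, and the empty-token guard is an addition for illustration only.

// Illustrative sketch, not the production method: a self-contained equivalent
// of the new generateBlockListXml(String) overload with the constants inlined.
public final class BlockListXmlSketch {

  private static final String XML_VERSION =
      "<?xml version=\"1.0\" encoding=\"utf-8\"?>";          // assumed value
  private static final String BLOCK_LIST_START_TAG = "<BlockList>";
  private static final String LATEST_BLOCK_FORMAT = "<Latest>%s</Latest>";
  private static final String BLOCK_LIST_END_TAG = "</BlockList>";

  // Accepts a single block ID or a comma-separated list of IDs.
  static String generateBlockListXml(String blockIdString) {
    StringBuilder sb = new StringBuilder();
    sb.append(XML_VERSION).append(BLOCK_LIST_START_TAG);
    for (String blockId : blockIdString.split(",")) {
      if (!blockId.isEmpty()) {   // tolerate an empty input string in this sketch
        sb.append(String.format(LATEST_BLOCK_FORMAT, blockId));
      }
    }
    sb.append(BLOCK_LIST_END_TAG);
    return sb.toString();
  }

  public static void main(String[] args) {
    // Single-ID case, as used for the rename-pending blob above.
    System.out.println(generateBlockListXml("QmxvY2tJZC0wMDA="));
    // Multi-block case, fed directly from the comma-separated committed list.
    System.out.println(generateBlockListXml("YmxrLTAw,YmxrLTAx,YmxrLTAy"));
  }
}

Because the caller already holds the IDs as one comma-separated string, no intermediate List has to be materialized before the XML is built and handed to flush below.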
abfsClient.flush(blockList.getBytes(StandardCharsets.UTF_8), path.toUri().getPath(), true, null, null, eTag, null, tracingContext); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java index ee88ebccf6d0f..2a06dfb4acc2f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java @@ -64,6 +64,7 @@ import org.apache.hadoop.util.Lists; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA; @@ -320,7 +321,7 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs, return ingressClient.flush(path, 3, false, false, null, null, encryptionAdapter, getTestTracingContext(fs, false)); } else { - byte[] buffer = generateBlockListXml(new ArrayList<>()).getBytes(StandardCharsets.UTF_8); + byte[] buffer = generateBlockListXml(EMPTY_STRING).getBytes(StandardCharsets.UTF_8); return ingressClient.flush(buffer, path, false, null, null, null, encryptionAdapter, getTestTracingContext(fs, false)); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index 73a0d01105753..5d30ea744405a 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -492,6 +492,22 @@ public void testRecreateAppendAndFlush() throws IOException { } } + @Test + public void testFlush() throws IOException { + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + final AzureBlobFileSystem fs = getFileSystem(); + final Path filePath = path(TEST_FILE_PATH); + fs.create(filePath); + Assume.assumeTrue(getIngressServiceType() == AbfsServiceType.BLOB); + FSDataOutputStream outputStream = fs.append(filePath); + outputStream.write(TEN); + outputStream.write(20); + outputStream.hsync(); + outputStream.write(30); + outputStream.write(40); + outputStream.close(); + } + /** * Recreate directory between append and flush. Etag mismatch happens. 
**/ @@ -971,9 +987,8 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep new Random().nextBytes(bytes); // Write some bytes and attempt to flush, which should retry out.write(bytes); - List list = new ArrayList<>(); - list.add(generateBlockId(out, 0)); - String blockListXml = generateBlockListXml(list); + String blockId = generateBlockId(out, 0); + String blockListXml = generateBlockListXml(blockId); Mockito.doAnswer(answer -> { // Set up the mock for the flush operation @@ -1069,9 +1084,8 @@ public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exc new Random().nextBytes(bytes); // Write some bytes and attempt to flush, which should retry out.write(bytes); - List list = new ArrayList<>(); - list.add(generateBlockId(out, 0)); - String blockListXml = generateBlockListXml(list); + String blockId = generateBlockId(out, 0); + String blockListXml = generateBlockListXml(blockId); Mockito.doAnswer(answer -> { // Set up the mock for the flush operation diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java index a0796c1b36fd6..8ac84ef8745f4 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java @@ -20,6 +20,7 @@ import java.io.FileNotFoundException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -27,10 +28,10 @@ import org.junit.Test; import org.mockito.Mockito; -import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException; import org.apache.hadoop.fs.azurebfs.enums.Trilean; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; @@ -38,7 +39,12 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.accountProperty; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.ArgumentMatchers.any; @@ -71,8 +77,8 @@ public void testGetAclCallOnHnsConfigAbsence() throws Exception { AzureBlobFileSystem fs = ((AzureBlobFileSystem) FileSystem.newInstance( getRawConfiguration())); AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); - AbfsClient client = 
Mockito.spy(fs.getAbfsStore().getClient(AbfsServiceType.DFS)); - Mockito.doReturn(client).when(store).getClient(AbfsServiceType.DFS); + AbfsClient client = Mockito.spy(fs.getAbfsClient()); + Mockito.doReturn(client).when(store).getClient(); Mockito.doThrow(TrileanConversionException.class) .when(store) @@ -108,6 +114,22 @@ public void testNoGetAclCallOnHnsConfigPresence() throws Exception { .getAclStatus(Mockito.anyString(), any(TracingContext.class)); } + // TODO: [FnsOverBlob][HADOOP-19179] Remove this test case once Blob Endpoint Support is enabled. + @Test + public void testFileSystemInitFailsWithBlobEndpointUrl() throws Exception { + Configuration configuration = new Configuration(getRawConfiguration()); + String defaultUri = configuration.get(FS_DEFAULT_NAME_KEY); + String accountKey = configuration.get( + accountProperty(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, getAccountName()), + configuration.get(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME)); + configuration.set(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, + accountKey.replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME)); + String blobUri = defaultUri.replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME); + intercept(InvalidConfigurationValueException.class, + "Blob Endpoint Support not yet available", () -> + FileSystem.newInstance(new Path(blobUri).toUri(), configuration)); + } + @Test public void testFileSystemInitFailsIfNotAbleToDetermineAccountType() throws Exception { AzureBlobFileSystem fs = ((AzureBlobFileSystem) FileSystem.newInstance( From ccc7c1e95805ad25892d02eb6fe3b58322041a32 Mon Sep 17 00:00:00 2001 From: Anmol Asrani Date: Tue, 18 Feb 2025 22:38:53 -0800 Subject: [PATCH 02/10] optimizations --- .../azurebfs/services/AbfsOutputStream.java | 4 +- .../services/AzureBlobBlockManager.java | 6 ++ .../services/AzureBlobIngressHandler.java | 3 +- .../azurebfs/services/AzureBlockManager.java | 8 +++ .../services/AzureDFSBlockManager.java | 6 ++ .../ITestAzureBlobFileSystemAppend.java | 55 +++++++++++++++++++ 6 files changed, 78 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 8979b963d68c4..7e74c8daaec5b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -708,9 +708,7 @@ public synchronized void close() throws IOException { bufferIndex = 0; closed = true; writeOperations.clear(); - if (getBlockManager().hasActiveBlock()) { - getBlockManager().clearActiveBlock(); - } + getBlockManager().close(); } LOG.debug("Closing AbfsOutputStream : {}", this); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java index 68b9914e86548..f272fd528a024 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java @@ -198,4 +198,10 @@ protected synchronized boolean hasListToCommit() throws IOException { protected String getBlockIdList() { return committedBlockEntries.toString(); } + + @Override + public void close(){ + super.close(); + 
committedBlockEntries.setLength(0); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java index 0336cd04b640f..15e8fe271b968 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java @@ -297,7 +297,8 @@ protected void writeAppendBlobCurrentBufferToService() throws IOException { activeBlock, reqParams, new TracingContext(getAbfsOutputStream().getTracingContext())); } catch (InvalidIngressServiceException ex) { - LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.", getAbfsOutputStream().getPath()); + LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.", + getAbfsOutputStream().getPath()); getAbfsOutputStream().switchHandler(); op = getAbfsOutputStream().getIngressHandler() .remoteAppendBlobWrite(getAbfsOutputStream().getPath(), uploadData, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlockManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlockManager.java index 5c4db6368b311..426e0c8194f8a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlockManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlockManager.java @@ -167,4 +167,12 @@ void clearActiveBlock() { activeBlock = null; } } + + // Used to clear any resources used by the block manager. 
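On the Blob side this clean-up also resets the comma-separated committed-block buffer shown earlier. A minimal sketch of that accumulation pattern follows, under hypothetical names (CommittedBlockTracker, addCommitted, blockIdsToCommit); it mirrors the StringBuilder-plus-COMMA approach rather than the actual class.

// Sketch of the comma-separated accumulation that replaces List<String>:
// block IDs are appended as they are committed and the joined string is
// handed to generateBlockListXml(String) at flush time. Names are hypothetical.
import java.util.Arrays;
import java.util.List;

final class CommittedBlockTracker {
  private static final String COMMA = ",";
  private final StringBuilder committedBlockEntries = new StringBuilder();

  // Seed with blocks already committed on the service (e.g. re-open for append).
  void seed(List<String> alreadyCommitted) {
    if (!alreadyCommitted.isEmpty()) {
      committedBlockEntries.append(String.join(COMMA, alreadyCommitted));
    }
  }

  // Record one newly committed block, keeping the comma-separated form.
  void addCommitted(String blockId) {
    if (committedBlockEntries.length() > 0) {
      committedBlockEntries.append(COMMA);
    }
    committedBlockEntries.append(blockId);
  }

  // The string passed to generateBlockListXml(String) when flushing.
  String blockIdsToCommit() {
    return committedBlockEntries.toString();
  }

  // Mirrors close(): release the accumulated IDs once the stream is done.
  void close() {
    committedBlockEntries.setLength(0);
  }

  public static void main(String[] args) {
    CommittedBlockTracker tracker = new CommittedBlockTracker();
    tracker.seed(Arrays.asList("YmxrLTAw", "YmxrLTAx"));
    tracker.addCommitted("YmxrLTAy");
    System.out.println(tracker.blockIdsToCommit()); // YmxrLTAw,YmxrLTAx,YmxrLTAy
    tracker.close();
  }
}

Keeping the IDs as a single string lets the flush path hand the value straight to the XML generator without rebuilding a list on every commit.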
+ void close() { + if (hasActiveBlock()) { + clearActiveBlock(); + } + LOG.debug("AzureBlockManager closed."); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java index 85b07dbc7387c..f7a4542c3f2ae 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java @@ -86,4 +86,10 @@ protected synchronized AbfsBlock getActiveBlock() { protected synchronized boolean hasActiveBlock() { return super.hasActiveBlock(); } + + @Override + public void close() { + super.close(); + LOG.debug("AzureDFSBlockManager closed."); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index 5d30ea744405a..303923582a085 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -243,6 +243,60 @@ public void testCreateOverDfsAppendOverBlob() throws IOException { .isInstanceOf(AbfsDfsClient.class); } + @Test + public void testMultipleAppendSwitches() throws Exception { + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + final AzureBlobFileSystem fs = getFileSystem(); + Path testPath = path(TEST_FILE_PATH); + AzureBlobFileSystemStore.Permissions permissions + = new AzureBlobFileSystemStore.Permissions(false, + FsPermission.getDefault(), FsPermission.getUMask(fs.getConf())); + fs.getAbfsStore().getClientHandler().getDfsClient(). + createPath(makeQualified(testPath).toUri().getPath(), true, false, + permissions, false, null, + null, getTestTracingContext(fs, true)); + fs.getAbfsStore() + .getAbfsConfiguration() + .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name()); + ExecutorService executorService = Executors.newFixedThreadPool(5); + List> futures = new ArrayList<>(); + + // Create three output streams with different content length + final byte[] b1 = new byte[8 * ONE_MB]; + new Random().nextBytes(b1); + + FSDataOutputStream out1 = fs.append(testPath); + FSDataOutputStream out2 = fs.append(testPath); + FSDataOutputStream out3 = fs.append(testPath); + + // Submit tasks to write to each output stream + futures.add(executorService.submit(() -> { + try { + out1.write(b1, TEN, 2 * HUNDRED); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + + futures.add(executorService.submit(() -> { + try { + out2.write(b1, TWENTY, 3 * HUNDRED); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + + futures.add(executorService.submit(() -> { + try { + out3.write(b1, THIRTY, 4 * HUNDRED); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + checkFuturesForExceptions(futures, 0); + } + + /** * Creates a file over Blob and attempts to append over DFS. * It should fallback to Blob when appending to the file fails. @@ -383,6 +437,7 @@ public void testCreateAppendBlobOverDfsEndpointAppendOverBlob() } + /** * Tests the correct retrieval of the AzureIngressHandler based on the configured ingress service type. 
* From ea95cc21d7a807816f695698db10f3312d3159c7 Mon Sep 17 00:00:00 2001 From: Anmol Asrani Date: Fri, 21 Feb 2025 03:09:46 -0800 Subject: [PATCH 03/10] Append optimzations --- .../services/AzureBlobBlockManager.java | 4 +- .../services/AzureBlobIngressHandler.java | 4 +- .../AzureDfsToBlobIngressFallbackHandler.java | 2 +- .../services/AzureIngressHandler.java | 2 +- .../ITestAzureBlobFileSystemAppend.java | 180 ++++++++++++------ 5 files changed, 129 insertions(+), 63 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java index f272fd528a024..61a20d94641f5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java @@ -148,7 +148,7 @@ protected synchronized void updateEntry(AbfsBlock block) { * @return whether we have some data to commit or not. * @throws IOException if an I/O error occurs */ - protected synchronized boolean hasListToCommit() throws IOException { + protected synchronized boolean hasBlocksToCommit() throws IOException { // Adds all the committed blocks if available to the list of blocks to be added in putBlockList. if (blockEntryList.isEmpty()) { return false; // No entries to commit @@ -195,7 +195,7 @@ protected synchronized boolean hasListToCommit() throws IOException { * * @return the block ID list */ - protected String getBlockIdList() { + protected String getBlockIdToCommit() { return committedBlockEntries.toString(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java index 15e8fe271b968..150d85d474a03 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java @@ -168,13 +168,13 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset, if (getAbfsOutputStream().isAppendBlob()) { return null; } - if (!blobBlockManager.hasListToCommit()) { + if (!blobBlockManager.hasBlocksToCommit()) { return null; } try { // Generate the xml with the list of blockId's to generate putBlockList call. 
String blockListXml = generateBlockListXml( - blobBlockManager.getBlockIdList()); + blobBlockManager.getBlockIdToCommit()); TracingContext tracingContextFlush = new TracingContext(tracingContext); tracingContextFlush.setIngressHandler(BLOB_FLUSH); tracingContextFlush.setPosition(String.valueOf(offset)); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java index c9aefc56eb2c6..ba842cbb79b62 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java @@ -147,7 +147,7 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset, final String leaseId, TracingContext tracingContext) throws IOException { AbfsRestOperation op; - if (!blobBlockManager.hasListToCommit()) { + if (!blobBlockManager.hasBlocksToCommit()) { return null; } try { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java index 3072bdf5d04d6..f7bc028aeb4e0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java @@ -198,7 +198,7 @@ protected InvalidIngressServiceException getIngressHandlerSwitchException( * * @return the block manager */ - protected abstract AzureBlockManager getBlockManager(); + public abstract AzureBlockManager getBlockManager(); /** * Gets the client associated with this handler. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index 303923582a085..8688c86f9a6da 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -56,6 +56,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil; import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AzureBlobBlockManager; import org.apache.hadoop.fs.azurebfs.services.AzureBlobIngressHandler; import org.apache.hadoop.fs.azurebfs.services.AzureDFSIngressHandler; import org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler; @@ -87,6 +88,11 @@ import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Test append operations. 
@@ -95,6 +101,7 @@ public class ITestAzureBlobFileSystemAppend extends AbstractAbfsIntegrationTest { private static final String TEST_FILE_PATH = "testfile"; + private static final String TEST_FILE_PATH1 = "testfile1"; private static final String TEST_FOLDER_PATH = "testFolder"; @@ -165,16 +172,16 @@ public void testCloseOfDataBlockOnAppendComplete() throws Exception { for (String blockBufferType : blockBufferTypes) { Configuration configuration = new Configuration(getRawConfiguration()); configuration.set(DATA_BLOCKS_BUFFER, blockBufferType); - try (AzureBlobFileSystem fs = Mockito.spy( + try (AzureBlobFileSystem fs = spy( (AzureBlobFileSystem) FileSystem.newInstance(configuration))) { - AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); - Mockito.doReturn(store).when(fs).getAbfsStore(); + AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); + doReturn(store).when(fs).getAbfsStore(); DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1]; Mockito.doAnswer(getBlobFactoryInvocation -> { - DataBlocks.BlockFactory factory = Mockito.spy( + DataBlocks.BlockFactory factory = spy( (DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod()); Mockito.doAnswer(factoryCreateInvocation -> { - dataBlock[0] = Mockito.spy( + dataBlock[0] = spy( (DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod()); return dataBlock[0]; }) @@ -272,7 +279,8 @@ public void testMultipleAppendSwitches() throws Exception { // Submit tasks to write to each output stream futures.add(executorService.submit(() -> { try { - out1.write(b1, TEN, 2 * HUNDRED); + out1.write(TEN); + out1.hsync(); } catch (IOException e) { throw new RuntimeException(e); } @@ -280,7 +288,8 @@ public void testMultipleAppendSwitches() throws Exception { futures.add(executorService.submit(() -> { try { - out2.write(b1, TWENTY, 3 * HUNDRED); + out2.write(TWENTY); + out2.hsync(); } catch (IOException e) { throw new RuntimeException(e); } @@ -288,7 +297,64 @@ public void testMultipleAppendSwitches() throws Exception { futures.add(executorService.submit(() -> { try { - out3.write(b1, THIRTY, 4 * HUNDRED); + out3.write(THIRTY); + out3.hsync(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + checkFuturesForExceptions(futures, 0); + AzureIngressHandler ingressHandlerFallback + = ((AbfsOutputStream) out1.getWrappedStream()).getIngressHandler(); + AbfsClient clientFallback = ingressHandlerFallback.getClient(); + Assertions.assertThat(clientFallback) + .as("DFS client was not used after fallback") + .isInstanceOf(AbfsDfsClient.class); + } + + @Test + public void testParallelDfsBlob() throws Exception { + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + final AzureBlobFileSystem fs = getFileSystem(); + Path testPath = path(TEST_FILE_PATH); + Path testPath1 = path(TEST_FILE_PATH1); + AzureBlobFileSystemStore.Permissions permissions + = new AzureBlobFileSystemStore.Permissions(false, + FsPermission.getDefault(), FsPermission.getUMask(fs.getConf())); + fs.getAbfsStore().getClientHandler().getDfsClient(). + createPath(makeQualified(testPath).toUri().getPath(), true, false, + permissions, false, null, + null, getTestTracingContext(fs, true)); + fs.getAbfsStore() + .getAbfsConfiguration() + .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name()); + FSDataOutputStream out1 = fs.create(testPath); + fs.getAbfsStore().getClientHandler().getDfsClient(). 
+ createPath(makeQualified(testPath1).toUri().getPath(), true, false, + permissions, false, null, + null, getTestTracingContext(fs, true)); + ExecutorService executorService = Executors.newFixedThreadPool(5); + List> futures = new ArrayList<>(); + + // Create three output streams with different content length + final byte[] b1 = new byte[8 * ONE_MB]; + new Random().nextBytes(b1); + FSDataOutputStream out2 = fs.append(testPath1); + + // Submit tasks to write to each output stream + futures.add(executorService.submit(() -> { + try { + out1.write(TEN); + out1.hsync(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + + futures.add(executorService.submit(() -> { + try { + out2.write(TWENTY); + out2.hsync(); } catch (IOException e) { throw new RuntimeException(e); } @@ -350,10 +416,10 @@ public void testCreateAppendBlobOverBlobEndpointAppendOverDfs() conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true); conf.set(FS_AZURE_INGRESS_SERVICE_TYPE, String.valueOf(AbfsServiceType.DFS)); - try (AzureBlobFileSystem fs = Mockito.spy( + try (AzureBlobFileSystem fs = spy( (AzureBlobFileSystem) FileSystem.newInstance(conf))) { - AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); - Mockito.doReturn(true).when(store).isAppendBlobKey(anyString()); + AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); + doReturn(true).when(store).isAppendBlobKey(anyString()); // Set abfsStore as our mocked value. Field privateField = AzureBlobFileSystem.class.getDeclaredField( @@ -395,9 +461,9 @@ public void testCreateAppendBlobOverBlobEndpointAppendOverDfs() public void testCreateAppendBlobOverDfsEndpointAppendOverBlob() throws IOException, NoSuchFieldException, IllegalAccessException { assumeHnsEnabled("FNS does not support append blob creation for DFS endpoint"); - final AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); - AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); - Mockito.doReturn(true).when(store).isAppendBlobKey(anyString()); + final AzureBlobFileSystem fs = spy(getFileSystem()); + AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); + doReturn(true).when(store).isAppendBlobKey(anyString()); // Set abfsStore as our mocked value. 
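The spy is wired in through reflection because the store field is private. A rough, generic sketch of that injection is shown here; the field name "abfsStore" is an assumption inferred from the comment above, since the argument actually passed to getDeclaredField is not visible in this hunk.

// Hypothetical helper mirroring the injection pattern used by these tests:
// swap a private field (e.g. the store reference) for a Mockito spy.
import java.lang.reflect.Field;

final class StoreInjectionSketch {
  static void injectField(Class<?> declaringClass, Object target,
      String fieldName, Object value) throws Exception {
    Field field = declaringClass.getDeclaredField(fieldName);
    field.setAccessible(true);
    field.set(target, value);
  }
  // Usage in the tests would look roughly like:
  // injectField(AzureBlobFileSystem.class, fs, "abfsStore", store); // field name assumed
}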
Field privateField = AzureBlobFileSystem.class.getDeclaredField( @@ -771,7 +837,7 @@ public void testEtagMismatch() throws Exception { public void testAppendWithLease() throws Exception { final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE_PATH); - final AzureBlobFileSystem fs = Mockito.spy( + final AzureBlobFileSystem fs = spy( getCustomFileSystem(testFilePath.getParent(), 1)); FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL); @@ -813,18 +879,18 @@ public void testAppendImplicitDirectoryAzcopy() throws Exception { @Test public void testIntermittentAppendFailureToBeReported() throws Exception { Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); - try (AzureBlobFileSystem fs = Mockito.spy( + try (AzureBlobFileSystem fs = spy( (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) { assumeHnsDisabled(); - AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); assumeBlobServiceType(); - AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler()); - AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient()); + AbfsClientHandler clientHandler = spy(store.getClientHandler()); + AbfsBlobClient blobClient = spy(clientHandler.getBlobClient()); - Mockito.doReturn(clientHandler).when(store).getClientHandler(); - Mockito.doReturn(blobClient).when(clientHandler).getBlobClient(); - Mockito.doReturn(blobClient).when(clientHandler).getIngressClient(); + doReturn(clientHandler).when(store).getClientHandler(); + doReturn(blobClient).when(clientHandler).getBlobClient(); + doReturn(blobClient).when(clientHandler).getIngressClient(); Mockito.doThrow( new AbfsRestOperationException(HTTP_UNAVAILABLE, "", "", new Exception())) @@ -893,14 +959,14 @@ public void testIntermittentAppendFailureToBeReported() throws Exception { private FSDataOutputStream createMockedOutputStream(AzureBlobFileSystem fs, Path path, AbfsClient client) throws IOException { - AbfsOutputStream abfsOutputStream = Mockito.spy( + AbfsOutputStream abfsOutputStream = spy( (AbfsOutputStream) fs.create(path).getWrappedStream()); - AzureIngressHandler ingressHandler = Mockito.spy( + AzureIngressHandler ingressHandler = spy( abfsOutputStream.getIngressHandler()); - Mockito.doReturn(ingressHandler).when(abfsOutputStream).getIngressHandler(); - Mockito.doReturn(client).when(ingressHandler).getClient(); + doReturn(ingressHandler).when(abfsOutputStream).getIngressHandler(); + doReturn(client).when(ingressHandler).getClient(); - FSDataOutputStream fsDataOutputStream = Mockito.spy( + FSDataOutputStream fsDataOutputStream = spy( new FSDataOutputStream(abfsOutputStream, null)); return fsDataOutputStream; } @@ -913,22 +979,22 @@ private FSDataOutputStream createMockedOutputStream(AzureBlobFileSystem fs, @Test public void testWriteAsyncOpFailedAfterCloseCalled() throws Exception { Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); - try (AzureBlobFileSystem fs = Mockito.spy( + try (AzureBlobFileSystem fs = spy( (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) { - AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); - AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler()); - AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient()); - AbfsDfsClient dfsClient = Mockito.spy(clientHandler.getDfsClient()); + AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); + AbfsClientHandler 
clientHandler = spy(store.getClientHandler()); + AbfsBlobClient blobClient = spy(clientHandler.getBlobClient()); + AbfsDfsClient dfsClient = spy(clientHandler.getDfsClient()); AbfsClient client = clientHandler.getIngressClient(); if (clientHandler.getIngressClient() instanceof AbfsBlobClient) { - Mockito.doReturn(blobClient).when(clientHandler).getBlobClient(); - Mockito.doReturn(blobClient).when(clientHandler).getIngressClient(); + doReturn(blobClient).when(clientHandler).getBlobClient(); + doReturn(blobClient).when(clientHandler).getIngressClient(); } else { - Mockito.doReturn(dfsClient).when(clientHandler).getDfsClient(); - Mockito.doReturn(dfsClient).when(clientHandler).getIngressClient(); + doReturn(dfsClient).when(clientHandler).getDfsClient(); + doReturn(dfsClient).when(clientHandler).getIngressClient(); } - Mockito.doReturn(clientHandler).when(store).getClientHandler(); + doReturn(clientHandler).when(store).getClientHandler(); byte[] bytes = new byte[1024 * 1024 * 8]; new Random().nextBytes(bytes); @@ -1018,21 +1084,21 @@ private String generateBlockId(AbfsOutputStream os, long position) { public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Exception { Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); // Create a spy of AzureBlobFileSystem - try (AzureBlobFileSystem fs = Mockito.spy( + try (AzureBlobFileSystem fs = spy( (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) { assumeHnsDisabled(); // Create a spy of AzureBlobFileSystemStore - AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); assumeBlobServiceType(); // Create spies for the client handler and blob client - AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler()); - AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient()); + AbfsClientHandler clientHandler = spy(store.getClientHandler()); + AbfsBlobClient blobClient = spy(clientHandler.getBlobClient()); // Set up the spies to return the mocked objects - Mockito.doReturn(clientHandler).when(store).getClientHandler(); - Mockito.doReturn(blobClient).when(clientHandler).getBlobClient(); - Mockito.doReturn(blobClient).when(clientHandler).getIngressClient(); + doReturn(clientHandler).when(store).getClientHandler(); + doReturn(blobClient).when(clientHandler).getBlobClient(); + doReturn(blobClient).when(clientHandler).getIngressClient(); AtomicInteger flushCount = new AtomicInteger(0); FSDataOutputStream os = createMockedOutputStream(fs, new Path("/test/file"), blobClient); @@ -1056,10 +1122,10 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep int currentCount = flushCount.incrementAndGet(); if (currentCount == 1) { - Mockito.when(httpOperation.getStatusCode()) + when(httpOperation.getStatusCode()) .thenReturn( HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error - Mockito.when(httpOperation.getStorageErrorMessage()) + when(httpOperation.getStorageErrorMessage()) .thenReturn("CONNECTION_RESET"); // Error message throw new IOException("Connection Reset"); } @@ -1114,22 +1180,22 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exception { Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); // Create a spy of AzureBlobFileSystem - try (AzureBlobFileSystem fs = Mockito.spy( + try (AzureBlobFileSystem fs = spy( (AzureBlobFileSystem) 
FileSystem.newInstance(getRawConfiguration()))) { assumeHnsDisabled(); // Create a spy of AzureBlobFileSystemStore - AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); assumeBlobServiceType(); // Create spies for the client handler and blob client - AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler()); - AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient()); + AbfsClientHandler clientHandler = spy(store.getClientHandler()); + AbfsBlobClient blobClient = spy(clientHandler.getBlobClient()); // Set up the spies to return the mocked objects - Mockito.doReturn(clientHandler).when(store).getClientHandler(); - Mockito.doReturn(blobClient).when(clientHandler).getBlobClient(); - Mockito.doReturn(blobClient).when(clientHandler).getIngressClient(); + doReturn(clientHandler).when(store).getClientHandler(); + doReturn(blobClient).when(clientHandler).getBlobClient(); + doReturn(blobClient).when(clientHandler).getIngressClient(); AtomicInteger flushCount = new AtomicInteger(0); FSDataOutputStream os = createMockedOutputStream(fs, new Path("/test/file"), blobClient); @@ -1153,16 +1219,16 @@ public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exc int currentCount = flushCount.incrementAndGet(); if (currentCount == 1) { - Mockito.when(httpOperation.getStatusCode()) + when(httpOperation.getStatusCode()) .thenReturn( HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error - Mockito.when(httpOperation.getStorageErrorMessage()) + when(httpOperation.getStorageErrorMessage()) .thenReturn("CONNECTION_RESET"); // Error message throw new IOException("Connection Reset"); } else if (currentCount == 2) { - Mockito.when(httpOperation.getStatusCode()) + when(httpOperation.getStatusCode()) .thenReturn(HTTP_OK); - Mockito.when(httpOperation.getStorageErrorMessage()) + when(httpOperation.getStorageErrorMessage()) .thenReturn("HTTP_OK"); } return null; From 41414eaad13621dc5551de768723150db730682b Mon Sep 17 00:00:00 2001 From: Anmol Asrani Date: Fri, 21 Feb 2025 03:11:15 -0800 Subject: [PATCH 04/10] remove unused imports --- .../hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index 8688c86f9a6da..452dde87ea642 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -56,7 +56,6 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil; import org.apache.hadoop.fs.azurebfs.services.AbfsDfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; -import org.apache.hadoop.fs.azurebfs.services.AzureBlobBlockManager; import org.apache.hadoop.fs.azurebfs.services.AzureBlobIngressHandler; import org.apache.hadoop.fs.azurebfs.services.AzureDFSIngressHandler; import org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler; @@ -88,10 +87,8 @@ import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.atLeastOnce; import static 
org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** From f932c8d74057a8c178b1e255cc2fec497610b011 Mon Sep 17 00:00:00 2001 From: Anmol Asrani Date: Fri, 21 Feb 2025 03:53:54 -0800 Subject: [PATCH 05/10] Add test for invalid ingress --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 4 ++++ .../ITestAzureBlobFileSystemInitAndCreate.java | 18 +++++++++++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index e0cb36201065d..2359745ddb9eb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -556,6 +556,10 @@ public void validateConfiguredServiceType(boolean isHNSEnabled) } else if (isHNSEnabled && fsConfiguredServiceType == AbfsServiceType.BLOB) { throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY, "Blob Endpoint Url Cannot be used to initialize filesystem for HNS Account"); + } else if (getFsConfiguredServiceType() == AbfsServiceType.BLOB + && getIngressServiceType() == AbfsServiceType.DFS) { + throw new InvalidConfigurationValueException( + FS_AZURE_INGRESS_SERVICE_TYPE, "Ingress Type Cannot be DFS for Blob endpoint configured filesystem"); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java index 8ac84ef8745f4..9abd283c9d3fb 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java @@ -28,6 +28,7 @@ import org.junit.Test; import org.mockito.Mockito; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -42,6 +43,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INGRESS_SERVICE_TYPE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.accountProperty; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME; @@ -114,9 +116,18 @@ public void testNoGetAclCallOnHnsConfigPresence() throws Exception { .getAclStatus(Mockito.anyString(), any(TracingContext.class)); } - // TODO: [FnsOverBlob][HADOOP-19179] Remove this test case once Blob Endpoint Support is enabled. + /** + * Test to verify that the initialization of the AzureBlobFileSystem fails + * when an invalid ingress service type is configured. 
+ * + * This test sets up a configuration with an invalid ingress service type + * (DFS) for a Blob endpoint and expects an InvalidConfigurationValueException + * to be thrown during the initialization of the filesystem. + * + * @throws Exception if an error occurs during the test execution + */ @Test - public void testFileSystemInitFailsWithBlobEndpointUrl() throws Exception { + public void testFileSystemInitializationFailsForInvalidIngress() throws Exception { Configuration configuration = new Configuration(getRawConfiguration()); String defaultUri = configuration.get(FS_DEFAULT_NAME_KEY); String accountKey = configuration.get( @@ -124,9 +135,10 @@ public void testFileSystemInitFailsWithBlobEndpointUrl() throws Exception { configuration.get(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME)); configuration.set(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, accountKey.replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME)); + configuration.set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.DFS.name()); String blobUri = defaultUri.replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME); intercept(InvalidConfigurationValueException.class, - "Blob Endpoint Support not yet available", () -> + "Ingress Type Cannot be DFS for Blob endpoint configured filesystem", () -> FileSystem.newInstance(new Path(blobUri).toUri(), configuration)); } From 6039e0e78db5f7428899066b00da992623adf9e3 Mon Sep 17 00:00:00 2001 From: Anmol Asrani Date: Fri, 21 Feb 2025 04:10:37 -0800 Subject: [PATCH 06/10] Add javadocs --- .../services/AzureBlobBlockManager.java | 6 +- .../services/AzureIngressHandler.java | 2 +- .../fs/azurebfs/services/RenameAtomicity.java | 3 +- .../ITestAzureBlobFileSystemAppend.java | 138 ++++++++---------- ...ITestAzureBlobFileSystemInitAndCreate.java | 4 +- 5 files changed, 70 insertions(+), 83 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java index 61a20d94641f5..c72bf721dc6f7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java @@ -30,6 +30,8 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.store.DataBlocks; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA; + /** * Manages Azure Blob blocks for append operations. 
*/ @@ -61,7 +63,7 @@ public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream, if (abfsOutputStream.getPosition() > 0 && !abfsOutputStream.isAppendBlob()) { List committedBlocks = getBlockList(abfsOutputStream.getTracingContext()); if (!committedBlocks.isEmpty()) { - committedBlockEntries.append(String.join(",", committedBlocks)).append(","); + committedBlockEntries.append(String.join(COMMA, committedBlocks)); } } LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance {} for path {}", @@ -182,7 +184,7 @@ protected synchronized boolean hasBlocksToCommit() throws IOException { } // Append the current block's ID to the committedBlockBuilder if (committedBlockEntries.length() > 0) { - committedBlockEntries.append(","); + committedBlockEntries.append(COMMA); } committedBlockEntries.append(current.getBlockId()); LOG.debug("Block {} added to committed entries.", current.getBlockId()); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java index f7bc028aeb4e0..3072bdf5d04d6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java @@ -198,7 +198,7 @@ protected InvalidIngressServiceException getIngressHandlerSwitchException( * * @return the block manager */ - public abstract AzureBlockManager getBlockManager(); + protected abstract AzureBlockManager getBlockManager(); /** * Gets the client associated with this handler. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java index 326875f24da65..a67f71ebc1534 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java @@ -221,8 +221,7 @@ void createRenamePendingJson(Path path, byte[] bytes) abfsClient.append(path.toUri().getPath(), bytes, appendRequestParameters, null, null, tracingContext); - //List blockIdList = new ArrayList<>(Collections.singleton(blockId)); - String blockList = generateBlockListXml(blockId); + String blockList = generateBlockListXml(blockId); // PutBlockList on the path. abfsClient.flush(blockList.getBytes(StandardCharsets.UTF_8), path.toUri().getPath(), true, null, null, eTag, null, tracingContext); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index 452dde87ea642..237be117d7d50 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -87,9 +87,6 @@ import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; /** * Test append operations. 
@@ -169,16 +166,16 @@ public void testCloseOfDataBlockOnAppendComplete() throws Exception { for (String blockBufferType : blockBufferTypes) { Configuration configuration = new Configuration(getRawConfiguration()); configuration.set(DATA_BLOCKS_BUFFER, blockBufferType); - try (AzureBlobFileSystem fs = spy( + try (AzureBlobFileSystem fs = Mockito.spy( (AzureBlobFileSystem) FileSystem.newInstance(configuration))) { - AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); - doReturn(store).when(fs).getAbfsStore(); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1]; Mockito.doAnswer(getBlobFactoryInvocation -> { - DataBlocks.BlockFactory factory = spy( + DataBlocks.BlockFactory factory = Mockito.spy( (DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod()); Mockito.doAnswer(factoryCreateInvocation -> { - dataBlock[0] = spy( + dataBlock[0] = Mockito.spy( (DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod()); return dataBlock[0]; }) @@ -247,8 +244,11 @@ public void testCreateOverDfsAppendOverBlob() throws IOException { .isInstanceOf(AbfsDfsClient.class); } + /** + * This test verifies that if multiple appends qualify for switch, no appends should fail. + */ @Test - public void testMultipleAppendSwitches() throws Exception { + public void testMultipleAppendsQualifyForSwitch() throws Exception { Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); final AzureBlobFileSystem fs = getFileSystem(); Path testPath = path(TEST_FILE_PATH); @@ -309,8 +309,11 @@ public void testMultipleAppendSwitches() throws Exception { .isInstanceOf(AbfsDfsClient.class); } + /** + * This test verifies that parallel writes on dfs and blob endpoint should not fail. + */ @Test - public void testParallelDfsBlob() throws Exception { + public void testParallelWritesOnDfsAndBlob() throws Exception { Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); final AzureBlobFileSystem fs = getFileSystem(); Path testPath = path(TEST_FILE_PATH); @@ -413,10 +416,10 @@ public void testCreateAppendBlobOverBlobEndpointAppendOverDfs() conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true); conf.set(FS_AZURE_INGRESS_SERVICE_TYPE, String.valueOf(AbfsServiceType.DFS)); - try (AzureBlobFileSystem fs = spy( + try (AzureBlobFileSystem fs = Mockito.spy( (AzureBlobFileSystem) FileSystem.newInstance(conf))) { - AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); - doReturn(true).when(store).isAppendBlobKey(anyString()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(true).when(store).isAppendBlobKey(anyString()); // Set abfsStore as our mocked value. Field privateField = AzureBlobFileSystem.class.getDeclaredField( @@ -458,9 +461,9 @@ public void testCreateAppendBlobOverBlobEndpointAppendOverDfs() public void testCreateAppendBlobOverDfsEndpointAppendOverBlob() throws IOException, NoSuchFieldException, IllegalAccessException { assumeHnsEnabled("FNS does not support append blob creation for DFS endpoint"); - final AzureBlobFileSystem fs = spy(getFileSystem()); - AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); - doReturn(true).when(store).isAppendBlobKey(anyString()); + final AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(true).when(store).isAppendBlobKey(anyString()); // Set abfsStore as our mocked value. 
Field privateField = AzureBlobFileSystem.class.getDeclaredField( @@ -500,7 +503,6 @@ public void testCreateAppendBlobOverDfsEndpointAppendOverBlob() } - /** * Tests the correct retrieval of the AzureIngressHandler based on the configured ingress service type. * @@ -610,22 +612,6 @@ public void testRecreateAppendAndFlush() throws IOException { } } - @Test - public void testFlush() throws IOException { - Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); - final AzureBlobFileSystem fs = getFileSystem(); - final Path filePath = path(TEST_FILE_PATH); - fs.create(filePath); - Assume.assumeTrue(getIngressServiceType() == AbfsServiceType.BLOB); - FSDataOutputStream outputStream = fs.append(filePath); - outputStream.write(TEN); - outputStream.write(20); - outputStream.hsync(); - outputStream.write(30); - outputStream.write(40); - outputStream.close(); - } - /** * Recreate directory between append and flush. Etag mismatch happens. **/ @@ -834,7 +820,7 @@ public void testEtagMismatch() throws Exception { public void testAppendWithLease() throws Exception { final Path testFilePath = new Path(path(methodName.getMethodName()), TEST_FILE_PATH); - final AzureBlobFileSystem fs = spy( + final AzureBlobFileSystem fs = Mockito.spy( getCustomFileSystem(testFilePath.getParent(), 1)); FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL); @@ -876,18 +862,18 @@ public void testAppendImplicitDirectoryAzcopy() throws Exception { @Test public void testIntermittentAppendFailureToBeReported() throws Exception { Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); - try (AzureBlobFileSystem fs = spy( + try (AzureBlobFileSystem fs = Mockito.spy( (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) { assumeHnsDisabled(); - AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); assumeBlobServiceType(); - AbfsClientHandler clientHandler = spy(store.getClientHandler()); - AbfsBlobClient blobClient = spy(clientHandler.getBlobClient()); + AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler()); + AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient()); - doReturn(clientHandler).when(store).getClientHandler(); - doReturn(blobClient).when(clientHandler).getBlobClient(); - doReturn(blobClient).when(clientHandler).getIngressClient(); + Mockito.doReturn(clientHandler).when(store).getClientHandler(); + Mockito.doReturn(blobClient).when(clientHandler).getBlobClient(); + Mockito.doReturn(blobClient).when(clientHandler).getIngressClient(); Mockito.doThrow( new AbfsRestOperationException(HTTP_UNAVAILABLE, "", "", new Exception())) @@ -956,14 +942,14 @@ public void testIntermittentAppendFailureToBeReported() throws Exception { private FSDataOutputStream createMockedOutputStream(AzureBlobFileSystem fs, Path path, AbfsClient client) throws IOException { - AbfsOutputStream abfsOutputStream = spy( + AbfsOutputStream abfsOutputStream = Mockito.spy( (AbfsOutputStream) fs.create(path).getWrappedStream()); - AzureIngressHandler ingressHandler = spy( + AzureIngressHandler ingressHandler = Mockito.spy( abfsOutputStream.getIngressHandler()); - doReturn(ingressHandler).when(abfsOutputStream).getIngressHandler(); - doReturn(client).when(ingressHandler).getClient(); + Mockito.doReturn(ingressHandler).when(abfsOutputStream).getIngressHandler(); + Mockito.doReturn(client).when(ingressHandler).getClient(); - FSDataOutputStream fsDataOutputStream = spy( + 
FSDataOutputStream fsDataOutputStream = Mockito.spy( new FSDataOutputStream(abfsOutputStream, null)); return fsDataOutputStream; } @@ -976,22 +962,22 @@ private FSDataOutputStream createMockedOutputStream(AzureBlobFileSystem fs, @Test public void testWriteAsyncOpFailedAfterCloseCalled() throws Exception { Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); - try (AzureBlobFileSystem fs = spy( + try (AzureBlobFileSystem fs = Mockito.spy( (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) { - AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); - AbfsClientHandler clientHandler = spy(store.getClientHandler()); - AbfsBlobClient blobClient = spy(clientHandler.getBlobClient()); - AbfsDfsClient dfsClient = spy(clientHandler.getDfsClient()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler()); + AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient()); + AbfsDfsClient dfsClient = Mockito.spy(clientHandler.getDfsClient()); AbfsClient client = clientHandler.getIngressClient(); if (clientHandler.getIngressClient() instanceof AbfsBlobClient) { - doReturn(blobClient).when(clientHandler).getBlobClient(); - doReturn(blobClient).when(clientHandler).getIngressClient(); + Mockito.doReturn(blobClient).when(clientHandler).getBlobClient(); + Mockito.doReturn(blobClient).when(clientHandler).getIngressClient(); } else { - doReturn(dfsClient).when(clientHandler).getDfsClient(); - doReturn(dfsClient).when(clientHandler).getIngressClient(); + Mockito.doReturn(dfsClient).when(clientHandler).getDfsClient(); + Mockito.doReturn(dfsClient).when(clientHandler).getIngressClient(); } - doReturn(clientHandler).when(store).getClientHandler(); + Mockito.doReturn(clientHandler).when(store).getClientHandler(); byte[] bytes = new byte[1024 * 1024 * 8]; new Random().nextBytes(bytes); @@ -1081,21 +1067,21 @@ private String generateBlockId(AbfsOutputStream os, long position) { public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Exception { Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); // Create a spy of AzureBlobFileSystem - try (AzureBlobFileSystem fs = spy( + try (AzureBlobFileSystem fs = Mockito.spy( (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) { assumeHnsDisabled(); // Create a spy of AzureBlobFileSystemStore - AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); assumeBlobServiceType(); // Create spies for the client handler and blob client - AbfsClientHandler clientHandler = spy(store.getClientHandler()); - AbfsBlobClient blobClient = spy(clientHandler.getBlobClient()); + AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler()); + AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient()); // Set up the spies to return the mocked objects - doReturn(clientHandler).when(store).getClientHandler(); - doReturn(blobClient).when(clientHandler).getBlobClient(); - doReturn(blobClient).when(clientHandler).getIngressClient(); + Mockito.doReturn(clientHandler).when(store).getClientHandler(); + Mockito.doReturn(blobClient).when(clientHandler).getBlobClient(); + Mockito.doReturn(blobClient).when(clientHandler).getIngressClient(); AtomicInteger flushCount = new AtomicInteger(0); FSDataOutputStream os = createMockedOutputStream(fs, new Path("/test/file"), blobClient); @@ -1119,10 +1105,10 @@ public void 
testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep int currentCount = flushCount.incrementAndGet(); if (currentCount == 1) { - when(httpOperation.getStatusCode()) + Mockito.when(httpOperation.getStatusCode()) .thenReturn( HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error - when(httpOperation.getStorageErrorMessage()) + Mockito.when(httpOperation.getStorageErrorMessage()) .thenReturn("CONNECTION_RESET"); // Error message throw new IOException("Connection Reset"); } @@ -1177,22 +1163,22 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exception { Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); // Create a spy of AzureBlobFileSystem - try (AzureBlobFileSystem fs = spy( + try (AzureBlobFileSystem fs = Mockito.spy( (AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) { assumeHnsDisabled(); // Create a spy of AzureBlobFileSystemStore - AzureBlobFileSystemStore store = spy(fs.getAbfsStore()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); assumeBlobServiceType(); // Create spies for the client handler and blob client - AbfsClientHandler clientHandler = spy(store.getClientHandler()); - AbfsBlobClient blobClient = spy(clientHandler.getBlobClient()); + AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler()); + AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient()); // Set up the spies to return the mocked objects - doReturn(clientHandler).when(store).getClientHandler(); - doReturn(blobClient).when(clientHandler).getBlobClient(); - doReturn(blobClient).when(clientHandler).getIngressClient(); + Mockito.doReturn(clientHandler).when(store).getClientHandler(); + Mockito.doReturn(blobClient).when(clientHandler).getBlobClient(); + Mockito.doReturn(blobClient).when(clientHandler).getIngressClient(); AtomicInteger flushCount = new AtomicInteger(0); FSDataOutputStream os = createMockedOutputStream(fs, new Path("/test/file"), blobClient); @@ -1216,16 +1202,16 @@ public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exc int currentCount = flushCount.incrementAndGet(); if (currentCount == 1) { - when(httpOperation.getStatusCode()) + Mockito.when(httpOperation.getStatusCode()) .thenReturn( HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error - when(httpOperation.getStorageErrorMessage()) + Mockito.when(httpOperation.getStorageErrorMessage()) .thenReturn("CONNECTION_RESET"); // Error message throw new IOException("Connection Reset"); } else if (currentCount == 2) { - when(httpOperation.getStatusCode()) + Mockito.when(httpOperation.getStatusCode()) .thenReturn(HTTP_OK); - when(httpOperation.getStorageErrorMessage()) + Mockito.when(httpOperation.getStorageErrorMessage()) .thenReturn("HTTP_OK"); } return null; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java index 9abd283c9d3fb..6e5e7f35d5e96 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java @@ -79,8 +79,8 @@ public void testGetAclCallOnHnsConfigAbsence() throws Exception { AzureBlobFileSystem fs = 
((AzureBlobFileSystem) FileSystem.newInstance( getRawConfiguration())); AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); - AbfsClient client = Mockito.spy(fs.getAbfsClient()); - Mockito.doReturn(client).when(store).getClient(); + AbfsClient client = Mockito.spy(fs.getAbfsStore().getClient(AbfsServiceType.DFS)); + Mockito.doReturn(client).when(store).getClient(AbfsServiceType.DFS); Mockito.doThrow(TrileanConversionException.class) .when(store) From e126133398f168af5991c4f0153e286db00f58af Mon Sep 17 00:00:00 2001 From: Anmol Asrani Date: Mon, 24 Feb 2025 04:19:00 -0800 Subject: [PATCH 07/10] Fix tests --- .../hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java | 2 ++ .../fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index 237be117d7d50..920c4964a559f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -372,6 +372,7 @@ public void testParallelWritesOnDfsAndBlob() throws Exception { @Test public void testCreateOverBlobAppendOverDfs() throws IOException { Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + assumeDfsServiceType(); Configuration conf = getRawConfiguration(); conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true); conf.set(FS_AZURE_INGRESS_SERVICE_TYPE, @@ -412,6 +413,7 @@ public void testCreateOverBlobAppendOverDfs() throws IOException { @Test public void testCreateAppendBlobOverBlobEndpointAppendOverDfs() throws IOException, NoSuchFieldException, IllegalAccessException { + assumeDfsServiceType(); Configuration conf = getRawConfiguration(); conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true); conf.set(FS_AZURE_INGRESS_SERVICE_TYPE, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java index 6e5e7f35d5e96..de54c1d5dc830 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java @@ -61,7 +61,8 @@ public ITestAzureBlobFileSystemInitAndCreate() throws Exception { } @Override - public void setup() { + public void setup() throws Exception { + super.setup(); } @Override @@ -128,6 +129,7 @@ public void testNoGetAclCallOnHnsConfigPresence() throws Exception { */ @Test public void testFileSystemInitializationFailsForInvalidIngress() throws Exception { + assumeHnsDisabled(); Configuration configuration = new Configuration(getRawConfiguration()); String defaultUri = configuration.get(FS_DEFAULT_NAME_KEY); String accountKey = configuration.get( From fd992497f587b1061cfb1604b6ff626d4d6b657e Mon Sep 17 00:00:00 2001 From: Anmol Asrani Date: Mon, 24 Feb 2025 04:20:02 -0800 Subject: [PATCH 08/10] checkstyle fixes --- .../apache/hadoop/fs/azurebfs/services/RenameAtomicity.java | 3 --- 1 file changed, 3 deletions(-) diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java index a67f71ebc1534..42e8f6ed3aa72 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java @@ -21,9 +21,6 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Random; import com.fasterxml.jackson.core.JsonProcessingException; From 002562b48795f49a0a17b6d0b3fbe3d316de7177 Mon Sep 17 00:00:00 2001 From: Anmol Asrani Date: Mon, 24 Feb 2025 22:19:33 -0800 Subject: [PATCH 09/10] empty string handling --- .../hadoop/fs/azurebfs/services/AbfsBlobClient.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index 48d88044eb4b3..8626dc2033911 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -2021,9 +2021,11 @@ public static String generateBlockListXml(String blockIdString) { stringBuilder.append(String.format(BLOCK_LIST_START_TAG)); // Split the block ID string by commas and generate XML for each block ID - String[] blockIds = blockIdString.split(","); - for (String blockId : blockIds) { - stringBuilder.append(String.format(LATEST_BLOCK_FORMAT, blockId)); + if (!blockIdString.isEmpty()) { + String[] blockIds = blockIdString.split(","); + for (String blockId : blockIds) { + stringBuilder.append(String.format(LATEST_BLOCK_FORMAT, blockId)); + } } stringBuilder.append(String.format(BLOCK_LIST_END_TAG)); From 29546cc23c90b86252cfc3ac5ed18afab35a0ead Mon Sep 17 00:00:00 2001 From: Anmol Asrani Date: Tue, 11 Mar 2025 02:46:24 -0700 Subject: [PATCH 10/10] review comments --- .../java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java | 3 ++- .../org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java | 1 + .../fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java | 3 ++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index e78bb70805212..3e73e69c66aa3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -78,6 +78,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.INCORRECT_INGRESS_TYPE; /** * Configuration for Azure Blob FileSystem. 
@@ -563,7 +564,7 @@ public void validateConfiguredServiceType(boolean isHNSEnabled) } else if (getFsConfiguredServiceType() == AbfsServiceType.BLOB && getIngressServiceType() == AbfsServiceType.DFS) { throw new InvalidConfigurationValueException( - FS_AZURE_INGRESS_SERVICE_TYPE, "Ingress Type Cannot be DFS for Blob endpoint configured filesystem"); + FS_AZURE_INGRESS_SERVICE_TYPE, INCORRECT_INGRESS_TYPE); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java index e75df046d8d6b..511848c4d0a50 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java @@ -73,5 +73,6 @@ public final class AbfsErrors { "Error while recovering from create failure."; public static final String ERR_RENAME_RECOVERY = "Error while recovering from rename failure."; + public static final String INCORRECT_INGRESS_TYPE = "Ingress Type Cannot be DFS for Blob endpoint configured filesystem."; private AbfsErrors() {} } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java index de54c1d5dc830..35ce615ba738c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java @@ -47,6 +47,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.accountProperty; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.INCORRECT_INGRESS_TYPE; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.ArgumentMatchers.any; @@ -140,7 +141,7 @@ public void testFileSystemInitializationFailsForInvalidIngress() throws Exceptio configuration.set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.DFS.name()); String blobUri = defaultUri.replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME); intercept(InvalidConfigurationValueException.class, - "Ingress Type Cannot be DFS for Blob endpoint configured filesystem", () -> + INCORRECT_INGRESS_TYPE, () -> FileSystem.newInstance(new Path(blobUri).toUri(), configuration)); }
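
Editor's note (not part of the patch): the final hunks tie together the new AbfsErrors.INCORRECT_INGRESS_TYPE constant, the validateConfiguredServiceType() check in AbfsConfiguration, and the testFileSystemInitializationFailsForInvalidIngress assertion. The sketch below is a minimal, self-contained Java illustration of that validation path, not the actual Hadoop code: the enum, the exception class, and the configuration-key string are simplified stand-ins, and only the INCORRECT_INGRESS_TYPE message text is taken verbatim from the patch.

    // Minimal illustration of the check the last hunk exercises: a Blob-endpoint
    // configured filesystem with DFS ingress must be rejected at initialization.
    enum ServiceTypeSketch { DFS, BLOB }

    class InvalidIngressConfigException extends Exception {
      InvalidIngressConfigException(String configKey, String message) {
        super(configKey + ": " + message);
      }
    }

    final class IngressValidationSketch {

      // Mirrors AbfsErrors.INCORRECT_INGRESS_TYPE introduced by this patch series.
      static final String INCORRECT_INGRESS_TYPE =
          "Ingress Type Cannot be DFS for Blob endpoint configured filesystem.";

      // Stand-in for the FS_AZURE_INGRESS_SERVICE_TYPE key; the literal value here is illustrative only.
      static final String INGRESS_SERVICE_TYPE_KEY = "fs.azure.ingress.service.type";

      // Rejects the BLOB endpoint + DFS ingress combination, which is exactly the
      // configuration that testFileSystemInitializationFailsForInvalidIngress intercepts.
      static void validate(ServiceTypeSketch fsServiceType, ServiceTypeSketch ingressServiceType)
          throws InvalidIngressConfigException {
        if (fsServiceType == ServiceTypeSketch.BLOB
            && ingressServiceType == ServiceTypeSketch.DFS) {
          throw new InvalidIngressConfigException(INGRESS_SERVICE_TYPE_KEY, INCORRECT_INGRESS_TYPE);
        }
      }

      public static void main(String[] args) {
        try {
          validate(ServiceTypeSketch.BLOB, ServiceTypeSketch.DFS);
        } catch (InvalidIngressConfigException e) {
          // Expected outcome: the invalid combination is refused with the shared error message.
          System.out.println("Rejected as expected: " + e.getMessage());
        }
      }
    }

Running the sketch rejects the BLOB-plus-DFS combination with the shared message, which is the behaviour the test's intercept(InvalidConfigurationValueException.class, INCORRECT_INGRESS_TYPE, ...) call now asserts against the constant instead of a duplicated string literal.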