diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 6a51f8d902038..3e73e69c66aa3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -78,6 +78,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.INCORRECT_INGRESS_TYPE; /** * Configuration for Azure Blob FileSystem. @@ -560,6 +561,10 @@ public void validateConfiguredServiceType(boolean isHNSEnabled) } else if (isHNSEnabled && fsConfiguredServiceType == AbfsServiceType.BLOB) { throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY, "Blob Endpoint Url Cannot be used to initialize filesystem for HNS Account"); + } else if (getFsConfiguredServiceType() == AbfsServiceType.BLOB + && getIngressServiceType() == AbfsServiceType.DFS) { + throw new InvalidConfigurationValueException( + FS_AZURE_INGRESS_SERVICE_TYPE, INCORRECT_INGRESS_TYPE); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index f184ef5f7a9b7..11c6f5a07e759 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -2011,18 +2011,24 @@ private boolean isEmptyListResults(AbfsHttpOperation result) { } /** - * Generates an XML string representing the block list. + * Generate the XML block list using a comma-separated string of block IDs. * - * @param blockIds the set of block IDs - * @return the generated XML string + * @param blockIdString The comma-separated block IDs. + * @return the XML representation of the block list. */ - public static String generateBlockListXml(List<String> blockIds) { + public static String generateBlockListXml(String blockIdString) { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(String.format(XML_VERSION)); stringBuilder.append(String.format(BLOCK_LIST_START_TAG)); - for (String blockId : blockIds) { - stringBuilder.append(String.format(LATEST_BLOCK_FORMAT, blockId)); + + // Split the block ID string by commas and generate XML for each block ID + if (!blockIdString.isEmpty()) { + String[] blockIds = blockIdString.split(","); + for (String blockId : blockIds) { + stringBuilder.append(String.format(LATEST_BLOCK_FORMAT, blockId)); + } } + stringBuilder.append(String.format(BLOCK_LIST_END_TAG)); return stringBuilder.toString(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java index e75df046d8d6b..511848c4d0a50 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java @@ -73,5 +73,6 @@ public final class AbfsErrors { "Error while recovering from create failure."; public static final String ERR_RENAME_RECOVERY = "Error while recovering from rename failure."; + public static final String INCORRECT_INGRESS_TYPE = "Ingress Type Cannot be DFS for Blob endpoint configured filesystem."; private AbfsErrors() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 8979b963d68c4..7e74c8daaec5b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -708,9 +708,7 @@ public synchronized void close() throws IOException { bufferIndex = 0; closed = true; writeOperations.clear(); - if (getBlockManager().hasActiveBlock()) { - getBlockManager().clearActiveBlock(); - } + getBlockManager().close(); } LOG.debug("Closing AbfsOutputStream : {}", this); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java index 519e5326c7d4c..c72bf721dc6f7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java @@ -30,6 +30,8 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.store.DataBlocks; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA; + /** * Manages Azure Blob blocks for append operations. */ @@ -38,9 +40,8 @@ public class AzureBlobBlockManager extends AzureBlockManager { private static final Logger LOG = LoggerFactory.getLogger( AbfsOutputStream.class); - - /** The list of already committed blocks is stored in this list. */ - private List<String> committedBlockEntries = new ArrayList<>(); + /** Cached list of committed block IDs */ + private final StringBuilder committedBlockEntries = new StringBuilder(); /** The list to store blockId, position, and status. */ private final LinkedList<BlockEntry> blockEntryList = new LinkedList<>(); @@ -60,7 +61,10 @@ public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream, throws AzureBlobFileSystemException { super(abfsOutputStream, blockFactory, bufferSize); if (abfsOutputStream.getPosition() > 0 && !abfsOutputStream.isAppendBlob()) { - this.committedBlockEntries = getBlockList(abfsOutputStream.getTracingContext()); + List<String> committedBlocks = getBlockList(abfsOutputStream.getTracingContext()); + if (!committedBlocks.isEmpty()) { + committedBlockEntries.append(String.join(COMMA, committedBlocks)); + } } LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance {} for path {}", abfsOutputStream.getStreamID(), abfsOutputStream.getPath()); @@ -146,11 +150,12 @@ protected synchronized void updateEntry(AbfsBlock block) { * @return whether we have some data to commit or not. * @throws IOException if an I/O error occurs */ - protected synchronized boolean hasListToCommit() throws IOException { + protected synchronized boolean hasBlocksToCommit() throws IOException { // Adds all the committed blocks if available to the list of blocks to be added in putBlockList. if (blockEntryList.isEmpty()) { return false; // No entries to commit } + while (!blockEntryList.isEmpty()) { BlockEntry current = blockEntryList.poll(); if (current.getStatus() != AbfsBlockStatus.SUCCESS) { @@ -177,7 +182,11 @@ protected synchronized boolean hasListToCommit() throws IOException { throw new IOException(errorMessage); } } - committedBlockEntries.add(current.getBlockId()); + // Append the current block's ID to the committedBlockBuilder + if (committedBlockEntries.length() > 0) { + committedBlockEntries.append(COMMA); + } + committedBlockEntries.append(current.getBlockId()); LOG.debug("Block {} added to committed entries.", current.getBlockId()); } return true; @@ -188,7 +197,13 @@ protected synchronized boolean hasListToCommit() throws IOException { * * @return the block ID list */ - protected List<String> getBlockIdList() { - return committedBlockEntries; + protected String getBlockIdToCommit() { + return committedBlockEntries.toString(); + } + + @Override + public void close(){ + super.close(); + committedBlockEntries.setLength(0); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java index 0336cd04b640f..150d85d474a03 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java @@ -168,13 +168,13 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset, if (getAbfsOutputStream().isAppendBlob()) { return null; } - if (!blobBlockManager.hasListToCommit()) { + if (!blobBlockManager.hasBlocksToCommit()) { return null; } try { // Generate the xml with the list of blockId's to generate putBlockList call. String blockListXml = generateBlockListXml( - blobBlockManager.getBlockIdList()); + blobBlockManager.getBlockIdToCommit()); TracingContext tracingContextFlush = new TracingContext(tracingContext); tracingContextFlush.setIngressHandler(BLOB_FLUSH); tracingContextFlush.setPosition(String.valueOf(offset)); @@ -297,7 +297,8 @@ protected void writeAppendBlobCurrentBufferToService() throws IOException { activeBlock, reqParams, new TracingContext(getAbfsOutputStream().getTracingContext())); } catch (InvalidIngressServiceException ex) { - LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.", getAbfsOutputStream().getPath()); + LOG.debug("InvalidIngressServiceException caught for path: {}, switching handler and retrying remoteAppendBlobWrite.", + getAbfsOutputStream().getPath()); getAbfsOutputStream().switchHandler(); op = getAbfsOutputStream().getIngressHandler() .remoteAppendBlobWrite(getAbfsOutputStream().getPath(), uploadData, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlockManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlockManager.java index 5c4db6368b311..426e0c8194f8a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlockManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlockManager.java @@ -167,4 +167,12 @@ void clearActiveBlock() { activeBlock = null; } } + + // Used to clear any resources used by the block manager. + void close() { + if (hasActiveBlock()) { + clearActiveBlock(); + } + LOG.debug("AzureBlockManager closed."); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java index 85b07dbc7387c..f7a4542c3f2ae 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java @@ -86,4 +86,10 @@ protected synchronized AbfsBlock getActiveBlock() { protected synchronized boolean hasActiveBlock() { return super.hasActiveBlock(); } + + @Override + public void close() { + super.close(); + LOG.debug("AzureDFSBlockManager closed."); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java index c9aefc56eb2c6..ba842cbb79b62 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java @@ -147,7 +147,7 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset, final String leaseId, TracingContext tracingContext) throws IOException { AbfsRestOperation op; - if (!blobBlockManager.hasListToCommit()) { + if (!blobBlockManager.hasBlocksToCommit()) { return null; } try { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java index f8dab188f37eb..42e8f6ed3aa72 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java @@ -21,9 +21,6 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Random; import com.fasterxml.jackson.core.JsonProcessingException; @@ -221,8 +218,7 @@ void createRenamePendingJson(Path path, byte[] bytes) abfsClient.append(path.toUri().getPath(), bytes, appendRequestParameters, null, null, tracingContext); - List<String> blockIdList = new ArrayList<>(Collections.singleton(blockId)); - String blockList = generateBlockListXml(blockIdList); + String blockList = generateBlockListXml(blockId); // PutBlockList on the path. abfsClient.flush(blockList.getBytes(StandardCharsets.UTF_8), path.toUri().getPath(), true, null, null, eTag, null, tracingContext); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java index ee88ebccf6d0f..2a06dfb4acc2f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java @@ -64,6 +64,7 @@ import org.apache.hadoop.util.Lists; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA; @@ -320,7 +321,7 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs, return ingressClient.flush(path, 3, false, false, null, null, encryptionAdapter, getTestTracingContext(fs, false)); } else { - byte[] buffer = generateBlockListXml(new ArrayList<>()).getBytes(StandardCharsets.UTF_8); + byte[] buffer = generateBlockListXml(EMPTY_STRING).getBytes(StandardCharsets.UTF_8); return ingressClient.flush(buffer, path, false, null, null, null, encryptionAdapter, getTestTracingContext(fs, false)); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index 73a0d01105753..920c4964a559f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -95,6 +95,7 @@ public class ITestAzureBlobFileSystemAppend extends AbstractAbfsIntegrationTest { private static final String TEST_FILE_PATH = "testfile"; + private static final String TEST_FILE_PATH1 = "testfile1"; private static final String TEST_FOLDER_PATH = "testFolder"; @@ -243,6 +244,125 @@ public void testCreateOverDfsAppendOverBlob() throws IOException { .isInstanceOf(AbfsDfsClient.class); } + /** + * This test verifies that if multiple appends qualify for switch, no appends should fail. + */ + @Test + public void testMultipleAppendsQualifyForSwitch() throws Exception { + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + final AzureBlobFileSystem fs = getFileSystem(); + Path testPath = path(TEST_FILE_PATH); + AzureBlobFileSystemStore.Permissions permissions + = new AzureBlobFileSystemStore.Permissions(false, + FsPermission.getDefault(), FsPermission.getUMask(fs.getConf())); + fs.getAbfsStore().getClientHandler().getDfsClient(). + createPath(makeQualified(testPath).toUri().getPath(), true, false, + permissions, false, null, + null, getTestTracingContext(fs, true)); + fs.getAbfsStore() + .getAbfsConfiguration() + .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name()); + ExecutorService executorService = Executors.newFixedThreadPool(5); + List<Future<?>> futures = new ArrayList<>(); + + // Create three output streams with different content length + final byte[] b1 = new byte[8 * ONE_MB]; + new Random().nextBytes(b1); + + FSDataOutputStream out1 = fs.append(testPath); + FSDataOutputStream out2 = fs.append(testPath); + FSDataOutputStream out3 = fs.append(testPath); + + // Submit tasks to write to each output stream + futures.add(executorService.submit(() -> { + try { + out1.write(TEN); + out1.hsync(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + + futures.add(executorService.submit(() -> { + try { + out2.write(TWENTY); + out2.hsync(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + + futures.add(executorService.submit(() -> { + try { + out3.write(THIRTY); + out3.hsync(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + checkFuturesForExceptions(futures, 0); + AzureIngressHandler ingressHandlerFallback + = ((AbfsOutputStream) out1.getWrappedStream()).getIngressHandler(); + AbfsClient clientFallback = ingressHandlerFallback.getClient(); + Assertions.assertThat(clientFallback) + .as("DFS client was not used after fallback") + .isInstanceOf(AbfsDfsClient.class); + } + + /** + * This test verifies that parallel writes on dfs and blob endpoint should not fail. + */ + @Test + public void testParallelWritesOnDfsAndBlob() throws Exception { + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + final AzureBlobFileSystem fs = getFileSystem(); + Path testPath = path(TEST_FILE_PATH); + Path testPath1 = path(TEST_FILE_PATH1); + AzureBlobFileSystemStore.Permissions permissions + = new AzureBlobFileSystemStore.Permissions(false, + FsPermission.getDefault(), FsPermission.getUMask(fs.getConf())); + fs.getAbfsStore().getClientHandler().getDfsClient(). + createPath(makeQualified(testPath).toUri().getPath(), true, false, + permissions, false, null, + null, getTestTracingContext(fs, true)); + fs.getAbfsStore() + .getAbfsConfiguration() + .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name()); + FSDataOutputStream out1 = fs.create(testPath); + fs.getAbfsStore().getClientHandler().getDfsClient(). + createPath(makeQualified(testPath1).toUri().getPath(), true, false, + permissions, false, null, + null, getTestTracingContext(fs, true)); + ExecutorService executorService = Executors.newFixedThreadPool(5); + List<Future<?>> futures = new ArrayList<>(); + + // Create three output streams with different content length + final byte[] b1 = new byte[8 * ONE_MB]; + new Random().nextBytes(b1); + FSDataOutputStream out2 = fs.append(testPath1); + + // Submit tasks to write to each output stream + futures.add(executorService.submit(() -> { + try { + out1.write(TEN); + out1.hsync(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + + futures.add(executorService.submit(() -> { + try { + out2.write(TWENTY); + out2.hsync(); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + checkFuturesForExceptions(futures, 0); + } + + /** * Creates a file over Blob and attempts to append over DFS. * It should fallback to Blob when appending to the file fails. @@ -252,6 +372,7 @@ public void testCreateOverDfsAppendOverBlob() throws IOException { @Test public void testCreateOverBlobAppendOverDfs() throws IOException { Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + assumeDfsServiceType(); Configuration conf = getRawConfiguration(); conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true); conf.set(FS_AZURE_INGRESS_SERVICE_TYPE, @@ -292,6 +413,7 @@ public void testCreateOverBlobAppendOverDfs() throws IOException { @Test public void testCreateAppendBlobOverBlobEndpointAppendOverDfs() throws IOException, NoSuchFieldException, IllegalAccessException { + assumeDfsServiceType(); Configuration conf = getRawConfiguration(); conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true); conf.set(FS_AZURE_INGRESS_SERVICE_TYPE, @@ -971,9 +1093,8 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep new Random().nextBytes(bytes); // Write some bytes and attempt to flush, which should retry out.write(bytes); - List<String> list = new ArrayList<>(); - list.add(generateBlockId(out, 0)); - String blockListXml = generateBlockListXml(list); + String blockId = generateBlockId(out, 0); + String blockListXml = generateBlockListXml(blockId); Mockito.doAnswer(answer -> { // Set up the mock for the flush operation @@ -1069,9 +1190,8 @@ public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exc new Random().nextBytes(bytes); // Write some bytes and attempt to flush, which should retry out.write(bytes); - List<String> list = new ArrayList<>(); - list.add(generateBlockId(out, 0)); - String blockListXml = generateBlockListXml(list); + String blockId = generateBlockId(out, 0); + String blockListXml = generateBlockListXml(blockId); Mockito.doAnswer(answer -> { // Set up the mock for the flush operation diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java index a0796c1b36fd6..35ce615ba738c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java @@ -20,6 +20,7 @@ import java.io.FileNotFoundException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -31,6 +32,7 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException; import org.apache.hadoop.fs.azurebfs.enums.Trilean; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; @@ -38,7 +40,14 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INGRESS_SERVICE_TYPE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.accountProperty; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.INCORRECT_INGRESS_TYPE; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.ArgumentMatchers.any; @@ -53,7 +62,8 @@ public ITestAzureBlobFileSystemInitAndCreate() throws Exception { } @Override - public void setup() { + public void setup() throws Exception { + super.setup(); } @Override @@ -108,6 +118,33 @@ public void testNoGetAclCallOnHnsConfigPresence() throws Exception { .getAclStatus(Mockito.anyString(), any(TracingContext.class)); } + /** + * Test to verify that the initialization of the AzureBlobFileSystem fails + * when an invalid ingress service type is configured. + * + * This test sets up a configuration with an invalid ingress service type + * (DFS) for a Blob endpoint and expects an InvalidConfigurationValueException + * to be thrown during the initialization of the filesystem. + * + * @throws Exception if an error occurs during the test execution + */ + @Test + public void testFileSystemInitializationFailsForInvalidIngress() throws Exception { + assumeHnsDisabled(); + Configuration configuration = new Configuration(getRawConfiguration()); + String defaultUri = configuration.get(FS_DEFAULT_NAME_KEY); + String accountKey = configuration.get( + accountProperty(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, getAccountName()), + configuration.get(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME)); + configuration.set(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, + accountKey.replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME)); + configuration.set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.DFS.name()); + String blobUri = defaultUri.replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME); + intercept(InvalidConfigurationValueException.class, + INCORRECT_INGRESS_TYPE, () -> + FileSystem.newInstance(new Path(blobUri).toUri(), configuration)); + } + @Test public void testFileSystemInitFailsIfNotAbleToDetermineAccountType() throws Exception { AzureBlobFileSystem fs = ((AzureBlobFileSystem) FileSystem.newInstance(