-
Notifications
You must be signed in to change notification settings - Fork 8.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HADOOP-19233: ABFS: [FnsOverBlob] Implementing Rename and Delete APIs over Blob Endpoint #7265
base: trunk
Are you sure you want to change the base?
Conversation
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some comments
AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null), | ||
ACCOUNT_REQUIRES_HTTPS("AccountRequiresHttps", HttpURLConnection.HTTP_BAD_REQUEST, null), | ||
MD5_MISMATCH("Md5Mismatch", HttpURLConnection.HTTP_BAD_REQUEST, | ||
"The MD5 value specified in the request did not match with the MD5 value calculated by the server."), | ||
COPY_BLOB_FAILED("COPY_BLOB_FAILED", HttpURLConnection.HTTP_INTERNAL_ERROR, null), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error Codes should be in camelcase as others
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taken!
|
||
package org.apache.hadoop.fs.azurebfs.enums; | ||
|
||
public enum BlobCopyProgress { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Javadoc for class and enums
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java Doc added.
@@ -49,6 +49,9 @@ public interface SASTokenProvider { | |||
String SET_PERMISSION_OPERATION = "set-permission"; | |||
String SET_PROPERTIES_OPERATION = "set-properties"; | |||
String WRITE_OPERATION = "write"; | |||
String COPY_BLOB_DESTINATION = "copy-blob-dst"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to discuss this change once, we do not support UDS for FNS Blob
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was not in use anywhere, so removed it for now.
destination, sourceEtag, isAtomicRenameKey(source), tracingContext | ||
); | ||
incrementAbfsRenamePath(); | ||
return blobRenameHandler.execute(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might need rechecking. We do not want to return op as null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per the offline discussion, made the changes.
final TracingContext tracingContext, | ||
final boolean isNamespaceEnabled) throws AzureBlobFileSystemException { | ||
getBlobDeleteHandler(path, recursive, tracingContext).execute(); | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we returning null here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done the changes to return dummy response same as rename Path.
@@ -201,4 +238,25 @@ public int getAcquireRetryCount() { | |||
public TracingContext getTracingContext() { | |||
return tracingContext; | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java docs for all public methods and classes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java doc added.
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.hadoop.fs.azurebfs.services; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Javadocs missing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Java doc added.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
This comment was marked as outdated.
🎊 +1 overall
This message was automatically generated. |
🎊 +1 overall
This message was automatically generated. |
* Blob copy API is an async API, this configuration defines polling duration | ||
* for checking copy status {@value} | ||
*/ | ||
public static final String FS_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS = "fs.azure.blob.copy.progress.wait.millis"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep the comments formatting constant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taken!
@@ -104,5 +104,9 @@ public final class HttpHeaderConfigurations { | |||
*/ | |||
public static final String X_MS_BLOB_CONTENT_MD5 = "x-ms-blob-content-md5"; | |||
|
|||
public static final String X_MS_COPY_ID = "x-ms-copy-id"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
javadocs for new constants
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taken!
@@ -359,6 +375,34 @@ public AbfsRestOperation listPath(final String relativePath, final boolean recur | |||
return op; | |||
} | |||
|
|||
private void fixAtomicEntriesInListResults(final AbfsRestOperation op, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add javadocs for all methods wherever missing
@@ -338,6 +353,7 @@ public AbfsRestOperation listPath(final String relativePath, final boolean recur | |||
requestHeaders); | |||
|
|||
op.execute(tracingContext); | |||
fixAtomicEntriesInListResults(op, tracingContext); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
explain the need for the same
List<BlobListResultEntrySchema> filteredEntries = new ArrayList<>(); | ||
for (BlobListResultEntrySchema entry : listResultSchema.paths()) { | ||
if (!takeListPathAtomicRenameKeyAction(entry.path(), | ||
(int) (long) entry.contentLength(), tracingContext)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
double casting is not needed, we can use something like :-
Long longValue = 12345L;
int intValue = longValue.intValue();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taken!
@@ -45,6 +45,8 @@ public enum AzureServiceErrorCode { | |||
INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE("InvalidSourceOrDestinationResourceType", HttpURLConnection.HTTP_CONFLICT, null), | |||
RENAME_DESTINATION_PARENT_PATH_NOT_FOUND("RenameDestinationParentPathNotFound", HttpURLConnection.HTTP_NOT_FOUND, null), | |||
INVALID_RENAME_SOURCE_PATH("InvalidRenameSourcePath", HttpURLConnection.HTTP_CONFLICT, null), | |||
DIRECTORY_NOT_EMPTY_DELETE("DirectoryNotEmpty", HttpURLConnection.HTTP_CONFLICT, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
naming could be changed
} else { | ||
throw new AbfsRestOperationException(HTTP_INTERNAL_ERROR, | ||
AzureServiceErrorCode.UNKNOWN.getErrorCode(), | ||
"FNS-Blob Rename was not successfull", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: successful
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should add path or file name as well where rename failed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taken!
throw ex; | ||
} | ||
} | ||
return true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we directly returning true here and not checking for renameSrchasChanged as in the previous method ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the above if condition we are filtering out paths for which we don't have to do the rename redo and returning false there itself. In case where path require rename redo, we are returning true if no exception is raised.
|
||
/** | ||
* Orchestrator for delete over Blob endpoint. Blob endpoint for flat-namespace | ||
* account does not support director delete. This class is responsible for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: directory
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resolved!
try { | ||
/* | ||
* Delete the required path. | ||
* Directory needs to be safely delete the path, as the path can be implicit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: grammar issue
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated!
this.isAtomicRenameRecovery = isAtomicRenameRecovery; | ||
} | ||
|
||
public BlobRenameHandler(final String src, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
javadocs
* @return true if the path contains a colon | ||
*/ | ||
private boolean containsColon(Path p) { | ||
return p.toUri().getPath().contains(":"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
COLON constant can be used
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taken!
@Override | ||
boolean takeAction(final Path path) throws AzureBlobFileSystemException { | ||
return renameInternal(path, | ||
createDestinationPathForBlobPartOfRenameSrcDir(dst, path, src)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Method name can be shortened
try { | ||
AbfsRestOperation copyPathOp = getAbfsClient().copyBlob(src, dst, leaseId, | ||
tracingContext); | ||
final String progress = copyPathOp.getResult() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check for copyPathOp != null && copyPathOp.getResult() != null
tracingContext, null, false); | ||
final String srcCopyPath = ROOT_PATH + getAbfsClient().getFileSystem() | ||
+ src.toUri().getPath(); | ||
if (dstPathStatus.getResult() != null && (srcCopyPath.equals( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should have check for dstPathStatus != null as well
} | ||
final long pollWait = getAbfsClient().getAbfsConfiguration() | ||
.getBlobCopyProgressPollWaitMillis(); | ||
while (handleCopyInProgress(dst, tracingContext, copyId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check copyId != null
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If handleCopyInProgress keeps returning PENDING, the code might enter an infinite loop of waiting. We should introduce maximum wait time and if exceeded fail.
|
||
if (op.getResult() != null && copyId.equals( | ||
op.getResult().getResponseHeader(X_MS_COPY_ID))) { | ||
final String copyStatus = op.getResult() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
check for response header is not null is needed
getAbfsClient().checkIsDir(op.getResult()), | ||
extractEtagHeader(op.getResult()), | ||
op.getResult() instanceof AbfsHttpOperation.AbfsHttpOperationWithFixedResultForGetFileStatus); | ||
} catch (AzureBlobFileSystemException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
better to catch AbfsRestOperationException itself ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taken!
import java.nio.charset.Charset; | ||
import java.nio.charset.StandardCharsets; | ||
|
||
//import java.util.ArrayList; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unused imports
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will go once Ingress handler changes are taken.
renamePendingJsonFormatObj = objectMapper.readValue(contents, | ||
RenamePendingJsonFormat.class); | ||
} catch (JsonProcessingException e) { | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it right to just return without throwing an exception ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it is expected. In case of Json Processing Error, we are deleting the json file and returning from it.
abfsClient.append(path.toUri().getPath(), bytes, | ||
appendRequestParameters, null, null, tracingContext); | ||
|
||
// List<String> blockIdList = new ArrayList<>(Collections.singleton(blockId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove commented code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will go once Ingress handler changes are taken.
// List<String> blockIdList = new ArrayList<>(Collections.singleton(blockId)); | ||
// String blockList = generateBlockListXml(blockIdList); | ||
// PutBlockList on the path. | ||
String blockList = ""; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if flush is called on empty string, how does it take the blockId into usage ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added this line just to pass the build. The above commented line is calling generateBlockListXml which requires ingress handler changes. Will pick the changes of ingress handler once it is merged and this line is no longer needed after that.
…manish/rename_delete
* endpoint, the orchestration would be done by the client. The idempotency | ||
* issue would not happen for blob endpoint. | ||
*/ | ||
assertTrue(fs.getAbfsClient() instanceof AbfsDfsClient); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can use getAbfsServiceType()
@@ -314,4 +347,278 @@ public void deleteBlobDirParallelThreadToDeleteOnDifferentTracingContext() | |||
fs.delete(new Path("/testDir"), true); | |||
fs.close(); | |||
} | |||
|
|||
private void assumeBlobClient() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[optional] This method can be avoided and all calls to it can be replaced by getAbfsServiceType() == AbfsServiceType.BLOB
fs.create(new Path("testDir2/test4/file1")); | ||
assertTrue(fs.exists(new Path("testDir2/test1/test2/test3/file"))); | ||
assertTrue(fs.exists(new Path("testDir2/test1/test2/test3/file1"))); | ||
Assert.assertTrue(fs.rename(new Path("testDir2/test1/test2/test3"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use assertions.assertThat
* API of {@link AzureBlobFileSystem} should recover the paused rename. | ||
*/ | ||
@Test | ||
public void testHBaseHandlingForFailedRenameWithListRecovery() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This and the test below are almost similar, code can be reused
🎊 +1 overall
This message was automatically generated. |
testAtomicityRedoInvalidFile(fs); | ||
} | ||
|
||
private void testRenamePreRenameFailureResolution(final AzureBlobFileSystem fs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comments for tests
Description of PR:
This PR is in correlation to the series of work done under Parent Jira: [HADOOP-19179]
Jira for this Patch: [HADOOP-19233]
Currently, we only support rename and delete operations on the DFS endpoint. The reason for not supporting rename and delete operations on the Blob endpoint is that the Blob endpoint does not account for hierarchy. We need to ensure that the HDFS contracts are maintained when performing rename and delete operations. Renaming or deleting a directory over the Blob endpoint requires the client to handle the orchestration and rename or delete all the blobs within the specified directory.
The task outlines the considerations for implementing rename and delete operations for the FNS-blob endpoint to ensure compatibility with HDFS contracts.