HADOOP-18679. Add API for bulk/paged delete of files #6726
Conversation
A more minimal design that is easier to use and implement. The caller creates a BulkOperation, queries its page size, and then submits batches of paths to delete, each no larger than that size. The outcome of each call contains a list of failures. An S3A implementation is included to show how straightforward it is. Even with a single-entry page size, it is still more efficient to use this, as it doesn't try to recreate a parent dir or perform any probes to see if the path is a directory: it maps straight to a DELETE call. Change-Id: Ibe8737e7933fe03d39070e7138a710b50c3d60c2
Add methods in FileUtil to take an FS, cast it to a BulkDeleteSource, then perform the pageSize/bulkDelete operations. This makes reflection-based access straightforward: no new interfaces or types to work with, just two new methods with type-erased lists. Change-Id: I2d7b1bf8198422de635253fc087ca551a8bc6609
Change-Id: Ib098c07cc1f7747ed1a3131b252656c96c520a75
Using this PR to start with the initial design, implementation and services offered by having lower-level interaction with S3 pushed down into an S3AStore class, with an interface/impl split. The bulk delete callbacks now talk to the store, *not* s3afs, with some minor changes in behaviour (IllegalArgumentException is raised if root paths / are to be deleted). Mock tests are failing; I expected that: they are always brittle. What next? Get this in and then move lower-level fs ops over to the store, one method calling the s3client at a time, or in groups, as appropriate. The metrics of success are: * all callback classes created in S3A FS can work through the store * no direct s3client invocation in S3AFS. Change-Id: Ib5bc58991533fd5d13e78426e88b884b6ae5205c
Changing results of method calls, using Tuples.pair() to return Map.Entry() instances as immutable tuples. Change-Id: Ibdd5a5b11fe0a57b293b9cb623272e272c8bab69
and some minor prod changes.
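As a minimal sketch of the tuple shape that commit describes: an immutable Map.Entry pair. The JDK helper used here is only an assumed equivalent of what Tuples.pair() does, shown for illustration.

```java
import java.util.AbstractMap;
import java.util.Map;

// Sketch only: an immutable (key, value) tuple expressed as a Map.Entry,
// the shape the bulk delete result list uses. Tuples.pair() in the commit
// above presumably does something equivalent to this JDK class.
final class TupleSketch {
  static <K, V> Map.Entry<K, V> pair(K key, V value) {
    return new AbstractMap.SimpleImmutableEntry<>(key, value);
  }
}
```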
4bbb3b7 to a61f139 (Compare)
I've also done a PR #6738 which tunes the API to work with iceberg, having just written a PoC of the iceberg binding. Can you cherry-pick this PR onto your branch and then do the review comments? After that, please do not do any rebasing of your PR; that way, it is easier for me to keep my own branch in sync with your changes. Thanks.
PoC of iceberg integration, based on their S3FileIO one. The iceberg API passes in a collection of paths, which may span multiple filesystems. To handle this,
We are going to need a default FS impl which just invokes delete(path, false) and maps any IOE to a failure. Change-Id: If56bca7cb8529ccbfbb1dfa29cedc8287ec980d4
  */
public class DefaultBulkDeleteOperation implements BulkDelete {

    private final int pageSize;
this is always 1, isn't it? so much can be simplified here
- no need for the field
- no need to pass it in the constructor
- pageSize() to return 1
    validateBulkDeletePaths(paths, pageSize, basePath);
    List<Map.Entry<Path, String>> result = new ArrayList<>();
    // this for loop doesn't make sense as pageSize must be 1.
    for (Path path : paths) {
there's only ever going to be 1 entry here
      try {
        fs.delete(path, false);
        // What to do if this returns false?
        // I think we should add the path to the result list with value "Not Deleted".
good q. I'd say yes. Or actually do a getFileStatus() call and see what is there:
- file doesn't exist (not an error)
- path is a directory: add to result
key is that file not found isn't escalated (see the sketch below)
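A minimal sketch of how those two suggestions could combine: page size fixed at 1, a getFileStatus() probe when delete() returns false, and file-not-found swallowed rather than escalated. Class and member names here are illustrative only, not the committed DefaultBulkDeleteOperation.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of the suggested handling; not the committed implementation.
class DefaultBulkDeleteSketch {

  private final FileSystem fs;

  DefaultBulkDeleteSketch(FileSystem fs) {
    this.fs = fs;
  }

  /** Page size is always 1 for the default implementation. */
  public int pageSize() {
    return 1;
  }

  public List<Map.Entry<Path, String>> bulkDelete(Collection<Path> paths)
      throws IOException {
    List<Map.Entry<Path, String>> result = new ArrayList<>();
    // at most one entry, as pageSize() == 1
    for (Path path : paths) {
      try {
        if (!fs.delete(path, false)) {
          // delete() returned false: probe to see why.
          try {
            FileStatus st = fs.getFileStatus(path);
            if (st.isDirectory()) {
              result.add(new SimpleImmutableEntry<>(path, "Path is a directory"));
            }
          } catch (FileNotFoundException ignored) {
            // file not found is not an error; nothing to report
          }
        }
      } catch (IOException e) {
        // map any other IOE to a failure entry
        result.add(new SimpleImmutableEntry<>(path, e.toString()));
      }
    }
    return result;
  }
}
```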
    bindReadOnlyRolePolicy(assumedRoleConfig, readOnlyDir);
    roleFS = (S3AFileSystem) destDir.getFileSystem(assumedRoleConfig);

    int range = 10;
on a store where bulk delete page size == 1, use that as the range
💔 -1 overall
This message was automatically generated.
Add bulk delete path capability true for all FS
- I'd prefer BulkDeleteSource to be in FileSystem, with the base implementation returning a DefaultBulkDeleteOperation instance.
- and so we should have one of the contract tests doing it directly through the API on the getFileSystem() instance
- other than that though, looking really good.
Commented. Now, one more thing based on my changes in #6686.
it's adding many more methods to this class, so I'm giving them the name of the class/interface + "_" + method name.
can you do the same here? some style checker will complain but it will help us to separate the methods in the new class.
other than that, all good
@@ -4980,4 +4982,17 @@ public MultipartUploaderBuilder createMultipartUploader(Path basePath)
    methodNotSupported();
    return null;
  }

  /**
   * Create a default bulk delete operation to be used for any FileSystem.
This doesn't hold for the subclasses. better to say
Create a bulk delete operation.
The default implementation returns an instance of {@link DefaultBulkDeleteOperation}
I don't understand what to do here.
@mukund-thakur I've tried to clarify what I mean: we separate the class/interface from the method/operation with a "_" character.
It makes a lot more sense when you look at my PR, which brings a lot more methods into the same class. Why all in the same class? Less reflection code.
   * @throws IllegalArgumentException path not valid.
   * @throws IOException problems resolving paths
   */
  public static int bulkDeletePageSize(FileSystem fs, Path path) throws IOException {
rename to bulkDelete_pageSize
   * @throws IOException IO problems including networking, authentication and more.
   * @throws IllegalArgumentException if a path argument is invalid.
   */
  public static List<Map.Entry<Path, String>> bulkDelete(FileSystem fs,
rename to bulkDelete_delete
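For callers who reach these static helpers reflectively, a sketch of what that could look like with the renamed methods. The WrappedIO class name is taken from the commit message further down; the full parameter list of bulkDelete_delete is assumed, since the snippet above is truncated.

```java
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: reflection-based access to the WrappedIO static methods, so a library
// can use bulk delete without compiling against the new classes.
// Assumed signatures: bulkDelete_pageSize(FileSystem, Path) and
// bulkDelete_delete(FileSystem, Path, Collection<Path>).
final class WrappedIOReflectionSketch {

  static int pageSize(FileSystem fs, Path base) throws Exception {
    Class<?> wrapped = Class.forName("org.apache.hadoop.io.wrappedio.WrappedIO");
    Method m = wrapped.getMethod("bulkDelete_pageSize", FileSystem.class, Path.class);
    return (Integer) m.invoke(null, fs, base);
  }

  static List<?> delete(FileSystem fs, Path base, Collection<Path> paths) throws Exception {
    Class<?> wrapped = Class.forName("org.apache.hadoop.io.wrappedio.WrappedIO");
    Method m = wrapped.getMethod("bulkDelete_delete",
        FileSystem.class, Path.class, Collection.class);
    return (List<?>) m.invoke(null, fs, base, paths);
  }
}
```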
💔 -1 overall
This message was automatically generated.
mukund, if you can do those naming changes then I'm +1
+1. I was about to merge, then I realised that yetus wasn't ready. Here is my draft commit message:
Applications can create a BulkDelete implementation from a BulkDeleteSource; the BulkDelete interface provides the pageSize(): the maximum number of entries which can be deleted, and a bulkDelete(Collection paths) method which can take a collection up to pageSize() long.

This is optimized for object stores with bulk delete APIs; the S3A connector will offer the page size of fs.s3a.bulk.delete.page.size unless bulk delete has been disabled.

Even with a page size of 1, the S3A implementation is more efficient than delete(path) as there are no safety checks for the path being a directory or probes for the need to recreate directories.

The interface BulkDeleteSource is implemented by all FileSystem implementations, with a page size of 1 and mapped to delete(pathToDelete, false).

To aid use through reflection APIs, the class org.apache.hadoop.io.wrappedio.WrappedIO has been created with "reflection friendly" methods.

Contributed by Mukund Thakur and Steve Loughran
merged to trunk. mukund, can you do the backport to branch-3.4.1, while we think about what to do for 3.3.9? speaking of which, we should think about that...

fyi #6686 adds dynamic load for the wrapped methods, and calls them through the tests.

While backporting this to branch-3.4 I see this failure. Will check if this is happening on trunk as well.

yeah, just seen those too. the mocked s3 client isn't getting the delete-one-object calls via the store object. got a small PR up to help debug it, which is not a fix.

problem is that store is null in the innermost s3afs; that triggers an NPE in deleteObject() before the aws client has its delete operation called, so the list of deleted paths is not updated. easiest immediate fix is to mock deleteObject(); longer term we should actually be stubbing the entire store, as that's what the interface/impl split is meant to assist.

fix this with mockito magic somehow. https://github.com/apache/hadoop/pull/6843/files

I tried to do it yesterday too. I think my solution would have pulled createStore() out into a method and overridden it, or added an S3AInternals.setStore() call.

update: #5081 does successfully show the staging UT failure (good!) but also some failures in hadoop-common due to the new interface... we need to modify the tests to indicate it is safe to not override the base implementation.
Applications can create a BulkDelete instance from a BulkDeleteSource; the BulkDelete interface provides the pageSize(): the maximum number of entries which can be deleted, and a bulkDelete(Collection paths) method which can take a collection up to pageSize() long.

This is optimized for object stores with bulk delete APIs; the S3A connector will offer the page size of fs.s3a.bulk.delete.page.size unless bulk delete has been disabled.

Even with a page size of 1, the S3A implementation is more efficient than delete(path) as there are no safety checks for the path being a directory or probes for the need to recreate directories.

The interface BulkDeleteSource is implemented by all FileSystem implementations, with a page size of 1 and mapped to delete(pathToDelete, false). This means that callers do not need to have special case handling for object stores versus classic filesystems.

To aid use through reflection APIs, the class org.apache.hadoop.io.wrappedio.WrappedIO has been created with "reflection friendly" methods.

Contributed by Mukund Thakur and Steve Loughran

Conflicts:
    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
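To make the flow described in that commit message concrete, here is a minimal caller-side sketch: get a BulkDelete for a base path, read its page size, and submit batches no larger than that. The createBulkDelete() factory name and the try-with-resources usage are assumptions for illustration, not quotes from the merged code.

```java
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: paged deletion under a base path. createBulkDelete() is the
// assumed factory method, and BulkDelete is assumed to be Closeable.
final class BulkDeleteUsageSketch {

  static void deleteAll(FileSystem fs, Path base, List<Path> toDelete) throws IOException {
    try (BulkDelete bulk = fs.createBulkDelete(base)) {
      int pageSize = bulk.pageSize();
      for (int i = 0; i < toDelete.size(); i += pageSize) {
        List<Path> page = toDelete.subList(i, Math.min(i + pageSize, toDelete.size()));
        // each entry in the result is a (path, error) pair for a failed deletion
        List<Map.Entry<Path, String>> failures = bulk.bulkDelete(page);
        if (!failures.isEmpty()) {
          throw new IOException("Failed to delete " + failures.size() + " path(s)");
        }
      }
    }
  }
}
```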
 * <li>The operation is treated as idempotent: network failures may
 * trigger resubmission of the request -any new objects created under a
 * path in the list may then be deleted.</li>
It is great that we call this out, but can we do better at the API level? Had the API taken Collection<FileStatus>, we could make the operation idempotent in most clouds by passing the version/generation id/etag in the request.
AWS SDK delete with a version id actually requires more IAM permissions than unversioned delete, which always removes the HEAD object, because granting that permission allows the caller to delete backups. Deployments where apps can delete HEAD but not versions are not unusual for this reason. This is why S3A doesn't use it even in simple listing -> delete calls where the status is known. You might also need to issue getFileStatus/list calls, which would massively increase the cost if the process didn't have those values already.

A bulk delete with a tuple of (path, version) for each entry could work, if the store could be configured to use that version ID/type. For S3A we would leave it off by default. The tuple would be Map.Entry to be reflection friendly. If you do think version/etag support would be a blocker to use, well, things haven't shipped yet, though @mukund-thakur is preparing a 3.4.1 alpha release. You (and it would be you, sorry) will need to modify the api with

This isn't that useful for table compaction, as the engines tend to use randomness in their names to spread the s3 store load across shards. But it could have other uses. For example, here's some work to do version printing, recovery and copy within the same bucket; it lets you pull out the layers underneath a directory tree: https://github.com/steveloughran/cloudstore/tree/main/src/main/extra/org/apache/hadoop/fs/s3a/extra

Question is: do we want to make something that complex part of a broader api with tests, specification, commitments to maintain etc, or do we just say call
#3633 shows what's required there.
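Purely as illustration of the (path, version) tuple idea above, a hypothetical sketch of what a versioned request entry could look like; nothing like this exists in the PR or in the shipped API, and the names and values are invented.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;

// Hypothetical only: a versioned delete request as a list of (path, versionId/etag)
// tuples. The current API takes plain paths; this is merely the shape discussed above.
final class VersionedBulkDeleteSketch {

  static List<Map.Entry<Path, String>> versionedRequest() {
    return Arrays.asList(
        new SimpleImmutableEntry<>(
            new Path("s3a://bucket/table/data-0001.parquet"), "etag-or-version-id-1"),
        new SimpleImmutableEntry<>(
            new Path("s3a://bucket/table/data-0002.parquet"), "etag-or-version-id-2"));
  }
}
```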
BTW, the "is delete idempotent?" question is a recurrent one; search the mail lists for history on it, including even HDFS. In a world with parallel writers, it clearly isn't. But if you are going from paged list -> delete you are already getting non-atomic listings where race conditions can do odd things (rename file zzzzzzz to aaaaa and not have the listing find it at all). You can rebuild S3AFS with
Thank you for the context @steveloughran. Lots of good info above. Should we be adding an overload that takes Collection<FileStatus>? To close, I want to mention that idempotency of operations is not only useful in the face of multiple (application) writers. It also plays a role when only one writer is involved. The reason is the distributed nature of cloud storage and the load balancers/backends inside cloud storage implementations. For example:

Had the requests in (2) and (4) been idempotent, the writer wouldn't observe
That scenario is a key part of the whole delete idempotency discussion. And as noted, trying to implement idempotency there on S3 requires permissions which apps are often not trusted with.

As for abfs, I'd worry more about rename resilience under load than deletion. If you've not hit problems there: you've not generated enough load. Look at HADOOP-18613 and HADOOP-18012 there. Really you can't rely on delete or rename doing what you expect, so your commit protocol had better not rely on them. Though #6716 shows that even when I think I've done that, I can get caught out.

As for the "overload which takes FileStatus", look at the openFile() work and followup issues in the s3a and abfs codebases which caught us out. In particular, any virtual FS on top (hive, viewFS, maybe the databricks one...) builds its own FileStatus instances because the path in them is absolute and includes the full URL. Which means the file status passed in is often of a different type from any FS-specific implementation, lacks all version/etag info, and has a path which doesn't resolve. It'd have to take path and filestatus separately.

Returning to this API, look at apache/iceberg#10233 for the latest integration; it is only given strings to map to paths, which is what the new API takes.

For a commit process with idempotency, are you looking for rename with versioning? Or even a createFile() with an if-matches condition on underlying files...something Azure supports. Adding if-matches into createFile() is possible if you want to design that; the spec would need to say "failure MAY happen at close()" to support any store where the put/post only happens at the end. And you need a way to pass that version of the file being overwritten (etag, version) down. Assuming this was etag only, we have that API, though wrapper filesystems don't let you pick it up (not in the base class; adding a new field would break serialization across versions...).

If you want this and are prepared to do the work, at least for stores which implement it (azure), then I'm happy to supervise your effort, especially if we can get the MSFT engineers involved. Create that JIRA and once you put up your first design I'll assign the JIRA to you. It'd have target 3.4.2, but getEtag() and openFile() have shipped for a while:

String etag;
// status = fs.getFileStatus(dest); catch FNFE and set etag to ""
etag = status.getEtag();   // via EtagSource
FSDataOutputStream out = fs.createFile(dest)
    .must("fs.create.option.if-match", etag)
    .build();
...
out.close();
like I said, gets complex fast, but if you want something stable to work with current and future stores, it'd have to be the way. None of this needs new public FS APIs, just standard behaviour in stores which implement it. go for it!
HADOOP-18679
Adding tests on top of #6494
Description of PR
How was this patch tested?
For code changes:
Have you updated the LICENSE, LICENSE-binary, NOTICE-binary files?