Core: HadoopFileIO to support bulk delete through the Hadoop Filesystem APIs #10233
Conversation
SetMultimap<String, Path> fsMap =
    Multimaps.newSetMultimap(Maps.newHashMap(), Sets::newHashSet);
List<Future<List<Map.Entry<Path, String>>>> deletionTasks = Lists.newArrayList();
for (String path : pathnames) {
Ideally we would want to maintain the streaming nature of the delete (e.g. collect and delete batches incrementally as opposed to collecting them all first), which is the intent of the input being iterable.
That should be the case; this is based on the S3FileIO implementation, except that it works with every FileSystem instance rather than just S3 (we will offer the API with a page size of 1 everywhere). As soon as the number of queued entries for a filesystem matches the page size, a bulk delete call is submitted.

For page size == 1 (classic fs, GCS) it is a convoluted way to get the same delete(path, false) call.

For S3:
- page size > 1: the code can build up a large page before submitting; the page size is set by the existing option.
- s3a with bulk delete disabled: page size == 1. No safety checks or guarantees other than "the single object at the path without a trailing / will be deleted"; no attempt is made to recreate the parent directory.

This means that even when bulk delete is disabled (as on some third-party stores, including GCS), it is still more efficient, as it saves two LISTs and more.

For the branch-3.3 implementation we are likely to offer just that single-but-less-chatty delete; the move to the v2 SDK means backporting complex stuff is now impossible. But we want to provide the reflection-friendly API so that it can be used easily.
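To make the batching behaviour above concrete, here is a minimal sketch (not the PR's code) of grouping incoming paths per filesystem and flushing a page as soon as it reaches that filesystem's bulk delete page size. It assumes the Hadoop 3.4.1+ org.apache.hadoop.fs.BulkDelete API via FileSystem.createBulkDelete(); the PR itself goes through reflection-friendly wrappers so it still compiles against older Hadoop releases, and the class and method names here are illustrative.

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class StreamingBulkDeleter {

  void deleteAll(Iterable<String> pathnames, Configuration conf) throws IOException {
    Map<Path, List<Path>> pending = new HashMap<>();   // queued paths per filesystem root
    Map<Path, Integer> pageSizes = new HashMap<>();    // cached page size per filesystem root

    for (String name : pathnames) {
      Path path = new Path(name);
      FileSystem fs = path.getFileSystem(conf);
      Path root = fs.makeQualified(new Path("/"));

      Integer pageSize = pageSizes.get(root);
      if (pageSize == null) {
        // page size is 1 everywhere except stores such as S3A with bulk delete enabled
        try (BulkDelete probe = fs.createBulkDelete(root)) {
          pageSize = probe.pageSize();
        }
        pageSizes.put(root, pageSize);
      }

      List<Path> batch = pending.computeIfAbsent(root, r -> new ArrayList<>());
      batch.add(path);
      if (batch.size() >= pageSize) {
        flush(fs, root, batch);   // submit a full page as soon as it is reached
        batch.clear();
      }
    }

    // flush whatever is left over for each filesystem
    for (Map.Entry<Path, List<Path>> entry : pending.entrySet()) {
      if (!entry.getValue().isEmpty()) {
        flush(entry.getKey().getFileSystem(conf), entry.getKey(), entry.getValue());
      }
    }
  }

  private void flush(FileSystem fs, Path base, List<Path> paths) throws IOException {
    try (BulkDelete op = fs.createBulkDelete(base)) {
      // entries in the returned list are paths which could not be deleted
      List<Map.Entry<Path, String>> failures = op.bulkDelete(paths);
      if (!failures.isEmpty()) {
        throw new IOException("failed to delete " + failures.size() + " objects under " + base);
      }
    }
  }
}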
for (String path : pathnames) {
  Path p = new Path(path);
  final URI uri = p.toUri();
  String fsURI = uri.getScheme() + "://" + uri.getHost() + "/";
Using the URI class has some significant incompatibilities with various cloud providers, such as GCP, which allow non-standard characters (e.g. an underscore in a bucket name); these result in the host being empty/null and cause problems. We use classes like GCSLocation or S3URI to work around these issues. We should try to avoid use of URI.
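A quick illustration of that point (not code from the PR): java.net.URI cannot treat an authority containing an underscore as a hostname, so getHost() comes back null for such bucket names while getAuthority() still returns the raw value. The bucket names below are made up.

import java.net.URI;

public class UriHostDemo {
  public static void main(String[] args) {
    URI ok = URI.create("gs://my-bucket/data/file.parquet");
    URI odd = URI.create("gs://my_bucket/data/file.parquet");

    System.out.println(ok.getHost());        // my-bucket
    System.out.println(odd.getHost());       // null: underscore is not legal in a hostname
    System.out.println(odd.getAuthority());  // my_bucket is still present as the authority
  }
}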
So should we just use the root path of every fs as the key? Path uses its URI for hashing and comparison internally.
I'm using the root path of every filesystem instance; this relies on the Path type's internal URI handling. There are some S3 bucket names that aren't valid hostnames; for those, the S3A policy is "not supported". Dots in bucket names are a key example, and even Amazon advises against those.
@danielcweeks quick resolution for this: FileSystem.get() uses that root path URI to look up filesystems from its cache.
Non-standard hostnames which can't be converted to a URI probably do not work through HadoopFileIO today.
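For illustration only (not PR code), here is how that cache keying behaves: a Path resolves its FileSystem from the scheme and authority of its URI, and FileSystem.get() on the filesystem's root URI hands back the same cached instance (with the default caching configuration, same user, and the local filesystem used as a stand-in here).

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsCacheKeyDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path p = new Path("file:///tmp/iceberg/data/00000.parquet");

    // resolves (and caches) the filesystem for the path's scheme/authority
    FileSystem viaPath = p.getFileSystem(conf);

    // the root URI of that filesystem maps back to the same cached instance
    URI root = viaPath.getUri();
    FileSystem viaRoot = FileSystem.get(root, conf);

    System.out.println(root);                // file:///
    System.out.println(viaPath == viaRoot);  // true: same cached instance
  }
}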
This PR is in sync with apache/hadoop#6686; the dynamic binding classes in here are from that PR (which also copies in the Parquet DynMethods classes for consistency everywhere).
The latest update runs the tests against the local fs, parameterized on using/not using bulk delete; the library settings have been modified to use hadoop 3.4.1-SNAPSHOT for this. It works, and execution time on large file deletion (added to the listPrefix test) is comparable, but there are now two codepaths to test and maintain. I plan to coalesce them so test coverage is better, even on older builds.
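A hedged sketch of what a test "parameterized on using/not using bulk delete" could look like; the test class, the way the switch is set, and the use of HadoopFileIO.deleteFiles() here are assumptions based on this PR's description and commit message, not the actual test code.

import java.io.File;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.assertj.core.api.Assertions.assertThat;

class TestLocalFsBulkDelete {

  @ParameterizedTest
  @ValueSource(booleans = {false, true})
  void deleteManyFiles(boolean bulkDelete) throws Exception {
    Configuration conf = new Configuration();
    // switch from the commit message; exact wiring in the PR may differ
    conf.setBoolean("iceberg.hadoop.bulk.delete.enabled", bulkDelete);
    HadoopFileIO io = new HadoopFileIO(conf);

    File dir = Files.createTempDirectory("bulk-delete").toFile();
    List<String> paths = new ArrayList<>();
    for (int i = 0; i < 250; i++) {
      File f = new File(dir, "file-" + i);
      assertThat(f.createNewFile()).isTrue();
      paths.add(f.toURI().toString());
    }

    // bulk entry point: deletes everything in one call, batched per filesystem
    io.deleteFiles(paths);

    assertThat(dir.listFiles()).isEmpty();
  }
}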
For anyone watching this, there's a full integration test suite in the hadoop test code: apache/hadoop#7285. All is good, though as it's the first Java 17 code and depends on an Iceberg jar with this patch in it, it's actually dependent on this PR going in first. It does show that yes, bulk deletes through the S3A code do work, without me having to add a significant piece of test code to Iceberg to wire HadoopFileIO up to the MinIO docker container that S3FileIO uses. Anyway, with those tests happy, I just have a few more tests to write (mixing in some local files too) and then I'll be confident this is ready for review.
Reflection-based use of the Hadoop 3.4.1+ BulkDelete API so that S3 object deletions can be done in pages of objects, rather than one at a time. The configuration option "iceberg.hadoop.bulk.delete.enabled" switches to bulk deletes. There's a unit test which will turn this on if the wrapped APIs can be loaded, and probes the HadoopFileIO instance for it using those APIs.
* Parameterized tests for bulk delete on/off.
* Cache the bulk delete page size; this brings performance of bulk delete with page size == 1 up to that of single delete.
* Use deleteFiles() in tests which create many files; helps highlight performance/scale issues against the local fs.
Also updates aws.md to cover this and other relevant s3a settings, primarily for maximum Parquet IO.
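A sketch of how the switch described above might be enabled; this is illustrative, not documentation. The "iceberg.hadoop.bulk.delete.enabled" key comes from the commit message, "fs.s3a.bulk.delete.page.size" is the standard S3A option controlling the page size, and the value shown is arbitrary.

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;

public class EnableBulkDelete {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // switch HadoopFileIO over to paged bulk deletes (per this PR)
    conf.setBoolean("iceberg.hadoop.bulk.delete.enabled", true);

    // existing S3A option: how many keys go into one bulk delete request
    conf.setInt("fs.s3a.bulk.delete.page.size", 250);

    HadoopFileIO io = new HadoopFileIO(conf);
    // io.deleteFiles(...) will now batch deletions up to the filesystem's page size
  }
}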
The failures here are due to mocking: if a mocked Hadoop FS lacks the new bulk delete API, hadoop-common's attempt to invoke it will fail to link, even though there's a default implementation in the base FileSystem class. Any UnsupportedOperationException raised is now caught and triggers a switch to the non-bulk-IO operations. It does not disable bulk IO outright, though that is always an option here, as this is not going to get better. However, it should be impossible to encounter in production.
If the bulk delete invocation didn't link (e.g. from mocking), then bulk delete is disabled for the life of this HadoopFileIO instance.
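A rough sketch of that fallback behaviour; the class, field, and method names are mine, not the PR's. If the bulk delete path throws UnsupportedOperationException (e.g. a mocked FileSystem without the 3.4.1 API), the instance downgrades to single deletes and never retries bulk IO.

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.fs.BulkDelete;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class FallbackDeleter {
  // starts true when the wrapped bulk delete API was found on the classpath
  private final AtomicBoolean bulkDeleteEnabled = new AtomicBoolean(true);

  void delete(FileSystem fs, Path base, Collection<Path> paths) throws IOException {
    if (bulkDeleteEnabled.get()) {
      try {
        bulkDelete(fs, base, paths);
        return;
      } catch (UnsupportedOperationException e) {
        // didn't link: disable bulk IO for the life of this instance and fall through
        bulkDeleteEnabled.set(false);
      }
    }
    for (Path path : paths) {
      fs.delete(path, false);  // classic one-at-a-time delete
    }
  }

  private void bulkDelete(FileSystem fs, Path base, Collection<Path> paths)
      throws IOException {
    // in the PR this goes through reflection-based wrappers of the Hadoop 3.4.1+ API
    try (BulkDelete op = fs.createBulkDelete(base)) {
      op.bulkDelete(paths);  // returned list of failures ignored in this sketch
    }
  }
}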
Code for #12055
Using bulk delete for files eliminates the per-file sequence of probing for the status of the destination object, issuing a single delete request, and then probing to see if we need to recreate an empty directory marker above it; instead, a few hundred objects are deleted in a single request at a time.
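A back-of-the-envelope illustration of the request-count difference, under assumed per-file costs (one status probe, one delete, one parent-directory probe) and an assumed page size of 250; the numbers are illustrative, not measured.

public class RequestCountSketch {
  public static void main(String[] args) {
    int files = 1_000;
    int pageSize = 250;

    int singleDeleteRequests = files * 3;                        // probe + delete + parent probe per file
    int bulkDeleteRequests = (files + pageSize - 1) / pageSize;  // one request per page of keys

    System.out.println("one-at-a-time: ~" + singleDeleteRequests + " requests");  // ~3000
    System.out.println("bulk delete:    " + bulkDeleteRequests + " requests");    // 4
  }
}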
Tested: S3 london
In apache/hadoop#7316 there's an (uncommitted) PR for Hadoop which takes an Iceberg library and verifies the correct operation of the Iceberg delete operations against a live AWS S3 store with and without bulk delete enabled.
https://github.com/apache/hadoop/blob/d37310cf355f3eb137f925bde9a2a299823b8230/hadoop-tools/hadoop-aws/src/test/java17/org/apache/hadoop/fs/contract/s3a/ITestIcebergBulkDelete.java
This is something we can merge into hadoop as a local regression test once Iceberg has a release with this.