Skip to content

Commit 9c41299

Browse files
committed
HADOOP-18679. Use dynamic loading of bulk delete methods
This is in sync with apache/hadoop#6686, which has renamed one of the methods to be loaded. The new DynamicWrappedIO class is based on one being written as part of that PR; as both are based on the Parquet DynMethods class, a copy-and-paste is straightforward.
1 parent ff59d8f commit 9c41299

File tree

3 files changed

+268
-4
lines changed

3 files changed

+268
-4
lines changed

core/src/main/java/org/apache/iceberg/hadoop/HadoopFileIO.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@
3636
import org.apache.hadoop.fs.FileSystem;
3737
import org.apache.hadoop.fs.Path;
3838
import org.apache.hadoop.fs.RemoteIterator;
39-
import org.apache.hadoop.io.wrappedio.WrappedIO;
4039
import org.apache.iceberg.exceptions.RuntimeIOException;
40+
import org.apache.iceberg.hadoop.wrappedio.DynamicWrappedIO;
4141
import org.apache.iceberg.io.BulkDeletionFailureException;
4242
import org.apache.iceberg.io.DelegateFileIO;
4343
import org.apache.iceberg.io.FileInfo;
@@ -68,6 +68,8 @@ public class HadoopFileIO implements HadoopConfigurable, DelegateFileIO {
6868

6969
private SerializableSupplier<Configuration> hadoopConf;
7070
private SerializableMap<String, String> properties = SerializableMap.copyOf(ImmutableMap.of());
71+
private transient DynamicWrappedIO wrappedIO;
72+
private boolean useBulkDelete;
7173

7274
/**
7375
* Constructor used for dynamic FileIO loading.
@@ -83,6 +85,7 @@ public HadoopFileIO(Configuration hadoopConf) {
8385

8486
/**
 * Constructor taking a serializable supplier of the Hadoop configuration, so the
 * FileIO itself can be serialized without capturing a live {@link Configuration}.
 *
 * @param hadoopConf serializable supplier of the Hadoop configuration
 */
public HadoopFileIO(SerializableSupplier<Configuration> hadoopConf) {
  this.hadoopConf = hadoopConf;
  // Bind the dynamically loaded WrappedIO methods once, using this class's
  // classloader; the field is transient, so it is rebuilt after deserialization.
  this.wrappedIO = new DynamicWrappedIO(this.getClass().getClassLoader());
}
8790

8891
public Configuration conf() {
@@ -92,6 +95,7 @@ public Configuration conf() {
9295
@Override
9396
public void initialize(Map<String, String> props) {
9497
this.properties = SerializableMap.copyOf(props);
98+
this.useBulkDelete = wrappedIO.bulkDeleteAvailable();
9599
}
96100

97101
@Override
@@ -178,7 +182,7 @@ public void deletePrefix(String prefix) {
178182
@Override
179183
public void deleteFiles(Iterable<String> pathsToDelete) throws BulkDeletionFailureException {
180184
AtomicInteger failureCount = new AtomicInteger(0);
181-
if (WrappedIO.isBulkDeleteAvailable()) {
185+
if (useBulkDelete) {
182186
failureCount.set(bulkDeleteFiles(pathsToDelete));
183187
} else {
184188
Tasks.foreach(pathsToDelete)
@@ -223,7 +227,7 @@ private int bulkDeleteFiles(Iterable<String> pathnames) {
223227
Set<Path> pathsForFilesystem = fsMap.get(fsRoot);
224228
pathsForFilesystem.add(target);
225229

226-
int pageSize = WrappedIO.bulkDeletePageSize(fs, target);
230+
int pageSize = wrappedIO.bulkDelete_pageSize(fs, target);
227231

228232
// the page size has been reached.
229233
// for classic filesystems page size == 1 so this happens every time.
@@ -281,7 +285,7 @@ private int bulkDeleteFiles(Iterable<String> pathnames) {
281285
*/
282286
private List<Map.Entry<Path, String>> deleteBatch(FileSystem fs, final Path fsRoot, Collection<Path> paths) {
283287

284-
return WrappedIO.bulkDelete(fs, new Path(fs.getUri()), paths);
288+
return wrappedIO.bulkDelete_delete(fs, new Path(fs.getUri()), paths);
285289
}
286290

287291
private int deleteThreads() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.iceberg.hadoop.wrappedio;
20+
21+
import java.util.Collection;
22+
import java.util.List;
23+
import java.util.Map;
24+
import org.apache.hadoop.fs.FileSystem;
25+
import org.apache.hadoop.fs.Path;
26+
import org.apache.iceberg.common.DynClasses;
27+
import org.apache.iceberg.common.DynMethods;
28+
import org.apache.iceberg.hadoop.HadoopFileIO;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
/**
33+
* The wrapped IO methods in {@code WrappedIO}, dynamically loaded.
34+
*/
35+
public final class DynamicWrappedIO {
36+
37+
private static final Logger LOG = LoggerFactory.getLogger(HadoopFileIO.class);
38+
39+
/**
40+
* Classname of the wrapped IO class: {@value}.
41+
*/
42+
public static final String WRAPPED_IO_CLASSNAME =
43+
"org.apache.hadoop.io.wrappedio.WrappedIO";
44+
45+
/**
46+
* Method name for bulk delete: {@value}
47+
*/
48+
public static final String BULKDELETE_DELETE = "bulkDelete_delete";
49+
50+
/**
51+
* Method name for bulk delete: {@value}
52+
*/
53+
public static final String BULKDELETE_PAGESIZE = "bulkDelete_pageSize";
54+
55+
/**
56+
* Wrapped IO class.
57+
*/
58+
private final Class<?> wrappedIO;
59+
60+
/**
61+
* Was wrapped IO loaded?
62+
* In the hadoop codebase, this is true.
63+
* But in other libraries it may not always be true...this
64+
* field is used to assist copy-and-paste adoption.
65+
*/
66+
private final boolean loaded;
67+
68+
/**
69+
* Method binding.
70+
* {@code WrappedIO.bulkDelete_delete(FileSystem, Path, Collection)}.
71+
*/
72+
private final DynMethods.UnboundMethod bulkDeleteDeleteMethod;
73+
74+
/**
75+
* Method binding.
76+
* {@code WrappedIO.bulkDelete_pageSize(FileSystem, Path)}.
77+
*/
78+
private final DynMethods.UnboundMethod bulkDeletePageSizeMethod;
79+
80+
/**
81+
* Dynamically load the WrappedIO class and its methods.
82+
* @param loader classloader to use.
83+
*/
84+
public DynamicWrappedIO(ClassLoader loader) {
85+
// load the class
86+
final DynClasses.Builder builder = DynClasses.builder();
87+
wrappedIO = builder
88+
.loader(loader)
89+
.impl(WRAPPED_IO_CLASSNAME)
90+
.orNull()
91+
.build();
92+
93+
loaded = wrappedIO != null;
94+
if (loaded) {
95+
96+
// bulk delete APIs
97+
bulkDeleteDeleteMethod = loadInvocation(
98+
wrappedIO,
99+
List.class,
100+
BULKDELETE_DELETE,
101+
FileSystem.class,
102+
Path.class,
103+
Collection.class);
104+
105+
bulkDeletePageSizeMethod = loadInvocation(
106+
wrappedIO,
107+
Integer.class,
108+
BULKDELETE_PAGESIZE,
109+
FileSystem.class,
110+
Path.class);
111+
112+
} else {
113+
// set to no-ops.
114+
// the loadInvocation call would do this anyway;
115+
// this just makes the outcome explicit.
116+
bulkDeleteDeleteMethod = noop(BULKDELETE_DELETE);
117+
bulkDeletePageSizeMethod = noop(BULKDELETE_PAGESIZE);
118+
}
119+
120+
}
121+
122+
/**
123+
* Is the wrapped IO class loaded?
124+
*
125+
* @return true if the wrappedIO class was found and loaded.
126+
*/
127+
public boolean loaded() {
128+
return loaded;
129+
}
130+
131+
/**
132+
* Are the bulk delete methods available?
133+
*
134+
* @return true if the methods were found.
135+
*/
136+
public boolean bulkDeleteAvailable() {
137+
return !bulkDeleteDeleteMethod.isNoop();
138+
}
139+
140+
/**
141+
* Get the maximum number of objects/files to delete in a single request.
142+
*
143+
* @param fileSystem filesystem
144+
* @param path path to delete under.
145+
*
146+
* @return a number greater than or equal to zero.
147+
*
148+
* @throws UnsupportedOperationException bulk delete under that path is not supported.
149+
* @throws IllegalArgumentException path not valid.
150+
* @throws RuntimeException invocation failure.
151+
*/
152+
public int bulkDelete_pageSize(final FileSystem fileSystem, final Path path) {
153+
return bulkDeletePageSizeMethod.invoke(null, fileSystem, path);
154+
}
155+
156+
/**
157+
* Delete a list of files/objects.
158+
* <ul>
159+
* <li>Files must be under the path provided in {@code base}.</li>
160+
* <li>The size of the list must be equal to or less than the page size.</li>
161+
* <li>Directories are not supported; the outcome of attempting to delete
162+
* directories is undefined (ignored; undetected, listed as failures...).</li>
163+
* <li>The operation is not atomic.</li>
164+
* <li>The operation is treated as idempotent: network failures may
165+
* trigger resubmission of the request -any new objects created under a
166+
* path in the list may then be deleted.</li>
167+
* <li>There is no guarantee that any parent directories exist after this call.
168+
* </li>
169+
* </ul>
170+
*
171+
* @param fs filesystem
172+
* @param base path to delete under.
173+
* @param paths list of paths which must be absolute and under the base path.
174+
*
175+
* @return a list of all the paths which couldn't be deleted for a reason other than
176+
* "not found" and any associated error message.
177+
*
178+
* @throws UnsupportedOperationException bulk delete under that path is not supported.
179+
* @throws IllegalArgumentException if a path argument is invalid.
180+
* @throws RuntimeException for any failure.
181+
*/
182+
public List<Map.Entry<Path, String>> bulkDelete_delete(FileSystem fs,
183+
Path base,
184+
Collection<Path> paths) {
185+
186+
return bulkDeleteDeleteMethod.invoke(null, fs, base, paths);
187+
}
188+
189+
/**
190+
* Get an invocation from the source class, which will be unavailable() if
191+
* the class is null or the method isn't found.
192+
*
193+
* @param <T> return type
194+
* @param source source. If null, the method is a no-op.
195+
* @param returnType return type class (unused)
196+
* @param name method name
197+
* @param parameterTypes parameters
198+
*
199+
* @return the method or "unavailable"
200+
*/
201+
private static <T> DynMethods.UnboundMethod loadInvocation(
202+
Class<?> source, Class<? extends T> returnType, String name, Class<?>... parameterTypes) {
203+
204+
if (source != null) {
205+
final DynMethods.UnboundMethod m = new DynMethods.Builder(name)
206+
.impl(source, name, parameterTypes)
207+
.orNoop()
208+
.build();
209+
if (m.isNoop()) {
210+
// this is a sign of a mismatch between this class's expected
211+
// signatures and actual ones.
212+
// log at debug.
213+
LOG.debug("Failed to load method {} from {}", name, source);
214+
} else {
215+
LOG.debug("Found method {} from {}", name, source);
216+
}
217+
return m;
218+
} else {
219+
return noop(name);
220+
}
221+
}
222+
223+
/**
224+
* Create a no-op method.
225+
*
226+
* @param name method name
227+
*
228+
* @return a no-op method.
229+
*/
230+
static DynMethods.UnboundMethod noop(final String name) {
231+
return new DynMethods.Builder(name).orNoop().build();
232+
}
233+
234+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
/**
20+
* Reflection based access to {@code org.apache.hadoop.io.wrappedio.WrappedIO}
21+
* class and the reflection-friendly methods inside it.
22+
* <p>
23+
* That class exists to assist libraries and applications which need to build
24+
* against older Hadoop versions to make use of the more recent APIs.
25+
*/
26+
package org.apache.iceberg.hadoop.wrappedio;

0 commit comments

Comments
 (0)