Skip to content

Commit 4a92f4e

Browse files
committed
[Kernel] During Active-AddFile-Log-Replay do not pass the RemoveFile to checkpoint reader
#### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Implemented a minor performance improvement to not read any RemoveFiles when we read checkpoint Parquet files during active-add-file-log-replay. ## How was this patch tested? Existing unit test, manual test using ./run-tests.py --group kernel and delta/kernel/examples/run-kernel-examples.py --use-local ## Does this PR introduce _any_ user-facing changes? No
1 parent 685379c commit 4a92f4e

File tree

3 files changed

+32
-4
lines changed

3 files changed

+32
-4
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java

+22-3
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public class ActionsIterator implements CloseableIterator<ActionWrapper> {
6262

6363
private final StructType readSchema;
6464

65+
private final StructType checkpointReadSchema;
66+
6567
private final boolean schemaContainsAddOrRemoveFiles;
6668

6769
/**
@@ -79,12 +81,22 @@ public ActionsIterator(
7981
List<FileStatus> files,
8082
StructType readSchema,
8183
Optional<Predicate> checkpointPredicate) {
84+
this(engine, files, readSchema, readSchema, checkpointPredicate);
85+
}
86+
87+
public ActionsIterator(
88+
Engine engine,
89+
List<FileStatus> files,
90+
StructType readSchema,
91+
StructType checkpointReadSchema,
92+
Optional<Predicate> checkpointPredicate) {
8293
this.engine = engine;
8394
this.checkpointPredicate = checkpointPredicate;
8495
this.filesList = new LinkedList<>();
8596
this.filesList.addAll(
8697
files.stream().map(DeltaLogFile::forCommitOrCheckpoint).collect(Collectors.toList()));
8798
this.readSchema = readSchema;
99+
this.checkpointReadSchema = checkpointReadSchema;
88100
this.actionsIter = Optional.empty();
89101
this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(readSchema);
90102
}
@@ -177,8 +189,10 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
177189
// If the sidecars may contain the current action, read sidecars from the top-level v2
178190
// checkpoint file(to be read later).
179191
StructType modifiedReadSchema = readSchema;
192+
StructType modifiedCheckpointReadSchema = checkpointReadSchema;
180193
if (schemaContainsAddOrRemoveFiles) {
181194
modifiedReadSchema = LogReplay.withSidecarFileSchema(readSchema);
195+
modifiedCheckpointReadSchema = LogReplay.withSidecarFileSchema(checkpointReadSchema);
182196
}
183197

184198
long checkpointVersion = checkpointVersion(file.getPath());
@@ -195,7 +209,12 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
195209
checkpointPredicateIncludingSidecars = checkpointPredicate;
196210
}
197211
final CloseableIterator<ColumnarBatch> topLevelIter;
198-
StructType finalModifiedReadSchema = modifiedReadSchema;
212+
StructType finalCommitReadSchema = modifiedReadSchema;
213+
// We do not need to look at any `remove` files from the checkpoints. Skip the column to save
214+
// I/O. Note that we are still going to process the row groups. Adds and removes are randomly
215+
// scattered through checkpoint part files, so row group push down is unlikely to be useful.
216+
StructType finalCheckpointReadSchema = modifiedCheckpointReadSchema;
217+
199218
if (fileName.endsWith(".parquet")) {
200219
topLevelIter =
201220
wrapEngineExceptionThrowsIO(
@@ -204,7 +223,7 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
204223
.getParquetHandler()
205224
.readParquetFiles(
206225
singletonCloseableIterator(file),
207-
finalModifiedReadSchema,
226+
finalCheckpointReadSchema,
208227
checkpointPredicateIncludingSidecars),
209228
"Reading parquet log file `%s` with readSchema=%s and predicate=%s",
210229
file,
@@ -218,7 +237,7 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
218237
.getJsonHandler()
219238
.readJsonFiles(
220239
singletonCloseableIterator(file),
221-
finalModifiedReadSchema,
240+
finalCommitReadSchema,
222241
checkpointPredicateIncludingSidecars),
223242
"Reading JSON log file `%s` with readSchema=%s and predicate=%s",
224243
file,

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,12 @@ private void prepareNext() {
233233
}
234234
}
235235

236+
ColumnarBatch scanAddFiles = addRemoveColumnarBatch;
236237
// Step 3: Drop the RemoveFile column and use the selection vector to build a new
237238
// FilteredColumnarBatch
238-
ColumnarBatch scanAddFiles = addRemoveColumnarBatch.withDeletedColumnAt(1);
239+
if (scanAddFiles.getSchema().length() > 1) {
240+
scanAddFiles = scanAddFiles.withDeletedColumnAt(1);
241+
}
239242

240243
// Step 4: TODO: remove this step. This is a temporary requirement until the path
241244
// in `add` is converted to absolute path.

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java

+6
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ public static StructType getAddRemoveReadSchema(boolean shouldReadStats) {
106106
.add(REMOVEFILE_FIELD_NAME, REMOVE_FILE_SCHEMA);
107107
}
108108

109+
/** Read schema when searching only for AddFiles */
110+
public static StructType getAddReadSchema(boolean shouldReadStats) {
111+
return new StructType().add(ADDFILE_FIELD_NAME, getAddSchema(shouldReadStats));
112+
}
113+
109114
public static int ADD_FILE_ORDINAL = 0;
110115
public static int ADD_FILE_PATH_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("path");
111116
public static int ADD_FILE_DV_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("deletionVector");
@@ -201,6 +206,7 @@ public CloseableIterator<FilteredColumnarBatch> getAddFilesAsColumnarBatches(
201206
engine,
202207
logSegment.allLogFilesReversed(),
203208
getAddRemoveReadSchema(shouldReadStats),
209+
getAddReadSchema(shouldReadStats),
204210
checkpointPredicate);
205211
return new ActiveAddFilesIterator(engine, addRemoveIter, dataPath, scanMetrics);
206212
}

0 commit comments

Comments
 (0)