Skip to content

Commit fbe3c3d

Browse files
committed
[Kernel] During Active-AddFile-Log-Replay do not pass the RemoveFile to checkpoint reader
#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Implemented a minor performance improvement: RemoveFiles are no longer read from checkpoint Parquet files during active-add-file log replay.

Fixes #4102

## How was this patch tested?

Existing unit tests, plus manual testing using `./run-tests.py --group kernel` and `delta/kernel/examples/run-kernel-examples.py --use-local`.

## Does this PR introduce _any_ user-facing changes?

No
1 parent 685379c commit fbe3c3d

File tree

3 files changed

+51
-17
lines changed

3 files changed

+51
-17
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java

+37-16
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,14 @@ public class ActionsIterator implements CloseableIterator<ActionWrapper> {
6060
*/
6161
private final LinkedList<DeltaLogFile> filesList;
6262

63-
private final StructType readSchema;
63+
/** Schema used for reading delta files. */
64+
private final StructType deltaReadSchema;
65+
66+
/**
67+
* Schema to be used for reading checkpoint files. Checkpoint files can be Parquet or JSON in the
68+
* case of v2 checkpoints.
69+
*/
70+
private final StructType checkpointReadSchema;
6471

6572
private final boolean schemaContainsAddOrRemoveFiles;
6673

@@ -77,16 +84,26 @@ public class ActionsIterator implements CloseableIterator<ActionWrapper> {
7784
public ActionsIterator(
7885
Engine engine,
7986
List<FileStatus> files,
80-
StructType readSchema,
87+
StructType deltaReadSchema,
88+
Optional<Predicate> checkpointPredicate) {
89+
this(engine, files, deltaReadSchema, deltaReadSchema, checkpointPredicate);
90+
}
91+
92+
public ActionsIterator(
93+
Engine engine,
94+
List<FileStatus> files,
95+
StructType deltaReadSchema,
96+
StructType checkpointReadSchema,
8197
Optional<Predicate> checkpointPredicate) {
8298
this.engine = engine;
8399
this.checkpointPredicate = checkpointPredicate;
84100
this.filesList = new LinkedList<>();
85101
this.filesList.addAll(
86102
files.stream().map(DeltaLogFile::forCommitOrCheckpoint).collect(Collectors.toList()));
87-
this.readSchema = readSchema;
103+
this.deltaReadSchema = deltaReadSchema;
104+
this.checkpointReadSchema = checkpointReadSchema;
88105
this.actionsIter = Optional.empty();
89-
this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(readSchema);
106+
this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(deltaReadSchema);
90107
}
91108

92109
@Override
@@ -105,7 +122,7 @@ public boolean hasNext() {
105122

106123
/**
107124
* @return a tuple of (ColumnarBatch, isFromCheckpoint), where ColumnarBatch conforms to the
108-
* instance {@link #readSchema}.
125+
* instance {@link #deltaReadSchema}.
109126
*/
110127
@Override
111128
public ActionWrapper next() {
@@ -176,9 +193,9 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
176193
FileStatus file, String fileName) throws IOException {
177194
// If the sidecars may contain the current action, read sidecars from the top-level v2
178195
// checkpoint file(to be read later).
179-
StructType modifiedReadSchema = readSchema;
196+
StructType modifiedReadSchema = checkpointReadSchema;
180197
if (schemaContainsAddOrRemoveFiles) {
181-
modifiedReadSchema = LogReplay.withSidecarFileSchema(readSchema);
198+
modifiedReadSchema = LogReplay.withSidecarFileSchema(checkpointReadSchema);
182199
}
183200

184201
long checkpointVersion = checkpointVersion(file.getPath());
@@ -195,7 +212,8 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
195212
checkpointPredicateIncludingSidecars = checkpointPredicate;
196213
}
197214
final CloseableIterator<ColumnarBatch> topLevelIter;
198-
StructType finalModifiedReadSchema = modifiedReadSchema;
215+
StructType finalReadSchema = modifiedReadSchema;
216+
199217
if (fileName.endsWith(".parquet")) {
200218
topLevelIter =
201219
wrapEngineExceptionThrowsIO(
@@ -204,11 +222,11 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
204222
.getParquetHandler()
205223
.readParquetFiles(
206224
singletonCloseableIterator(file),
207-
finalModifiedReadSchema,
225+
finalReadSchema,
208226
checkpointPredicateIncludingSidecars),
209227
"Reading parquet log file `%s` with readSchema=%s and predicate=%s",
210228
file,
211-
modifiedReadSchema,
229+
finalReadSchema,
212230
checkpointPredicateIncludingSidecars);
213231
} else if (fileName.endsWith(".json")) {
214232
topLevelIter =
@@ -218,11 +236,11 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
218236
.getJsonHandler()
219237
.readJsonFiles(
220238
singletonCloseableIterator(file),
221-
finalModifiedReadSchema,
239+
finalReadSchema,
222240
checkpointPredicateIncludingSidecars),
223241
"Reading JSON log file `%s` with readSchema=%s and predicate=%s",
224242
file,
225-
modifiedReadSchema,
243+
finalReadSchema,
226244
checkpointPredicateIncludingSidecars);
227245
} else {
228246
throw new IOException("Unrecognized top level v2 checkpoint file format: " + fileName);
@@ -309,10 +327,12 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
309327
engine
310328
.getJsonHandler()
311329
.readJsonFiles(
312-
singletonCloseableIterator(nextFile), readSchema, Optional.empty()),
330+
singletonCloseableIterator(nextFile),
331+
deltaReadSchema,
332+
Optional.empty()),
313333
"Reading JSON log file `%s` with readSchema=%s",
314334
nextFile,
315-
readSchema);
335+
deltaReadSchema);
316336

317337
return combine(
318338
dataIter,
@@ -344,10 +364,11 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
344364
() ->
345365
engine
346366
.getParquetHandler()
347-
.readParquetFiles(checkpointFiles, readSchema, checkpointPredicate),
367+
.readParquetFiles(
368+
checkpointFiles, deltaReadSchema, checkpointPredicate),
348369
"Reading checkpoint sidecars [%s] with readSchema=%s and predicate=%s",
349370
checkpointFiles,
350-
readSchema,
371+
deltaReadSchema,
351372
checkpointPredicate);
352373

353374
long version = checkpointVersion(nextFilePath);

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,13 @@ private void prepareNext() {
233233
}
234234
}
235235

236+
ColumnarBatch scanAddFiles = addRemoveColumnarBatch;
236237
// Step 3: Drop the RemoveFile column and use the selection vector to build a new
237238
// FilteredColumnarBatch
238-
ColumnarBatch scanAddFiles = addRemoveColumnarBatch.withDeletedColumnAt(1);
239+
// For Parquet files, we would only have read the adds, not the removes, hence the check.
240+
if (scanAddFiles.getSchema().length() > 1) {
241+
scanAddFiles = scanAddFiles.withDeletedColumnAt(1);
242+
}
239243

240244
// Step 4: TODO: remove this step. This is a temporary requirement until the path
241245
// in `add` is converted to absolute path.

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java

+9
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ public static StructType getAddRemoveReadSchema(boolean shouldReadStats) {
106106
.add(REMOVEFILE_FIELD_NAME, REMOVE_FILE_SCHEMA);
107107
}
108108

109+
/** Read schema when searching only for AddFiles */
110+
public static StructType getAddReadSchema(boolean shouldReadStats) {
111+
return new StructType().add(ADDFILE_FIELD_NAME, getAddSchema(shouldReadStats));
112+
}
113+
109114
public static int ADD_FILE_ORDINAL = 0;
110115
public static int ADD_FILE_PATH_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("path");
111116
public static int ADD_FILE_DV_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("deletionVector");
@@ -196,11 +201,15 @@ public CloseableIterator<FilteredColumnarBatch> getAddFilesAsColumnarBatches(
196201
boolean shouldReadStats,
197202
Optional<Predicate> checkpointPredicate,
198203
ScanMetrics scanMetrics) {
204+
// We do not need to look at any `remove` files from the checkpoints. Skip the column to save
205+
// I/O. Note that we are still going to process the row groups. Adds and removes are randomly
206+
// scattered through checkpoint part files, so row group push down is unlikely to be useful.
199207
final CloseableIterator<ActionWrapper> addRemoveIter =
200208
new ActionsIterator(
201209
engine,
202210
logSegment.allLogFilesReversed(),
203211
getAddRemoveReadSchema(shouldReadStats),
212+
getAddReadSchema(shouldReadStats),
204213
checkpointPredicate);
205214
return new ActiveAddFilesIterator(engine, addRemoveIter, dataPath, scanMetrics);
206215
}

0 commit comments

Comments
 (0)