Skip to content

Commit 529d8d9

Browse files
committed
[Kernel] During Active-AddFile-Log-Replay do not pass the RemoveFile to checkpoint reader
#### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Implemented a minor performance improvement to not read any RemoveFiles when we read checkpoint Parquet files during active-add-file-log-replay. Fixes #4102 ## How was this patch tested? Existing unit test, manual test using ./run-tests.py --group kernel and delta/kernel/examples/run-kernel-examples.py --use-local ## Does this PR introduce _any_ user-facing changes? No
1 parent 685379c commit 529d8d9

File tree

3 files changed

+57
-18
lines changed

3 files changed

+57
-18
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java

+43-17
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@
4444
* iterator of (ColumnarBatch, isFromCheckpoint) tuples, where the schema of the ColumnarBatch
4545
* semantically represents actions (or, a subset of action fields) parsed from the Delta Log.
4646
*
47-
* <p>Users must pass in a `readSchema` to select which actions and sub-fields they want to consume.
47+
* <p>Users must pass in a `deltaReadSchema` to select which actions and sub-fields they want to
48+
* consume.
49+
*
50+
* <p>Users can also pass in an optional `checkpointReadSchema` if it is different from
51+
* `deltaReadSchema`
4852
*/
4953
public class ActionsIterator implements CloseableIterator<ActionWrapper> {
5054
private final Engine engine;
@@ -60,7 +64,14 @@ public class ActionsIterator implements CloseableIterator<ActionWrapper> {
6064
*/
6165
private final LinkedList<DeltaLogFile> filesList;
6266

63-
private final StructType readSchema;
67+
/** Schema used for reading delta files. */
68+
private final StructType deltaReadSchema;
69+
70+
/**
71+
* Schema to be used for reading checkpoint files. Checkpoint files can be Parquet or JSON in the
72+
* case of v2 checkpoints.
73+
*/
74+
private final StructType checkpointReadSchema;
6475

6576
private final boolean schemaContainsAddOrRemoveFiles;
6677

@@ -77,16 +88,26 @@ public class ActionsIterator implements CloseableIterator<ActionWrapper> {
7788
public ActionsIterator(
7889
Engine engine,
7990
List<FileStatus> files,
80-
StructType readSchema,
91+
StructType deltaReadSchema,
92+
Optional<Predicate> checkpointPredicate) {
93+
this(engine, files, deltaReadSchema, deltaReadSchema, checkpointPredicate);
94+
}
95+
96+
public ActionsIterator(
97+
Engine engine,
98+
List<FileStatus> files,
99+
StructType deltaReadSchema,
100+
StructType checkpointReadSchema,
81101
Optional<Predicate> checkpointPredicate) {
82102
this.engine = engine;
83103
this.checkpointPredicate = checkpointPredicate;
84104
this.filesList = new LinkedList<>();
85105
this.filesList.addAll(
86106
files.stream().map(DeltaLogFile::forCommitOrCheckpoint).collect(Collectors.toList()));
87-
this.readSchema = readSchema;
107+
this.deltaReadSchema = deltaReadSchema;
108+
this.checkpointReadSchema = checkpointReadSchema;
88109
this.actionsIter = Optional.empty();
89-
this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(readSchema);
110+
this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(deltaReadSchema);
90111
}
91112

92113
@Override
@@ -105,7 +126,8 @@ public boolean hasNext() {
105126

106127
/**
107128
* @return a tuple of (ColumnarBatch, isFromCheckpoint), where ColumnarBatch conforms to the
108-
* instance {@link #readSchema}.
129+
* instance {@link #deltaReadSchema} or {@link #checkpointReadSchema} (the latter when when
130+
* isFromCheckpoint=true).
109131
*/
110132
@Override
111133
public ActionWrapper next() {
@@ -176,9 +198,9 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
176198
FileStatus file, String fileName) throws IOException {
177199
// If the sidecars may contain the current action, read sidecars from the top-level v2
178200
// checkpoint file(to be read later).
179-
StructType modifiedReadSchema = readSchema;
201+
StructType modifiedReadSchema = checkpointReadSchema;
180202
if (schemaContainsAddOrRemoveFiles) {
181-
modifiedReadSchema = LogReplay.withSidecarFileSchema(readSchema);
203+
modifiedReadSchema = LogReplay.withSidecarFileSchema(checkpointReadSchema);
182204
}
183205

184206
long checkpointVersion = checkpointVersion(file.getPath());
@@ -195,7 +217,8 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
195217
checkpointPredicateIncludingSidecars = checkpointPredicate;
196218
}
197219
final CloseableIterator<ColumnarBatch> topLevelIter;
198-
StructType finalModifiedReadSchema = modifiedReadSchema;
220+
StructType finalReadSchema = modifiedReadSchema;
221+
199222
if (fileName.endsWith(".parquet")) {
200223
topLevelIter =
201224
wrapEngineExceptionThrowsIO(
@@ -204,11 +227,11 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
204227
.getParquetHandler()
205228
.readParquetFiles(
206229
singletonCloseableIterator(file),
207-
finalModifiedReadSchema,
230+
finalReadSchema,
208231
checkpointPredicateIncludingSidecars),
209232
"Reading parquet log file `%s` with readSchema=%s and predicate=%s",
210233
file,
211-
modifiedReadSchema,
234+
finalReadSchema,
212235
checkpointPredicateIncludingSidecars);
213236
} else if (fileName.endsWith(".json")) {
214237
topLevelIter =
@@ -218,11 +241,11 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
218241
.getJsonHandler()
219242
.readJsonFiles(
220243
singletonCloseableIterator(file),
221-
finalModifiedReadSchema,
244+
finalReadSchema,
222245
checkpointPredicateIncludingSidecars),
223246
"Reading JSON log file `%s` with readSchema=%s and predicate=%s",
224247
file,
225-
modifiedReadSchema,
248+
finalReadSchema,
226249
checkpointPredicateIncludingSidecars);
227250
} else {
228251
throw new IOException("Unrecognized top level v2 checkpoint file format: " + fileName);
@@ -309,10 +332,12 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
309332
engine
310333
.getJsonHandler()
311334
.readJsonFiles(
312-
singletonCloseableIterator(nextFile), readSchema, Optional.empty()),
335+
singletonCloseableIterator(nextFile),
336+
deltaReadSchema,
337+
Optional.empty()),
313338
"Reading JSON log file `%s` with readSchema=%s",
314339
nextFile,
315-
readSchema);
340+
deltaReadSchema);
316341

317342
return combine(
318343
dataIter,
@@ -344,10 +369,11 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
344369
() ->
345370
engine
346371
.getParquetHandler()
347-
.readParquetFiles(checkpointFiles, readSchema, checkpointPredicate),
372+
.readParquetFiles(
373+
checkpointFiles, deltaReadSchema, checkpointPredicate),
348374
"Reading checkpoint sidecars [%s] with readSchema=%s and predicate=%s",
349375
checkpointFiles,
350-
readSchema,
376+
deltaReadSchema,
351377
checkpointPredicate);
352378

353379
long version = checkpointVersion(nextFilePath);

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,13 @@ private void prepareNext() {
233233
}
234234
}
235235

236+
ColumnarBatch scanAddFiles = addRemoveColumnarBatch;
236237
// Step 3: Drop the RemoveFile column and use the selection vector to build a new
237238
// FilteredColumnarBatch
238-
ColumnarBatch scanAddFiles = addRemoveColumnarBatch.withDeletedColumnAt(1);
239+
// For checkpoint files, we would only have read the adds, not the removes.
240+
if (!isFromCheckpoint) {
241+
scanAddFiles = scanAddFiles.withDeletedColumnAt(1);
242+
}
239243

240244
// Step 4: TODO: remove this step. This is a temporary requirement until the path
241245
// in `add` is converted to absolute path.

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java

+9
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ public static StructType getAddRemoveReadSchema(boolean shouldReadStats) {
106106
.add(REMOVEFILE_FIELD_NAME, REMOVE_FILE_SCHEMA);
107107
}
108108

109+
/** Read schema when searching only for AddFiles */
110+
public static StructType getAddReadSchema(boolean shouldReadStats) {
111+
return new StructType().add(ADDFILE_FIELD_NAME, getAddSchema(shouldReadStats));
112+
}
113+
109114
public static int ADD_FILE_ORDINAL = 0;
110115
public static int ADD_FILE_PATH_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("path");
111116
public static int ADD_FILE_DV_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("deletionVector");
@@ -196,11 +201,15 @@ public CloseableIterator<FilteredColumnarBatch> getAddFilesAsColumnarBatches(
196201
boolean shouldReadStats,
197202
Optional<Predicate> checkpointPredicate,
198203
ScanMetrics scanMetrics) {
204+
// We do not need to look at any `remove` files from the checkpoints. Skip the column to save
205+
// I/O. Note that we are still going to process the row groups. Adds and removes are randomly
206+
// scattered through checkpoint part files, so row group push down is unlikely to be useful.
199207
final CloseableIterator<ActionWrapper> addRemoveIter =
200208
new ActionsIterator(
201209
engine,
202210
logSegment.allLogFilesReversed(),
203211
getAddRemoveReadSchema(shouldReadStats),
212+
getAddReadSchema(shouldReadStats),
204213
checkpointPredicate);
205214
return new ActiveAddFilesIterator(engine, addRemoveIter, dataPath, scanMetrics);
206215
}

0 commit comments

Comments
 (0)