Skip to content

Commit

Permalink
[Kernel] During Active-AddFile-Log-Replay do not pass the RemoveFile …
Browse files Browse the repository at this point in the history
…to checkpoint reader

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Implemented a minor performance improvement to not read any RemoveFiles when we read checkpoint Parquet files during active-add-file-log-replay.
Fixes #4102

## How was this patch tested?

Existing unit test, manual test using
./run-tests.py --group kernel
and
delta/kernel/examples/run-kernel-examples.py --use-local

## Does this PR introduce _any_ user-facing changes?

No
  • Loading branch information
anoopj committed Feb 12, 2025
1 parent 685379c commit e93b240
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@
* iterator of (ColumnarBatch, isFromCheckpoint) tuples, where the schema of the ColumnarBatch
* semantically represents actions (or, a subset of action fields) parsed from the Delta Log.
*
* <p>Users must pass in a `readSchema` to select which actions and sub-fields they want to consume.
* <p>Users must pass in a `deltaReadSchema` to select which actions and sub-fields they want to
* consume.
*
* <p>Users can also pass in an optional `checkpointReadSchema` if it is different from
* `deltaReadSchema`.
*/
public class ActionsIterator implements CloseableIterator<ActionWrapper> {
private final Engine engine;
Expand All @@ -60,7 +64,14 @@ public class ActionsIterator implements CloseableIterator<ActionWrapper> {
*/
private final LinkedList<DeltaLogFile> filesList;

private final StructType readSchema;
/** Schema used for reading delta files. */
private final StructType deltaReadSchema;

/**
* Schema to be used for reading checkpoint files. Checkpoint files are Parquet, except for v2
* checkpoints, where they may also be JSON.
*/
private final StructType checkpointReadSchema;

private final boolean schemaContainsAddOrRemoveFiles;

Expand All @@ -77,16 +88,26 @@ public class ActionsIterator implements CloseableIterator<ActionWrapper> {
public ActionsIterator(
Engine engine,
List<FileStatus> files,
StructType readSchema,
StructType deltaReadSchema,
Optional<Predicate> checkpointPredicate) {
this(engine, files, deltaReadSchema, deltaReadSchema, checkpointPredicate);
}

public ActionsIterator(
Engine engine,
List<FileStatus> files,
StructType deltaReadSchema,
StructType checkpointReadSchema,
Optional<Predicate> checkpointPredicate) {
this.engine = engine;
this.checkpointPredicate = checkpointPredicate;
this.filesList = new LinkedList<>();
this.filesList.addAll(
files.stream().map(DeltaLogFile::forCommitOrCheckpoint).collect(Collectors.toList()));
this.readSchema = readSchema;
this.deltaReadSchema = deltaReadSchema;
this.checkpointReadSchema = checkpointReadSchema;
this.actionsIter = Optional.empty();
this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(readSchema);
this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(deltaReadSchema);
}

@Override
Expand All @@ -105,7 +126,8 @@ public boolean hasNext() {

/**
* @return a tuple of (ColumnarBatch, isFromCheckpoint), where ColumnarBatch conforms to the
* instance {@link #readSchema}.
*     instance {@link #deltaReadSchema} or {@link #checkpointReadSchema} (the latter when
* isFromCheckpoint=true).
*/
@Override
public ActionWrapper next() {
Expand Down Expand Up @@ -176,9 +198,9 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
FileStatus file, String fileName) throws IOException {
// If the sidecars may contain the current action, read sidecars from the top-level v2
// checkpoint file(to be read later).
StructType modifiedReadSchema = readSchema;
StructType modifiedReadSchema = checkpointReadSchema;
if (schemaContainsAddOrRemoveFiles) {
modifiedReadSchema = LogReplay.withSidecarFileSchema(readSchema);
modifiedReadSchema = LogReplay.withSidecarFileSchema(checkpointReadSchema);
}

long checkpointVersion = checkpointVersion(file.getPath());
Expand All @@ -195,7 +217,8 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
checkpointPredicateIncludingSidecars = checkpointPredicate;
}
final CloseableIterator<ColumnarBatch> topLevelIter;
StructType finalModifiedReadSchema = modifiedReadSchema;
StructType finalReadSchema = modifiedReadSchema;

if (fileName.endsWith(".parquet")) {
topLevelIter =
wrapEngineExceptionThrowsIO(
Expand All @@ -204,11 +227,11 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
.getParquetHandler()
.readParquetFiles(
singletonCloseableIterator(file),
finalModifiedReadSchema,
finalReadSchema,
checkpointPredicateIncludingSidecars),
"Reading parquet log file `%s` with readSchema=%s and predicate=%s",
file,
modifiedReadSchema,
finalReadSchema,
checkpointPredicateIncludingSidecars);
} else if (fileName.endsWith(".json")) {
topLevelIter =
Expand All @@ -218,11 +241,11 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(file),
finalModifiedReadSchema,
finalReadSchema,
checkpointPredicateIncludingSidecars),
"Reading JSON log file `%s` with readSchema=%s and predicate=%s",
file,
modifiedReadSchema,
finalReadSchema,
checkpointPredicateIncludingSidecars);
} else {
throw new IOException("Unrecognized top level v2 checkpoint file format: " + fileName);
Expand Down Expand Up @@ -309,10 +332,12 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
engine
.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(nextFile), readSchema, Optional.empty()),
singletonCloseableIterator(nextFile),
deltaReadSchema,
Optional.empty()),
"Reading JSON log file `%s` with readSchema=%s",
nextFile,
readSchema);
deltaReadSchema);

return combine(
dataIter,
Expand Down Expand Up @@ -344,10 +369,11 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
() ->
engine
.getParquetHandler()
.readParquetFiles(checkpointFiles, readSchema, checkpointPredicate),
.readParquetFiles(
checkpointFiles, deltaReadSchema, checkpointPredicate),
"Reading checkpoint sidecars [%s] with readSchema=%s and predicate=%s",
checkpointFiles,
readSchema,
deltaReadSchema,
checkpointPredicate);

long version = checkpointVersion(nextFilePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,13 @@ private void prepareNext() {
}
}

ColumnarBatch scanAddFiles = addRemoveColumnarBatch;
// Step 3: Drop the RemoveFile column and use the selection vector to build a new
// FilteredColumnarBatch
ColumnarBatch scanAddFiles = addRemoveColumnarBatch.withDeletedColumnAt(1);
// For checkpoint files, we would only have read the adds, not the removes.
if (!isFromCheckpoint) {
scanAddFiles = scanAddFiles.withDeletedColumnAt(1);
}

// Step 4: TODO: remove this step. This is a temporary requirement until the path
// in `add` is converted to absolute path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public static StructType getAddRemoveReadSchema(boolean shouldReadStats) {
.add(REMOVEFILE_FIELD_NAME, REMOVE_FILE_SCHEMA);
}

/**
 * Read schema to use when searching only for {@code AddFile} actions (e.g. during
 * active-add-file log replay, where {@code RemoveFile}s from checkpoints are not needed).
 *
 * @param shouldReadStats whether the returned add-file schema should include file statistics
 * @return a schema containing only the {@code add} action field
 */
public static StructType getAddReadSchema(boolean shouldReadStats) {
  return new StructType().add(ADDFILE_FIELD_NAME, getAddSchema(shouldReadStats));
}

public static int ADD_FILE_ORDINAL = 0;
public static int ADD_FILE_PATH_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("path");
public static int ADD_FILE_DV_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("deletionVector");
Expand Down Expand Up @@ -196,11 +201,15 @@ public CloseableIterator<FilteredColumnarBatch> getAddFilesAsColumnarBatches(
boolean shouldReadStats,
Optional<Predicate> checkpointPredicate,
ScanMetrics scanMetrics) {
// We do not need to look at any `remove` files from the checkpoints. Skip the column to save
// I/O. Note that we are still going to process the row groups. Adds and removes are randomly
// scattered through checkpoint part files, so row group push down is unlikely to be useful.
final CloseableIterator<ActionWrapper> addRemoveIter =
new ActionsIterator(
engine,
logSegment.allLogFilesReversed(),
getAddRemoveReadSchema(shouldReadStats),
getAddReadSchema(shouldReadStats),
checkpointPredicate);
return new ActiveAddFilesIterator(engine, addRemoveIter, dataPath, scanMetrics);
}
Expand Down

0 comments on commit e93b240

Please sign in to comment.