Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] During Active-AddFile-Log-Replay do not pass the RemoveFile to checkpoint reader #4137

Merged
merged 1 commit into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@
* iterator of (ColumnarBatch, isFromCheckpoint) tuples, where the schema of the ColumnarBatch
* semantically represents actions (or, a subset of action fields) parsed from the Delta Log.
*
* <p>Users must pass in a `readSchema` to select which actions and sub-fields they want to consume.
* <p>Users must pass in a `deltaReadSchema` to select which actions and sub-fields they want to
* consume.
*
* <p>Users can also pass in an optional `checkpointReadSchema` if it is different from
* `deltaReadSchema`.
*/
public class ActionsIterator implements CloseableIterator<ActionWrapper> {
private final Engine engine;
Expand All @@ -60,7 +64,14 @@ public class ActionsIterator implements CloseableIterator<ActionWrapper> {
*/
private final LinkedList<DeltaLogFile> filesList;

private final StructType readSchema;
/** Schema used for reading delta files. */
anoopj marked this conversation as resolved.
Show resolved Hide resolved
private final StructType deltaReadSchema;

/**
* Schema to be used for reading checkpoint files. Checkpoint files can be Parquet or JSON in the
* case of v2 checkpoints.
*/
private final StructType checkpointReadSchema;

private final boolean schemaContainsAddOrRemoveFiles;

Expand All @@ -77,16 +88,26 @@ public class ActionsIterator implements CloseableIterator<ActionWrapper> {
public ActionsIterator(
Engine engine,
List<FileStatus> files,
StructType readSchema,
StructType deltaReadSchema,
Optional<Predicate> checkpointPredicate) {
this(engine, files, deltaReadSchema, deltaReadSchema, checkpointPredicate);
}

public ActionsIterator(
Engine engine,
List<FileStatus> files,
StructType deltaReadSchema,
StructType checkpointReadSchema,
anoopj marked this conversation as resolved.
Show resolved Hide resolved
Optional<Predicate> checkpointPredicate) {
this.engine = engine;
this.checkpointPredicate = checkpointPredicate;
this.filesList = new LinkedList<>();
this.filesList.addAll(
files.stream().map(DeltaLogFile::forCommitOrCheckpoint).collect(Collectors.toList()));
this.readSchema = readSchema;
this.deltaReadSchema = deltaReadSchema;
this.checkpointReadSchema = checkpointReadSchema;
this.actionsIter = Optional.empty();
this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(readSchema);
this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(deltaReadSchema);
}

@Override
Expand All @@ -105,7 +126,8 @@ public boolean hasNext() {

/**
* @return a tuple of (ColumnarBatch, isFromCheckpoint), where ColumnarBatch conforms to the
* instance {@link #readSchema}.
* instance {@link #deltaReadSchema} or {@link #checkpointReadSchema} (the latter when when
* isFromCheckpoint=true).
*/
@Override
public ActionWrapper next() {
Expand Down Expand Up @@ -176,9 +198,9 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
FileStatus file, String fileName) throws IOException {
// If the sidecars may contain the current action, read sidecars from the top-level v2
// checkpoint file(to be read later).
StructType modifiedReadSchema = readSchema;
StructType modifiedReadSchema = checkpointReadSchema;
if (schemaContainsAddOrRemoveFiles) {
modifiedReadSchema = LogReplay.withSidecarFileSchema(readSchema);
modifiedReadSchema = LogReplay.withSidecarFileSchema(checkpointReadSchema);
}

long checkpointVersion = checkpointVersion(file.getPath());
Expand All @@ -195,7 +217,8 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
checkpointPredicateIncludingSidecars = checkpointPredicate;
}
final CloseableIterator<ColumnarBatch> topLevelIter;
StructType finalModifiedReadSchema = modifiedReadSchema;
StructType finalReadSchema = modifiedReadSchema;

if (fileName.endsWith(".parquet")) {
topLevelIter =
wrapEngineExceptionThrowsIO(
Expand All @@ -204,11 +227,11 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
.getParquetHandler()
.readParquetFiles(
singletonCloseableIterator(file),
finalModifiedReadSchema,
finalReadSchema,
checkpointPredicateIncludingSidecars),
"Reading parquet log file `%s` with readSchema=%s and predicate=%s",
file,
modifiedReadSchema,
finalReadSchema,
checkpointPredicateIncludingSidecars);
} else if (fileName.endsWith(".json")) {
topLevelIter =
Expand All @@ -218,11 +241,11 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(file),
finalModifiedReadSchema,
finalReadSchema,
checkpointPredicateIncludingSidecars),
"Reading JSON log file `%s` with readSchema=%s and predicate=%s",
file,
modifiedReadSchema,
finalReadSchema,
checkpointPredicateIncludingSidecars);
} else {
throw new IOException("Unrecognized top level v2 checkpoint file format: " + fileName);
Expand Down Expand Up @@ -309,10 +332,12 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
engine
.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(nextFile), readSchema, Optional.empty()),
singletonCloseableIterator(nextFile),
deltaReadSchema,
Optional.empty()),
"Reading JSON log file `%s` with readSchema=%s",
nextFile,
readSchema);
deltaReadSchema);

return combine(
dataIter,
Expand Down Expand Up @@ -344,10 +369,11 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
() ->
engine
.getParquetHandler()
.readParquetFiles(checkpointFiles, readSchema, checkpointPredicate),
.readParquetFiles(
checkpointFiles, deltaReadSchema, checkpointPredicate),
"Reading checkpoint sidecars [%s] with readSchema=%s and predicate=%s",
checkpointFiles,
readSchema,
deltaReadSchema,
checkpointPredicate);

long version = checkpointVersion(nextFilePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,13 @@ private void prepareNext() {
}
}

ColumnarBatch scanAddFiles = addRemoveColumnarBatch;
// Step 3: Drop the RemoveFile column and use the selection vector to build a new
// FilteredColumnarBatch
ColumnarBatch scanAddFiles = addRemoveColumnarBatch.withDeletedColumnAt(1);
// For checkpoint files, we would only have read the adds, not the removes.
if (!isFromCheckpoint) {
scanAddFiles = scanAddFiles.withDeletedColumnAt(1);
}

// Step 4: TODO: remove this step. This is a temporary requirement until the path
// in `add` is converted to absolute path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public static StructType getAddRemoveReadSchema(boolean shouldReadStats) {
.add(REMOVEFILE_FIELD_NAME, REMOVE_FILE_SCHEMA);
}

/** Read schema when searching only for AddFiles */
public static StructType getAddReadSchema(boolean shouldReadStats) {
return new StructType().add(ADDFILE_FIELD_NAME, getAddSchema(shouldReadStats));
}

public static int ADD_FILE_ORDINAL = 0;
public static int ADD_FILE_PATH_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("path");
public static int ADD_FILE_DV_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("deletionVector");
Expand Down Expand Up @@ -196,11 +201,15 @@ public CloseableIterator<FilteredColumnarBatch> getAddFilesAsColumnarBatches(
boolean shouldReadStats,
Optional<Predicate> checkpointPredicate,
ScanMetrics scanMetrics) {
// We do not need to look at any `remove` files from the checkpoints. Skip the column to save
// I/O. Note that we are still going to process the row groups. Adds and removes are randomly
// scattered through checkpoint part files, so row group push down is unlikely to be useful.
final CloseableIterator<ActionWrapper> addRemoveIter =
new ActionsIterator(
engine,
logSegment.allLogFilesReversed(),
getAddRemoveReadSchema(shouldReadStats),
getAddReadSchema(shouldReadStats),
checkpointPredicate);
return new ActiveAddFilesIterator(engine, addRemoveIter, dataPath, scanMetrics);
}
Expand Down
Loading