diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java
index 8f9486b4191..c5c00a1c2a6 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java
@@ -44,7 +44,11 @@
* iterator of (ColumnarBatch, isFromCheckpoint) tuples, where the schema of the ColumnarBatch
* semantically represents actions (or, a subset of action fields) parsed from the Delta Log.
*
- *
Users must pass in a `readSchema` to select which actions and sub-fields they want to consume.
+ *
Users must pass in a `deltaReadSchema` to select which actions and sub-fields they want to
+ * consume.
+ *
+ *
Users can also pass in an optional `checkpointReadSchema` if it is different from
+ * `deltaReadSchema`.
*/
public class ActionsIterator implements CloseableIterator {
private final Engine engine;
@@ -60,7 +64,14 @@ public class ActionsIterator implements CloseableIterator {
*/
private final LinkedList filesList;
- private final StructType readSchema;
+ /** Schema used for reading delta files. */
+ private final StructType deltaReadSchema;
+
+ /**
+ * Schema to be used for reading checkpoint files. Checkpoint files can be Parquet or JSON in the
+ * case of v2 checkpoints.
+ */
+ private final StructType checkpointReadSchema;
private final boolean schemaContainsAddOrRemoveFiles;
@@ -77,16 +88,26 @@ public class ActionsIterator implements CloseableIterator {
public ActionsIterator(
Engine engine,
List files,
- StructType readSchema,
+ StructType deltaReadSchema,
+ Optional checkpointPredicate) {
+ this(engine, files, deltaReadSchema, deltaReadSchema, checkpointPredicate);
+ }
+
+ public ActionsIterator(
+ Engine engine,
+ List files,
+ StructType deltaReadSchema,
+ StructType checkpointReadSchema,
Optional checkpointPredicate) {
this.engine = engine;
this.checkpointPredicate = checkpointPredicate;
this.filesList = new LinkedList<>();
this.filesList.addAll(
files.stream().map(DeltaLogFile::forCommitOrCheckpoint).collect(Collectors.toList()));
- this.readSchema = readSchema;
+ this.deltaReadSchema = deltaReadSchema;
+ this.checkpointReadSchema = checkpointReadSchema;
this.actionsIter = Optional.empty();
- this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(readSchema);
+ this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(deltaReadSchema);
}
@Override
@@ -105,7 +126,8 @@ public boolean hasNext() {
/**
* @return a tuple of (ColumnarBatch, isFromCheckpoint), where ColumnarBatch conforms to the
- * instance {@link #readSchema}.
+ * instance {@link #deltaReadSchema} or {@link #checkpointReadSchema} (the latter when when
+ * isFromCheckpoint=true).
*/
@Override
public ActionWrapper next() {
@@ -176,9 +198,9 @@ private CloseableIterator getActionsIterFromSinglePartOrV2Checkpo
FileStatus file, String fileName) throws IOException {
// If the sidecars may contain the current action, read sidecars from the top-level v2
// checkpoint file(to be read later).
- StructType modifiedReadSchema = readSchema;
+ StructType modifiedReadSchema = checkpointReadSchema;
if (schemaContainsAddOrRemoveFiles) {
- modifiedReadSchema = LogReplay.withSidecarFileSchema(readSchema);
+ modifiedReadSchema = LogReplay.withSidecarFileSchema(checkpointReadSchema);
}
long checkpointVersion = checkpointVersion(file.getPath());
@@ -195,7 +217,8 @@ private CloseableIterator getActionsIterFromSinglePartOrV2Checkpo
checkpointPredicateIncludingSidecars = checkpointPredicate;
}
final CloseableIterator topLevelIter;
- StructType finalModifiedReadSchema = modifiedReadSchema;
+ StructType finalReadSchema = modifiedReadSchema;
+
if (fileName.endsWith(".parquet")) {
topLevelIter =
wrapEngineExceptionThrowsIO(
@@ -204,11 +227,11 @@ private CloseableIterator getActionsIterFromSinglePartOrV2Checkpo
.getParquetHandler()
.readParquetFiles(
singletonCloseableIterator(file),
- finalModifiedReadSchema,
+ finalReadSchema,
checkpointPredicateIncludingSidecars),
"Reading parquet log file `%s` with readSchema=%s and predicate=%s",
file,
- modifiedReadSchema,
+ finalReadSchema,
checkpointPredicateIncludingSidecars);
} else if (fileName.endsWith(".json")) {
topLevelIter =
@@ -218,11 +241,11 @@ private CloseableIterator getActionsIterFromSinglePartOrV2Checkpo
.getJsonHandler()
.readJsonFiles(
singletonCloseableIterator(file),
- finalModifiedReadSchema,
+ finalReadSchema,
checkpointPredicateIncludingSidecars),
"Reading JSON log file `%s` with readSchema=%s and predicate=%s",
file,
- modifiedReadSchema,
+ finalReadSchema,
checkpointPredicateIncludingSidecars);
} else {
throw new IOException("Unrecognized top level v2 checkpoint file format: " + fileName);
@@ -309,10 +332,12 @@ private CloseableIterator getNextActionsIter() {
engine
.getJsonHandler()
.readJsonFiles(
- singletonCloseableIterator(nextFile), readSchema, Optional.empty()),
+ singletonCloseableIterator(nextFile),
+ deltaReadSchema,
+ Optional.empty()),
"Reading JSON log file `%s` with readSchema=%s",
nextFile,
- readSchema);
+ deltaReadSchema);
return combine(
dataIter,
@@ -344,10 +369,11 @@ private CloseableIterator getNextActionsIter() {
() ->
engine
.getParquetHandler()
- .readParquetFiles(checkpointFiles, readSchema, checkpointPredicate),
+ .readParquetFiles(
+ checkpointFiles, deltaReadSchema, checkpointPredicate),
"Reading checkpoint sidecars [%s] with readSchema=%s and predicate=%s",
checkpointFiles,
- readSchema,
+ deltaReadSchema,
checkpointPredicate);
long version = checkpointVersion(nextFilePath);
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java
index fe053c19cdc..5fd4a317ae3 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java
@@ -233,9 +233,13 @@ private void prepareNext() {
}
}
+ ColumnarBatch scanAddFiles = addRemoveColumnarBatch;
// Step 3: Drop the RemoveFile column and use the selection vector to build a new
// FilteredColumnarBatch
- ColumnarBatch scanAddFiles = addRemoveColumnarBatch.withDeletedColumnAt(1);
+ // For checkpoint files, we would only have read the adds, not the removes.
+ if (!isFromCheckpoint) {
+ scanAddFiles = scanAddFiles.withDeletedColumnAt(1);
+ }
// Step 4: TODO: remove this step. This is a temporary requirement until the path
// in `add` is converted to absolute path.
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
index f51d210ca98..ae3d97d5c24 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
@@ -106,6 +106,11 @@ public static StructType getAddRemoveReadSchema(boolean shouldReadStats) {
.add(REMOVEFILE_FIELD_NAME, REMOVE_FILE_SCHEMA);
}
+ /** Read schema when searching only for AddFiles */
+ public static StructType getAddReadSchema(boolean shouldReadStats) {
+ return new StructType().add(ADDFILE_FIELD_NAME, getAddSchema(shouldReadStats));
+ }
+
public static int ADD_FILE_ORDINAL = 0;
public static int ADD_FILE_PATH_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("path");
public static int ADD_FILE_DV_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("deletionVector");
@@ -196,11 +201,15 @@ public CloseableIterator getAddFilesAsColumnarBatches(
boolean shouldReadStats,
Optional checkpointPredicate,
ScanMetrics scanMetrics) {
+ // We do not need to look at any `remove` files from the checkpoints. Skip the column to save
+ // I/O. Note that we are still going to process the row groups. Adds and removes are randomly
+ // scattered through checkpoint part files, so row group push down is unlikely to be useful.
final CloseableIterator addRemoveIter =
new ActionsIterator(
engine,
logSegment.allLogFilesReversed(),
getAddRemoveReadSchema(shouldReadStats),
+ getAddReadSchema(shouldReadStats),
checkpointPredicate);
return new ActiveAddFilesIterator(engine, addRemoveIter, dataPath, scanMetrics);
}