diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java
index 8f9486b4191..742ec1a3363 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActionsIterator.java
@@ -60,7 +60,14 @@ public class ActionsIterator implements CloseableIterator<ActionWrapper> {
    */
   private final LinkedList<DeltaLogFile> filesList;
 
-  private final StructType readSchema;
+  /** Schema used for reading delta files. */
+  private final StructType deltaReadSchema;
+
+  /**
+   * Schema to be used for reading checkpoint files. Checkpoint files can be Parquet or JSON in the
+   * case of v2 checkpoints.
+   */
+  private final StructType checkpointReadSchema;
 
   private final boolean schemaContainsAddOrRemoveFiles;
 
@@ -77,16 +84,26 @@ public class ActionsIterator implements CloseableIterator<ActionWrapper> {
   public ActionsIterator(
       Engine engine,
       List<FileStatus> files,
-      StructType readSchema,
+      StructType deltaReadSchema,
+      Optional<Predicate> checkpointPredicate) {
+    this(engine, files, deltaReadSchema, deltaReadSchema, checkpointPredicate);
+  }
+
+  public ActionsIterator(
+      Engine engine,
+      List<FileStatus> files,
+      StructType deltaReadSchema,
+      StructType checkpointReadSchema,
       Optional<Predicate> checkpointPredicate) {
     this.engine = engine;
     this.checkpointPredicate = checkpointPredicate;
     this.filesList = new LinkedList<>();
     this.filesList.addAll(
         files.stream().map(DeltaLogFile::forCommitOrCheckpoint).collect(Collectors.toList()));
-    this.readSchema = readSchema;
+    this.deltaReadSchema = deltaReadSchema;
+    this.checkpointReadSchema = checkpointReadSchema;
     this.actionsIter = Optional.empty();
-    this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(readSchema);
+    this.schemaContainsAddOrRemoveFiles = LogReplay.containsAddOrRemoveFileActions(deltaReadSchema);
   }
 
   @Override
@@ -105,7 +122,7 @@ public boolean hasNext() {
 
   /**
    * @return a tuple of (ColumnarBatch, isFromCheckpoint), where ColumnarBatch conforms to the
-   *     instance {@link #readSchema}.
+   *     instance {@link #deltaReadSchema}.
    */
   @Override
   public ActionWrapper next() {
@@ -176,9 +193,9 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
       FileStatus file, String fileName) throws IOException {
     // If the sidecars may contain the current action, read sidecars from the top-level v2
     // checkpoint file(to be read later).
-    StructType modifiedReadSchema = readSchema;
+    StructType modifiedReadSchema = checkpointReadSchema;
     if (schemaContainsAddOrRemoveFiles) {
-      modifiedReadSchema = LogReplay.withSidecarFileSchema(readSchema);
+      modifiedReadSchema = LogReplay.withSidecarFileSchema(checkpointReadSchema);
     }
 
     long checkpointVersion = checkpointVersion(file.getPath());
@@ -195,7 +212,8 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
       checkpointPredicateIncludingSidecars = checkpointPredicate;
     }
     final CloseableIterator<ColumnarBatch> topLevelIter;
-    StructType finalModifiedReadSchema = modifiedReadSchema;
+    StructType finalReadSchema = modifiedReadSchema;
+
     if (fileName.endsWith(".parquet")) {
       topLevelIter =
           wrapEngineExceptionThrowsIO(
@@ -204,11 +222,11 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
                       .getParquetHandler()
                       .readParquetFiles(
                           singletonCloseableIterator(file),
-                          finalModifiedReadSchema,
+                          finalReadSchema,
                           checkpointPredicateIncludingSidecars),
               "Reading parquet log file `%s` with readSchema=%s and predicate=%s",
               file,
-              modifiedReadSchema,
+              finalReadSchema,
               checkpointPredicateIncludingSidecars);
     } else if (fileName.endsWith(".json")) {
       topLevelIter =
@@ -218,11 +236,11 @@ private CloseableIterator<ColumnarBatch> getActionsIterFromSinglePartOrV2Checkpo
                       .getJsonHandler()
                       .readJsonFiles(
                           singletonCloseableIterator(file),
-                          finalModifiedReadSchema,
+                          finalReadSchema,
                           checkpointPredicateIncludingSidecars),
               "Reading JSON log file `%s` with readSchema=%s and predicate=%s",
               file,
-              modifiedReadSchema,
+              finalReadSchema,
               checkpointPredicateIncludingSidecars);
     } else {
       throw new IOException("Unrecognized top level v2 checkpoint file format: " + fileName);
     }
@@ -309,10 +327,12 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
                   engine
                       .getJsonHandler()
                       .readJsonFiles(
-                          singletonCloseableIterator(nextFile), readSchema, Optional.empty()),
+                          singletonCloseableIterator(nextFile),
+                          deltaReadSchema,
+                          Optional.empty()),
               "Reading JSON log file `%s` with readSchema=%s",
               nextFile,
-              readSchema);
+              deltaReadSchema);
 
       return combine(
           dataIter,
@@ -344,10 +364,11 @@ private CloseableIterator<ActionWrapper> getNextActionsIter() {
                   () ->
                       engine
                           .getParquetHandler()
-                          .readParquetFiles(checkpointFiles, readSchema, checkpointPredicate),
+                          .readParquetFiles(
+                              checkpointFiles, deltaReadSchema, checkpointPredicate),
                   "Reading checkpoint sidecars [%s] with readSchema=%s and predicate=%s",
                   checkpointFiles,
-                  readSchema,
+                  deltaReadSchema,
                   checkpointPredicate);
 
       long version = checkpointVersion(nextFilePath);
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java
index fe053c19cdc..7c6cd266813 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java
@@ -233,9 +233,13 @@ private void prepareNext() {
       }
     }
 
+    ColumnarBatch scanAddFiles = addRemoveColumnarBatch;
     // Step 3: Drop the RemoveFile column and use the selection vector to build a new
     // FilteredColumnarBatch
-    ColumnarBatch scanAddFiles = addRemoveColumnarBatch.withDeletedColumnAt(1);
+    // For Parquet files, we would only have read the adds, not the removes, hence the check.
+    if (scanAddFiles.getSchema().length() > 1) {
+      scanAddFiles = scanAddFiles.withDeletedColumnAt(1);
+    }
 
     // Step 4: TODO: remove this step. This is a temporary requirement until the path
     // in `add` is converted to absolute path.
diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
index f51d210ca98..ae3d97d5c24 100644
--- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
+++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
@@ -106,6 +106,11 @@ public static StructType getAddRemoveReadSchema(boolean shouldReadStats) {
         .add(REMOVEFILE_FIELD_NAME, REMOVE_FILE_SCHEMA);
   }
 
+  /** Read schema when searching only for AddFiles */
+  public static StructType getAddReadSchema(boolean shouldReadStats) {
+    return new StructType().add(ADDFILE_FIELD_NAME, getAddSchema(shouldReadStats));
+  }
+
   public static int ADD_FILE_ORDINAL = 0;
   public static int ADD_FILE_PATH_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("path");
   public static int ADD_FILE_DV_ORDINAL = AddFile.SCHEMA_WITHOUT_STATS.indexOf("deletionVector");
@@ -196,11 +201,15 @@ public CloseableIterator<FilteredColumnarBatch> getAddFilesAsColumnarBatches(
       boolean shouldReadStats,
       Optional<Predicate> checkpointPredicate,
       ScanMetrics scanMetrics) {
+    // We do not need to look at any `remove` files from the checkpoints. Skip the column to save
+    // I/O. Note that we are still going to process the row groups. Adds and removes are randomly
+    // scattered through checkpoint part files, so row group push down is unlikely to be useful.
     final CloseableIterator<ActionWrapper> addRemoveIter =
         new ActionsIterator(
             engine,
             logSegment.allLogFilesReversed(),
             getAddRemoveReadSchema(shouldReadStats),
+            getAddReadSchema(shouldReadStats),
             checkpointPredicate);
     return new ActiveAddFilesIterator(engine, addRemoveIter, dataPath, scanMetrics);
   }
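
Usage sketch (not part of the diff): the snippet below only illustrates how the new two-schema ActionsIterator constructor is wired, mirroring what LogReplay.getAddFilesAsColumnarBatches does after this change. The class, package, and schema-helper names come from the diff; the example class, the placeholder engine, and the empty file listing are assumptions for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.replay.ActionsIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.FileStatus;

public class AddOnlyReplayExample {

  // Placeholder wiring: a real caller would pass a concrete Engine and the
  // commit/checkpoint files of a LogSegment, newest first.
  public static ActionsIterator buildAddOnlyIterator(Engine engine, List<FileStatus> logFiles) {
    // Delta (JSON commit) files still need both add and remove columns so that
    // later removes can cancel earlier adds during log replay.
    StructType deltaReadSchema = LogReplay.getAddRemoveReadSchema(true /* shouldReadStats */);

    // Checkpoint files only need the add column for an active-files scan, so the
    // narrower schema skips the remove column and saves checkpoint column I/O.
    StructType checkpointReadSchema = LogReplay.getAddReadSchema(true /* shouldReadStats */);

    return new ActionsIterator(
        engine, logFiles, deltaReadSchema, checkpointReadSchema, Optional.empty());
  }

  public static void main(String[] args) {
    // Placeholder values only; constructing the iterator does not touch storage.
    ActionsIterator iter = buildAddOnlyIterator(null, Collections.emptyList());
    System.out.println("hasNext = " + iter.hasNext());
  }
}
```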