Skip to content

Commit 685379c

Browse files
authored
[Kernel] Generalize the actions after commit (like checkpoint) by introducing post-commit actions to Kernel (#4115)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> This PR doesn't make any functional changes, but abstracts checkpointing into a post-commit action. This prepares for adding more post-commit actions, such as the CRC write (#4116). ## How was this patch tested? Existing unit tests, plus a manual test using delta/kernel/examples/run-kernel-examples.py --use-local <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? 
<!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
1 parent 2d07216 commit 685379c

File tree

7 files changed

+160
-24
lines changed

7 files changed

+160
-24
lines changed

kernel/examples/kernel-examples/src/main/java/io/delta/kernel/examples/CreateTableAndInsertData.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717

1818
import java.io.IOException;
1919
import java.util.*;
20+
import java.util.concurrent.CompletableFuture;
2021

2122
import org.apache.commons.cli.Options;
2223

2324
import io.delta.kernel.*;
2425
import io.delta.kernel.data.FilteredColumnarBatch;
2526
import io.delta.kernel.data.Row;
2627
import io.delta.kernel.expressions.Literal;
28+
import io.delta.kernel.hook.PostCommitHook;
29+
import io.delta.kernel.hook.PostCommitHook.PostCommitHookType;
2730
import io.delta.kernel.utils.*;
2831
import static io.delta.kernel.examples.utils.Utils.parseArgs;
2932

@@ -409,11 +412,17 @@ public void insertWithOptionalCheckpoint(String tablePath) throws IOException {
409412
// for every 10 versions.
410413
for (int i = 0; i < 12; i++) {
411414
TransactionCommitResult commitResult = insertDataIntoUnpartitionedTable(tablePath);
412-
if (commitResult.isReadyForCheckpoint()) {
415+
for(PostCommitHook hook: commitResult.getPostCommitHooks())
413416
// Checkpoint the table
414-
Table.forPath(engine, tablePath).checkpoint(engine, commitResult.getVersion());
415-
didCheckpoint = true;
416-
}
417+
didCheckpoint = didCheckpoint || CompletableFuture.supplyAsync(() -> {
418+
// run the code async
419+
try{
420+
hook.threadSafeInvoke(engine);
421+
} catch (IOException e) {
422+
return false;
423+
}
424+
return hook.getType().equals(PostCommitHookType.CHECKPOINT);
425+
}).join(); // wait async finish.
417426
}
418427

419428
if (!didCheckpoint) {

kernel/kernel-api/src/main/java/io/delta/kernel/TransactionCommitResult.java

+17-9
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
import io.delta.kernel.annotation.Evolving;
1919
import io.delta.kernel.engine.Engine;
20+
import io.delta.kernel.hook.PostCommitHook;
2021
import io.delta.kernel.utils.CloseableIterable;
22+
import java.util.List;
2123

2224
/**
2325
* Contains the result of a successful transaction commit. Returned by {@link
@@ -28,11 +30,11 @@
2830
@Evolving
2931
public class TransactionCommitResult {
3032
private final long version;
31-
private final boolean isReadyForCheckpoint;
33+
private final List<PostCommitHook> postCommitHooks;
3234

33-
public TransactionCommitResult(long version, boolean isReadyForCheckpoint) {
35+
public TransactionCommitResult(long version, List<PostCommitHook> postCommitHooks) {
3436
this.version = version;
35-
this.isReadyForCheckpoint = isReadyForCheckpoint;
37+
this.postCommitHooks = postCommitHooks;
3638
}
3739

3840
/**
@@ -45,13 +47,19 @@ public long getVersion() {
4547
}
4648

4749
/**
48-
* Is the table ready for checkpoint (i.e. there are enough commits since the last checkpoint)? If
49-
* yes the connector can choose to checkpoint as the version the transaction is committed as using
50-
* {@link Table#checkpoint(Engine, long)}
50+
* Operations for connector to trigger post-commit.
5151
*
52-
* @return Is the table ready for checkpointing?
52+
* <p>Usage:
53+
*
54+
* <ul>
55+
* <li>Async: Call {@link PostCommitHook#threadSafeInvoke(Engine)} in separate thread.
56+
* <li>Sync: Direct call {@link PostCommitHook#threadSafeInvoke(Engine)} and block until
57+
* operation ends.
58+
* </ul>
59+
*
60+
* @return list of post-commit operations
5361
*/
54-
public boolean isReadyForCheckpoint() {
55-
return isReadyForCheckpoint;
62+
public List<PostCommitHook> getPostCommitHooks() {
63+
return postCommitHooks;
5664
}
5765
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.delta.kernel.hook;
18+
19+
import io.delta.kernel.engine.Engine;
20+
import java.io.IOException;
21+
22+
/**
23+
* A hook for executing an operation after a transaction commit. Hooks are added in the Transaction
24+
* and the engine needs to invoke the hook explicitly to execute the operation. Supported operations
25+
* are listed in {@link PostCommitHookType}.
26+
*/
27+
public interface PostCommitHook {
28+
29+
/** The kinds of post-commit operation a hook can represent. */
enum PostCommitHookType {
30+
/**
31+
* Writes a new checkpoint at the version committed by the transaction. This hook is present
32+
* when the table is ready for checkpoint according to its configured checkpoint interval. To
33+
* perform this operation, reading previous checkpoint + logs is required to construct a new
34+
* checkpoint, with latency scaling based on log size (typically seconds to minutes).
35+
*/
36+
CHECKPOINT
37+
}
38+
39+
/**
* Invokes the post-commit operation. Implementations must be thread safe.
*
* @param engine the {@link Engine} instance the operation is executed with
* @throws IOException declared so that implementations can propagate I/O failures to the caller
*/
40+
void threadSafeInvoke(Engine engine) throws IOException;
41+
42+
/** Returns the {@link PostCommitHookType} describing the operation this hook performs. */
PostCommitHookType getType();
43+
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@
2727
import io.delta.kernel.engine.Engine;
2828
import io.delta.kernel.exceptions.ConcurrentWriteException;
2929
import io.delta.kernel.expressions.Column;
30+
import io.delta.kernel.hook.PostCommitHook;
3031
import io.delta.kernel.internal.actions.*;
3132
import io.delta.kernel.internal.data.TransactionStateRow;
3233
import io.delta.kernel.internal.fs.Path;
34+
import io.delta.kernel.internal.hook.CheckpointHook;
3335
import io.delta.kernel.internal.metrics.TransactionMetrics;
3436
import io.delta.kernel.internal.metrics.TransactionReportImpl;
3537
import io.delta.kernel.internal.replay.ConflictChecker;
@@ -354,7 +356,12 @@ private TransactionCommitResult doCommit(
354356
"Write file actions to JSON log file `%s`",
355357
FileNames.deltaFile(logPath, commitAsVersion));
356358

357-
return new TransactionCommitResult(commitAsVersion, isReadyForCheckpoint(commitAsVersion));
359+
List<PostCommitHook> postCommitHooks = new ArrayList<>();
360+
if (isReadyForCheckpoint(commitAsVersion)) {
361+
postCommitHooks.add(new CheckpointHook(dataPath, commitAsVersion));
362+
}
363+
364+
return new TransactionCommitResult(commitAsVersion, postCommitHooks);
358365
} catch (FileAlreadyExistsException e) {
359366
throw e;
360367
} catch (IOException ioe) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (2025) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.internal.hook;
17+
18+
import io.delta.kernel.Table;
19+
import io.delta.kernel.engine.Engine;
20+
import io.delta.kernel.hook.PostCommitHook;
21+
import io.delta.kernel.internal.fs.Path;
22+
import java.io.IOException;
23+
24+
/**
* A {@link PostCommitHook} that writes a new checkpoint at the version committed by the
* transaction. The hook only records the table path and version at construction time; the
* checkpoint itself is written when {@link #threadSafeInvoke(Engine)} is called.
*/
25+
public class CheckpointHook implements PostCommitHook {
26+
27+
// Location of the table to checkpoint; converted to a String when the hook is invoked.
private final Path tablePath;
28+
// Table version at which the checkpoint is written.
private final long checkpointVersion;
29+
30+
/**
* @param tablePath path of the table to checkpoint
* @param checkpointVersion version to write the checkpoint at
*/
public CheckpointHook(Path tablePath, long checkpointVersion) {
31+
this.tablePath = tablePath;
32+
this.checkpointVersion = checkpointVersion;
33+
}
34+
35+
/**
* Resolves the table via {@link Table#forPath} and checkpoints it at the recorded version.
*
* @param engine the {@link Engine} used both to resolve the table and to write the checkpoint
* @throws IOException propagated from the underlying checkpoint call
*/
@Override
36+
public void threadSafeInvoke(Engine engine) throws IOException {
37+
Table.forPath(engine, tablePath.toString()).checkpoint(engine, checkpointVersion);
38+
}
39+
40+
/** Always returns {@link PostCommitHookType#CHECKPOINT}. */
@Override
41+
public PostCommitHookType getType() {
42+
return PostCommitHookType.CHECKPOINT;
43+
}
44+
}

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWriteSuiteBase.scala

+31-7
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,15 @@ import io.delta.kernel.internal.util.FileNames
2626
import io.delta.kernel.internal.util.Utils.singletonCloseableIterator
2727
import io.delta.kernel.internal.{SnapshotImpl, TableConfig, TableImpl}
2828
import io.delta.kernel.utils.FileStatus
29-
import io.delta.kernel.{Meta, Operation, Table, Transaction, TransactionBuilder, TransactionCommitResult}
30-
import io.delta.kernel.data.{ColumnarBatch, ColumnVector, FilteredColumnarBatch, Row}
29+
import io.delta.kernel.{
30+
Meta,
31+
Operation,
32+
Table,
33+
Transaction,
34+
TransactionBuilder,
35+
TransactionCommitResult
36+
}
37+
import io.delta.kernel.data.{ColumnVector, ColumnarBatch, FilteredColumnarBatch, Row}
3138
import io.delta.kernel.defaults.internal.data.DefaultColumnarBatch
3239
import io.delta.kernel.expressions.Literal
3340
import io.delta.kernel.expressions.Literal.ofInt
@@ -39,6 +46,7 @@ import io.delta.kernel.types.StructType
3946
import io.delta.kernel.utils.CloseableIterable.{emptyIterable, inMemoryIterable}
4047
import io.delta.kernel.utils.CloseableIterator
4148
import io.delta.kernel.Operation.CREATE_TABLE
49+
import io.delta.kernel.hook.PostCommitHook.PostCommitHookType
4250
import org.apache.commons.io.FileUtils
4351
import org.apache.hadoop.conf.Configuration
4452
import org.apache.hadoop.fs.Path
@@ -139,10 +147,14 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
139147
tablePath: String,
140148
result: TransactionCommitResult,
141149
expSize: Long): Unit = {
142-
if (result.isReadyForCheckpoint) {
143-
Table.forPath(engine, tablePath).checkpoint(engine, result.getVersion)
144-
verifyLastCheckpointMetadata(tablePath, checkpointAt = result.getVersion, expSize)
145-
}
150+
result.getPostCommitHooks.forEach(
151+
hook => {
152+
if (hook.getType == PostCommitHookType.CHECKPOINT) {
153+
hook.threadSafeInvoke(engine)
154+
verifyLastCheckpointMetadata(tablePath, checkpointAt = result.getVersion, expSize)
155+
}
156+
}
157+
)
146158
}
147159

148160
/**
@@ -399,7 +411,7 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
399411
expVersion: Long,
400412
expIsReadyForCheckpoint: Boolean): Unit = {
401413
assert(result.getVersion === expVersion)
402-
assert(result.isReadyForCheckpoint === expIsReadyForCheckpoint)
414+
assertCheckpointReadiness(result, expIsReadyForCheckpoint)
403415
}
404416

405417
def verifyTableProperties(
@@ -421,4 +433,16 @@ trait DeltaTableWriteSuiteBase extends AnyFunSuite with TestUtils {
421433
builder.append("]")
422434
checkAnswer(resultProperties, Seq(builder.toString()).map(TestRow(_)))
423435
}
436+
437+
/**
* Asserts that the commit result contains a post-commit hook of type
* `PostCommitHookType.CHECKPOINT` exactly when `isReadyForCheckpoint` is true, and contains no
* such hook when it is false.
*/
def assertCheckpointReadiness(
438+
txnResult: TransactionCommitResult,
439+
isReadyForCheckpoint: Boolean): Unit = {
440+
assert(
441+
txnResult.getPostCommitHooks
442+
.stream()
443+
.anyMatch(
444+
hook => hook.getType == PostCommitHookType.CHECKPOINT
445+
) === isReadyForCheckpoint
446+
)
447+
}
424448
}

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala

+4-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import io.delta.kernel.engine.Engine
2525
import io.delta.kernel.exceptions._
2626
import io.delta.kernel.expressions.Literal
2727
import io.delta.kernel.expressions.Literal._
28+
import io.delta.kernel.hook.PostCommitHook.PostCommitHookType
2829
import io.delta.kernel.internal.checkpoints.CheckpointerSuite.selectSingleElement
2930
import io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames
3031
import io.delta.kernel.internal.{SnapshotImpl, TableConfig}
@@ -131,7 +132,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
131132
val txnResult = txn.commit(engine, emptyIterable())
132133

133134
assert(txnResult.getVersion === 0)
134-
assert(!txnResult.isReadyForCheckpoint)
135+
assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false)
135136

136137
verifyCommitInfo(tablePath = tablePath, version = 0)
137138
verifyWrittenContent(tablePath, testSchema, Seq.empty)
@@ -350,7 +351,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
350351
val txnResult = txn.commit(engine, emptyIterable())
351352

352353
assert(txnResult.getVersion === 0)
353-
assert(!txnResult.isReadyForCheckpoint)
354+
assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false)
354355

355356
verifyCommitInfo(tablePath, version = 0, Seq("Part1", "part2"))
356357
verifyWrittenContent(tablePath, schema, Seq.empty)
@@ -368,7 +369,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
368369
val txnResult = txn.commit(engine, emptyIterable())
369370

370371
assert(txnResult.getVersion === 0)
371-
assert(!txnResult.isReadyForCheckpoint)
372+
assertCheckpointReadiness(txnResult, isReadyForCheckpoint = false)
372373

373374
verifyCommitInfo(tablePath, version = 0)
374375
verifyWrittenContent(tablePath, schema, Seq.empty)

0 commit comments

Comments
 (0)