Skip to content

Commit ad64b91

Browse files
StreamTask.commit -- Committing Task
1 parent 2552c39 commit ad64b91

6 files changed

+67
-14
lines changed

SUMMARY.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@
162162
.. link:kafka-streams-StreamsMetricsImpl.adoc[StreamsMetricsImpl]
163163
.. link:kafka-streams-StreamsMetricsThreadImpl.adoc[StreamsMetricsThreadImpl]
164164

165+
. link:kafka-streams-StreamTask-TaskMetrics.adoc[TaskMetrics]
166+
165167
== Testing
166168

167169
. link:kafka-streams-TopologyTestDriver.adoc[TopologyTestDriver]

kafka-streams-AbstractTask.adoc

+9
Original file line numberDiff line numberDiff line change
@@ -198,3 +198,12 @@ void reinitializeStateStoresForPartitions(final Collection<TopicPartition> parti
198198
199199
* `StreamThread` is requested to <<kafka-streams-StreamThread.adoc#maybeUpdateStandbyTasks, maybeUpdateStandbyTasks>>
200200
====
201+
202+
=== [[activeTaskCheckpointableOffsets]] Checkpointable Offsets -- `activeTaskCheckpointableOffsets` Method
203+
204+
[source, java]
205+
----
206+
Map<TopicPartition, Long> activeTaskCheckpointableOffsets()
207+
----
208+
209+
`activeTaskCheckpointableOffsets` simply returns an empty collection (of checkpointable offsets).
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
== [[TaskMetrics]] TaskMetrics
2+
3+
`TaskMetrics` is...FIXME

kafka-streams-StreamTask.adoc

+50-13
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ Used when `StreamTask` is requested for the following:
4141
4242
| consumedOffsets
4343
| [[consumedOffsets]] Offsets by https://kafka.apache.org/20/javadoc/org/apache/kafka/common/TopicPartition.html[TopicPartitions] (`Map<TopicPartition, Long>`) that `StreamTask` has <<process, processed>> successfully
44+
45+
| taskMetrics
46+
| [[taskMetrics]] <<kafka-streams-StreamTask-TaskMetrics.adoc#, TaskMetrics>>
4447
|===
4548

4649
[[logging]]
@@ -85,19 +88,6 @@ NOTE: `suspend` is part of <<kafka-streams-Task.adoc#suspend, Task Contract>> to
8588

8689
NOTE: The private `suspend` is used exclusively when `StreamTask` is requested to <<close, close>>.
8790

88-
=== [[commit]] `commit` Method
89-
90-
[source, java]
91-
----
92-
void commit() // <1>
93-
void commit(final boolean startNewTransaction)
94-
----
95-
<1> Calls `commit` with `startNewTransaction` flag on
96-
97-
NOTE: `commit` is part of <<kafka-streams-Task.adoc#commit, Task Contract>> to...FIXME.
98-
99-
`commit`...FIXME
100-
10191
=== [[close]] `close` Method
10292

10393
[source, java]
@@ -362,3 +352,50 @@ void commitOffsets(final boolean startNewTransaction)
362352
`commitOffsets`...FIXME
363353

364354
NOTE: `commitOffsets` is used exclusively when `StreamTask` is requested to <<commit, commit>>.
355+
356+
=== [[commit]] Committing Task -- `commit` Method
357+
358+
[source, java]
359+
----
360+
void commit()
361+
----
362+
363+
NOTE: `commit` is part of <<kafka-streams-Task.adoc#commit, Task Contract>> to commit the task.
364+
365+
`commit` simply <<commit-startNewTransaction, commits>> with the `startNewTransaction` flag on.
366+
367+
=== [[commit-startNewTransaction]] `commit` Internal Method
368+
369+
[source, java]
370+
----
371+
void commit(final boolean startNewTransaction)
372+
----
373+
374+
`commit` prints out the following DEBUG message to the logs:
375+
376+
```
377+
Committing
378+
```
379+
380+
`commit` <<flushState, flushState>>.
381+
382+
(only when <<kafka-streams-AbstractTask.adoc#eosEnabled, exactly-once support>> is disabled) `commit` requests the <<stateMgr, ProcessorStateManager>> to <<kafka-streams-ProcessorStateManager.adoc#checkpoint, checkpoint>> with the <<activeTaskCheckpointableOffsets, checkpointable offsets>>.
383+
384+
`commit` <<commitOffsets, commitOffsets>> with the input `startNewTransaction` flag.
385+
386+
`commit` turns the <<commitRequested, commitRequested>> internal flag off.
387+
388+
In the end, `commit` requests the <<taskMetrics, TaskMetrics>> for the <<taskCommitTimeSensor, taskCommitTimeSensor>> and records the duration (i.e. the time since `commit` was executed).
389+
390+
NOTE: `commit` is used when `StreamTask` is requested to <<commit, commit>> (that turns the input `startNewTransaction` flag on) and <<suspend, suspend>> (with the input `startNewTransaction` flag off).
391+
392+
=== [[activeTaskCheckpointableOffsets]] `activeTaskCheckpointableOffsets` Method
393+
394+
[source, java]
395+
----
396+
Map<TopicPartition, Long> activeTaskCheckpointableOffsets()
397+
----
398+
399+
NOTE: `activeTaskCheckpointableOffsets` is part of the <<kafka-streams-AbstractTask.adoc#activeTaskCheckpointableOffsets, AbstractTask Contract>> to return the checkpointable offsets.
400+
401+
`activeTaskCheckpointableOffsets`...FIXME

kafka-streams-StreamThread.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ Committing all active tasks [activeTaskIds] and standby tasks [standbyTaskIds] s
605605

606606
Only if there are still running active and standby tasks, `maybeCommit` does the following:
607607

608-
. Requests the <<streamsMetrics, StreamsMetricsThreadImpl>> for <<kafka-streams-StreamsMetricsThreadImpl.adoc#commitTimeSensor, commitTimeSensor>> and records the commit time (as the latency of committing all the tasks by their number)
608+
. Requests the <<streamsMetrics, StreamsMetricsThreadImpl>> for the <<kafka-streams-StreamsMetricsThreadImpl.adoc#commitTimeSensor, commitTimeSensor>> and records the commit time (as the latency of committing all the tasks by their number)
609609

610610
. Requests the <<taskManager, TaskManager>> to <<kafka-streams-TaskManager.adoc#maybePurgeCommitedRecords, maybePurgeCommitedRecords>>
611611

kafka-streams-Task.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ a| [[commit]]
115115
void commit()
116116
----
117117

118+
Commits the task
119+
118120
Used when:
119121

120122
* `AssignedStreamsTasks` is requested to execute the <<kafka-streams-AssignedStreamsTasks.adoc#maybeCommitAction, maybeCommitAction>> action

0 commit comments

Comments
 (0)