Skip to content

Commit a3f135c

Browse files
committedMay 20, 2019
Punctuation (from StreamThread to PunctuationQueue)
1 parent f8a54d9 commit a3f135c

6 files changed

+66
-31
lines changed
 

‎kafka-streams-AssignedStreamsTasks.adoc

+19-3
Original file line numberDiff line numberDiff line change
@@ -89,16 +89,32 @@ In the end, `maybeCommit` gives the number of running stream tasks that <<kafka-
8989

9090
NOTE: `maybeCommit` is used exclusively when `TaskManager` is requested to link:kafka-streams-internals-TaskManager.adoc#maybeCommitActiveTasks[maybeCommitActiveTasks].
9191

92-
=== [[punctuate]] `punctuate` Method
92+
=== [[punctuate]] Punctuating Running Stream Tasks (by Stream and System Time) -- `punctuate` Method
9393

9494
[source, java]
9595
----
9696
int punctuate()
9797
----
9898

99-
`punctuate`...FIXME
99+
`punctuate` walks over the <<kafka-streams-internals-AssignedTasks.adoc#running, running stream tasks>> and requests them to <<kafka-streams-internals-StreamTask.adoc#maybePunctuateStreamTime, maybePunctuateStreamTime>> and <<kafka-streams-internals-StreamTask.adoc#maybePunctuateSystemTime, maybePunctuateSystemTime>>.
100100

101-
NOTE: `punctuate` is used exclusively when `TaskManager` is requested to link:kafka-streams-internals-TaskManager.adoc#punctuate[punctuate].
101+
For every positive response (<<kafka-streams-internals-StreamTask.adoc#maybePunctuateStreamTime, maybePunctuateStreamTime>> or <<kafka-streams-internals-StreamTask.adoc#maybePunctuateSystemTime, maybePunctuateSystemTime>> returned `true`), an internal `punctuated` counter is incremented.
102+
103+
In the end, `punctuate` returns the internal `punctuated` counter.
104+
105+
In case of a `TaskMigratedException`, `punctuate` prints out the following INFO message to the logs, <<kafka-streams-internals-AssignedTasks.adoc#closeZombieTask, closes the zombie task>>, and possibly removes the task from the <<kafka-streams-internals-AssignedTasks.adoc#running, running stream tasks>>.
106+
107+
```
108+
Failed to punctuate stream task [taskId] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.
109+
```
110+
111+
In case of a `KafkaException`, `punctuate` prints out the following ERROR message to the logs and re-throws the exception.
112+
113+
```
114+
Failed to punctuate stream task [taskId] due to the following error:
115+
```
116+
117+
NOTE: `punctuate` is used exclusively when `TaskManager` is requested to <<kafka-streams-internals-TaskManager.adoc#punctuate, punctuate stream tasks>>.
102118

103119
=== [[updateRestored]] `updateRestored` Method
104120

‎kafka-streams-PunctuationQueue.adoc

+12-7
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
== [[PunctuationQueue]] PunctuationQueue
22

3-
`PunctuationQueue` is...FIXME
3+
[[pq]]
4+
`PunctuationQueue` manages a https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/PriorityQueue.html[java.util.PriorityQueue] of <<kafka-streams-PunctuationSchedule.adoc#, PunctuationSchedules>>.
45

5-
=== [[mayPunctuate]] `mayPunctuate` Method
6+
=== [[mayPunctuate]] Attempting to Punctuate -- `mayPunctuate` Method
67

78
[source, java]
89
----
910
boolean mayPunctuate(
10-
final long timestamp,
11-
final PunctuationType type,
12-
final ProcessorNodePunctuator processorNodePunctuator)
11+
long timestamp,
12+
PunctuationType type,
13+
ProcessorNodePunctuator processorNodePunctuator)
1314
----
1415

15-
`mayPunctuate`...FIXME
16+
`mayPunctuate` takes the <<kafka-streams-PunctuationSchedule.adoc#, PunctuationSchedules>> off the <<pq, PriorityQueue>> for which the <<kafka-streams-Stamped.adoc#timestamp, timestamp>> is older (_smaller_) than the given `timestamp`.
1617

17-
NOTE: `mayPunctuate` is used when `StreamTask` is requested to link:kafka-streams-internals-StreamTask.adoc#maybePunctuateStreamTime[maybePunctuateStreamTime] and link:kafka-streams-internals-StreamTask.adoc#maybePunctuateSystemTime[maybePunctuateSystemTime].
18+
`mayPunctuate` then requests the given <<kafka-streams-ProcessorNodePunctuator.adoc#, ProcessorNodePunctuator>> to <<kafka-streams-ProcessorNodePunctuator.adoc#punctuate, punctuate>> (with the <<kafka-streams-PunctuationSchedule.adoc#node, node>> and the <<kafka-streams-PunctuationSchedule.adoc#punctuator, punctuator>> of every `PunctuationSchedule`, the given `timestamp` and the `PunctuationType`).
19+
20+
In the end, `mayPunctuate` returns whether a `PunctuationSchedule` was punctuated (`true`) or not (`false`).
21+
22+
NOTE: `mayPunctuate` is used when `StreamTask` is requested to attempt to punctuate by <<kafka-streams-internals-StreamTask.adoc#maybePunctuateStreamTime, stream>> and <<kafka-streams-internals-StreamTask.adoc#maybePunctuateSystemTime, system>> time.

‎kafka-streams-internals-PartitionGroup.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ int numBuffered(final TopicPartition partition)
6767

6868
NOTE: `numBuffered` is used when...FIXME
6969

70-
=== [[timestamp]] Minimum Partition Timestamp Across All TopicPartitions -- `timestamp` Method
70+
=== [[timestamp]] Minimum Partition Timestamp Across All Partitions -- `timestamp` Method
7171

7272
[source, java]
7373
----

‎kafka-streams-internals-StreamTask.adoc

+11-5
Original file line numberDiff line numberDiff line change
@@ -290,18 +290,24 @@ NOTE: `punctuate` is part of link:kafka-streams-ProcessorNodePunctuator.adoc#pun
290290

291291
`punctuate`...FIXME
292292

293-
=== [[maybePunctuateStreamTime]] `maybePunctuateStreamTime` Method
293+
=== [[maybePunctuateStreamTime]] Attempting to Punctuate by Stream Time -- `maybePunctuateStreamTime` Method
294294

295295
[source, java]
296296
----
297297
boolean maybePunctuateStreamTime()
298298
----
299299

300-
`maybePunctuateStreamTime`...FIXME
300+
`maybePunctuateStreamTime` requests the <<partitionGroup, PartitionGroup>> for the <<kafka-streams-internals-PartitionGroup.adoc#timestamp, minimum partition timestamp across all partitions>>.
301301

302-
NOTE: `maybePunctuateStreamTime` is used exclusively when `AssignedStreamsTasks` is requested to link:kafka-streams-AssignedStreamsTasks.adoc#punctuate[punctuate].
302+
`maybePunctuateStreamTime` requests the <<streamTimePunctuationQueue, stream-time PunctuationQueue>> to <<kafka-streams-PunctuationQueue.adoc#mayPunctuate, mayPunctuate>> with the minimum timestamp.
303303

304-
=== [[maybePunctuateSystemTime]] `maybePunctuateSystemTime` Method
304+
In the end, `maybePunctuateStreamTime` returns whatever the stream-time `PunctuationQueue` returned.
305+
306+
If the minimum timestamp is <<kafka-streams-internals-RecordQueue.adoc#UNKNOWN, UNKNOWN>>, `maybePunctuateStreamTime` returns `false`.
307+
308+
NOTE: `maybePunctuateStreamTime` is used exclusively when `AssignedStreamsTasks` is requested to <<kafka-streams-AssignedStreamsTasks.adoc#punctuate, punctuate running stream tasks>>.
309+
310+
=== [[maybePunctuateSystemTime]] Attempting to Punctuate by System Time -- `maybePunctuateSystemTime` Method
305311

306312
[source, java]
307313
----
@@ -310,7 +316,7 @@ boolean maybePunctuateSystemTime()
310316

311317
`maybePunctuateSystemTime`...FIXME
312318

313-
NOTE: `maybePunctuateSystemTime` is used exclusively when `AssignedStreamsTasks` is requested to link:kafka-streams-AssignedStreamsTasks.adoc#punctuate[punctuate].
319+
NOTE: `maybePunctuateSystemTime` is used exclusively when `AssignedStreamsTasks` is requested to <<kafka-streams-AssignedStreamsTasks.adoc#punctuate, punctuate running stream tasks>>.
314320

315321
=== [[schedule]] `schedule` Method
316322

‎kafka-streams-internals-StreamThread.adoc

+20-12
Original file line numberDiff line numberDiff line change
@@ -520,17 +520,6 @@ In the end, `create` creates a <<creating-instance, StreamThread>>.
520520

521521
NOTE: `create` is used exclusively when `KafkaStreams` is <<kafka-streams-KafkaStreams.adoc#, created>>.
522522

523-
=== [[punctuate]] `punctuate` Internal Method
524-
525-
[source, java]
526-
----
527-
void punctuate()
528-
----
529-
530-
`punctuate`...FIXME
531-
532-
NOTE: `punctuate` is used when `StreamThread` is requested to <<run, start>> (and <<runOnce, polls records once>> and <<processAndMaybeCommit, processes records (with optional commit)>>).
533-
534523
=== [[enforceRebalance]] Enforcing Rebalance -- `enforceRebalance` Internal Method
535524

536525
[source, java]
@@ -577,6 +566,21 @@ Committed all active tasks [activeTaskIds] and standby tasks [standbyTaskIds] in
577566

578567
NOTE: `maybeCommit` is used when `StreamThread` is requested to <<runOnce, poll records once>> (directly and indirectly in <<processAndMaybeCommit, processAndMaybeCommit>>).
579568

569+
=== [[maybePunctuate]] Attempting to Punctuate (Running Stream Tasks) -- `maybePunctuate` Internal Method
570+
571+
[source, java]
572+
----
573+
boolean maybePunctuate()
574+
----
575+
576+
`maybePunctuate` requests the <<taskManager, TaskManager>> to <<kafka-streams-internals-TaskManager.adoc#punctuate, punctuate>>.
577+
578+
If the punctuate returned a positive number (greater than `0`), `maybePunctuate` <<advanceNowAndComputeLatency, advanceNowAndComputeLatency>> and requests the <<streamsMetrics, StreamsMetricsThreadImpl>> for the <<kafka-streams-StreamsMetricsThreadImpl.adoc#punctuateTimeSensor, punctuateTimeSensor>> to record the punctuate time.
579+
580+
In the end, `maybePunctuate` returns whether the punctuate returned a positive number (`true`) or not (`false`).
581+
582+
NOTE: `maybePunctuate` is used exclusively when `StreamThread` is requested to <<runOnce, poll records once and add them to active stream tasks>>.
583+
580584
=== [[internal-properties]] Internal Properties
581585

582586
[cols="30m,70",options="header",width="100%"]
@@ -591,10 +595,14 @@ a| [[builder]] <<kafka-streams-internals-InternalTopologyBuilder.adoc#, Internal
591595
a| [[lastCommitMs]] Time of the last <<maybeCommit, commit>>
592596

593597
| numIterations
594-
a| [[numIterations]]
598+
a| [[numIterations]] Number of iterations when the <<taskManager, TaskManager>> is requested to <<kafka-streams-internals-TaskManager.adoc#process, process records by running stream tasks (one record per task)>> while `StreamThread` is <<runOnce, polling records once and adding them to active stream tasks>>
595599

596600
Default: `1`
597601

602+
Incremented while <<runOnce, polling records once and adding them to active stream tasks>>
603+
604+
Decremented by half while <<runOnce, polling records once and adding them to active stream tasks>>
605+
598606
| processStandbyRecords
599607
a| [[processStandbyRecords]] Flag to control whether to <<maybeUpdateStandbyTasks, maybeUpdateStandbyTasks>> after <<maybeCommit, maybeCommit>> was executed
600608

‎kafka-streams-internals-TaskManager.adoc

+3-3
Original file line numberDiff line numberDiff line change
@@ -358,16 +358,16 @@ In the end, `maybeCommitActiveTasks` gives the number of running stream tasks th
358358

359359
NOTE: `maybeCommitActiveTasks` is used exclusively when `StreamThread` is requested to link:kafka-streams-internals-StreamThread.adoc#processAndMaybeCommit[processAndMaybeCommit].
360360

361-
=== [[punctuate]] `punctuate` Method
361+
=== [[punctuate]] Punctuating Stream Tasks -- `punctuate` Method
362362

363363
[source, java]
364364
----
365365
int punctuate()
366366
----
367367

368-
`punctuate` simply requests <<active, AssignedStreamsTasks>> to link:kafka-streams-AssignedStreamsTasks.adoc#punctuate[punctuate].
368+
`punctuate` simply requests the <<active, AssignedStreamsTasks>> to <<kafka-streams-AssignedStreamsTasks.adoc#punctuate, punctuate>>.
369369

370-
NOTE: `punctuate` is used exclusively when `StreamThread` is requested to link:kafka-streams-internals-StreamThread.adoc#punctuate[punctuate].
370+
NOTE: `punctuate` is used exclusively when `StreamThread` is requested to <<kafka-streams-internals-StreamThread.adoc#maybePunctuate, attempt to punctuate>>.
371371

372372
=== [[commitAll]] Committing Active and Standby Tasks -- `commitAll` Method
373373

0 commit comments

Comments
 (0)
Please sign in to comment.