Commit 1d99584

StreamTask and the current record (PartitionGroup, ProcessorNode, and Processor)
1 parent 1e8b9c7 commit 1d99584

5 files changed (+50 -43 lines changed)

kafka-streams-Processor.adoc

+4-4
Original file line number | Diff line number | Diff line change
@@ -35,9 +35,9 @@ a| [[init]]
3535
void init(ProcessorContext context)
3636
----
3737

38-
Initializes the processor (with a <<kafka-streams-ProcessorContext.adoc#, ProcessorContext>>)
38+
Initializes the processor with a <<kafka-streams-ProcessorContext.adoc#, ProcessorContext>> (that can be used to decide whether a record should be <<kafka-streams-ProcessorContext.adoc#forward, forwarded downstream>> to child processors if there are any)
3939

40-
Used exclusively when `ProcessorNode` is requested to <<kafka-streams-internals-ProcessorNode.adoc#init, init>>.
40+
Used exclusively when `ProcessorNode` is requested to <<kafka-streams-internals-ProcessorNode.adoc#init, init>>
4141

4242
| process
4343
a| [[process]]
@@ -47,9 +47,9 @@ a| [[process]]
4747
void process(K key, V value)
4848
----
4949

50-
Processes a single record
50+
Processes a single record (as a pair of a `K` key and a `V` value)
5151

52-
Used exclusively when `ProcessorNode` is requested to <<kafka-streams-internals-ProcessorNode.adoc#process, process>>.
52+
Used exclusively when `ProcessorNode` is requested to <<kafka-streams-internals-ProcessorNode.adoc#process, process a single record (as a key and value pair)>>.
5353

5454
|===
5555

kafka-streams-ProcessorContext.adoc

+1-1
@@ -60,7 +60,7 @@ void forward(final K key, final V value)
6060
void forward(final K key, final V value, final To to)
6161
----
6262

63-
Forwards a record to downstream processors
63+
Forwards a record downstream (to child processors if there are any)
6464

6565
| getStateStore
6666
a| [[getStateStore]]
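
The `init`, `process` and `forward` descriptions in the two diffs above can be illustrated with a minimal sketch. It does not use the real Kafka Streams interfaces: `Context`, `SimpleProcessor`, `NonEmptyFilter` and `ProcessorSketch` are hypothetical, simplified stand-ins introduced only for illustration. The sketch shows a processor that keeps the context handed over in `init` and uses it in `process` to decide whether a record is forwarded downstream.

```java
import java.util.ArrayList;
import java.util.List;

class ProcessorSketch {
    // Hypothetical stand-in for ProcessorContext (only the forward part).
    interface Context<K, V> {
        void forward(K key, V value);
    }

    // Hypothetical stand-in for Processor<K, V>.
    interface SimpleProcessor<K, V> {
        void init(Context<K, V> context);
        void process(K key, V value);
    }

    // Forwards only records with a non-empty value; all others are dropped.
    static final class NonEmptyFilter implements SimpleProcessor<String, String> {
        private Context<String, String> context;

        @Override
        public void init(Context<String, String> context) {
            this.context = context; // kept so that process can forward later
        }

        @Override
        public void process(String key, String value) {
            if (value != null && !value.isEmpty()) {
                context.forward(key, value);
            }
        }
    }

    // Runs the filter over three records and returns what was forwarded.
    static List<String> run() {
        List<String> forwarded = new ArrayList<>();
        NonEmptyFilter filter = new NonEmptyFilter();
        filter.init((key, value) -> forwarded.add(key + "=" + value));
        filter.process("a", "1");
        filter.process("b", "");
        filter.process("c", "3");
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the "downstream" side is just a list that collects forwarded records; in a real topology the context would hand the record to child processor nodes, if there are any.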

kafka-streams-internals-PartitionGroup.adoc

+1-1
@@ -1,6 +1,6 @@
11
== [[PartitionGroup]] PartitionGroup
22

3-
`PartitionGroup` manages <<partitionQueues, RecordQueues per partition>> assigned to a <<kafka-streams-internals-StreamTask.adoc#, StreamTask>>.
3+
`PartitionGroup` is a collection of <<partitionQueues, RecordQueues>> (one per every <<kafka-streams-internals-StreamTask.adoc#partitions, partition>> assigned to a <<kafka-streams-internals-StreamTask.adoc#, StreamTask>>).
44

55
`PartitionGroup` is <<creating-instance, created>> exclusively for a <<kafka-streams-internals-StreamTask.adoc#partitionGroup, StreamTask>>.
66

kafka-streams-internals-ProcessorNode.adoc

+32-30
@@ -1,6 +1,6 @@
11
== [[ProcessorNode]] ProcessorNode
22

3-
`ProcessorNode` is a <<process, processing node>> in a topology that is identified by a <<name, name>> and have zero or more <<children, child processor nodes>>.
3+
`ProcessorNode` is a <<process, processing node>> in a processor topology that is identified by a <<name, name>> and has zero or more <<children, child processor nodes>>.
44

55
NOTE: link:kafka-streams-internals-SourceNode.adoc[SourceNode] and link:kafka-streams-internals-SinkNode.adoc[SinkNode] are specialized `ProcessorNodes`.
66

@@ -12,29 +12,6 @@ NOTE: link:kafka-streams-internals-SourceNode.adoc[SourceNode] and link:kafka-st
1212

1313
`ProcessorNode` has a <<toString, human-friendly / textual representation>> that is particularly helpful for debugging.
1414

15-
[[internal-registries]]
16-
.ProcessorNode's Internal Properties (e.g. Registries, Counters and Flags)
17-
[cols="1,2",options="header",width="100%"]
18-
|===
19-
| Name
20-
| Description
21-
22-
| `children`
23-
| [[children]] Child `ProcessorNodes`
24-
25-
Used when...FIXME
26-
27-
| `nodeMetrics`
28-
| [[nodeMetrics]] link:kafka-streams-NodeMetrics.adoc[NodeMetrics]
29-
30-
Used when...FIXME
31-
32-
| `time`
33-
| [[time]] `Time`
34-
35-
Used when...FIXME
36-
|===
37-
3815
=== [[init]] `init` Method
3916

4017
[source, java]
@@ -53,14 +30,16 @@ void init(ProcessorContext context)
5330
* `StreamTask` is requested to <<kafka-streams-internals-StreamTask.adoc#initTopology, initTopology>>
5431
====
5532

56-
=== [[process]] Processing Record -- `process` Method
33+
=== [[process]] Processing Single Record (As Key and Value Pair) -- `process` Method
5734

5835
[source, java]
5936
----
6037
void process(final K key, final V value)
6138
----
6239

63-
`process`...FIXME
40+
`process` requests the <<processor, Processor>> to <<kafka-streams-Processor.adoc#process, process the given key and value pair>>.
41+
42+
In the end, `process` requests the <<nodeMetrics, NodeMetrics>> for the <<kafka-streams-NodeMetrics.adoc#nodeProcessTimeSensor, nodeProcessTimeSensor>> to record the processing time.
6443

6544
[NOTE]
6645
====
@@ -70,18 +49,18 @@ void process(final K key, final V value)
7049
7150
* `GlobalStateUpdateTask` is requested to <<kafka-streams-internals-GlobalStateUpdateTask.adoc#update, update>>
7251
73-
* `StreamTask` is requested to <<kafka-streams-internals-StreamTask.adoc#process, process>>
52+
* `StreamTask` is requested to <<kafka-streams-internals-StreamTask.adoc#process, process a single record>>
7453
====
7554

7655
=== [[creating-instance]] Creating ProcessorNode Instance
7756

78-
`ProcessorNode` takes the following when created:
57+
`ProcessorNode` takes the following to be created:
7958

8059
* [[name]] Name
81-
* [[processor]] link:kafka-streams-Processor.adoc[Processor] (of `K` keys and `V` values)
60+
* [[processor]] <<kafka-streams-Processor.adoc#, Processor>> (`Processor<K, V>` of `K` keys and `V` values)
8261
* [[stateStores]] Names of the associated state stores
8362

84-
`ProcessorNode` initializes the <<internal-registries, internal registries and counters>>.
63+
`ProcessorNode` initializes the <<internal-properties, internal properties>>.
8564

8665
=== [[addChild]] Adding Child Processor Node -- `addChild` Method
8766

@@ -130,3 +109,26 @@ void punctuate(final long timestamp, final Punctuator punctuator)
130109
`punctuate` requests the <<nodeMetrics, NodeMetrics>> for the link:kafka-streams-NodeMetrics.adoc#metrics[StreamsMetricsImpl] that is requested to link:kafka-streams-internals-StreamsMetricsImpl.adoc#measureLatencyNs[measureLatencyNs] (that records it with the link:kafka-streams-NodeMetrics.adoc#nodePunctuateTimeSensor[nodePunctuateTimeSensor] of the <<nodeMetrics, NodeMetrics>>).
131110

132111
NOTE: `punctuate` is used exclusively when `StreamTask` is requested to link:kafka-streams-internals-StreamTask.adoc#punctuate[execute a scheduled periodic action].
112+
113+
=== [[internal-properties]] Internal Properties
114+
115+
[cols="30m,70",options="header",width="100%"]
116+
|===
117+
| Name
118+
| Description
119+
120+
| children
121+
| [[children]] Child `ProcessorNodes`
122+
123+
Used when...FIXME
124+
125+
| nodeMetrics
126+
| [[nodeMetrics]] <<kafka-streams-NodeMetrics.adoc#, NodeMetrics>>
127+
128+
Used when...FIXME
129+
130+
| time
131+
| [[time]] `Time`
132+
133+
Used when...FIXME
134+
|===
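
The new description of `process` in the diff above (delegate to the `Processor`, then record the processing time with the `nodeProcessTimeSensor`) can be sketched roughly as follows. This is not the actual `ProcessorNode` code: `Node`, `TimeSensor` and `ProcessorNodeSketch` are hypothetical names, a `BiConsumer` stands in for `Processor<K, V>`, and measuring with `System.nanoTime()` is an assumption made for the sake of a runnable example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

class ProcessorNodeSketch {
    // Hypothetical stand-in for the nodeProcessTimeSensor of NodeMetrics.
    static final class TimeSensor {
        final List<Long> recordedNanos = new ArrayList<>();

        void record(long elapsedNanos) {
            recordedNanos.add(elapsedNanos);
        }
    }

    // Simplified node: a name, a processor callback and a process-time sensor.
    static final class Node<K, V> {
        final String name;
        final BiConsumer<K, V> processor; // stands in for Processor<K, V>
        final TimeSensor processTimeSensor = new TimeSensor();

        Node(String name, BiConsumer<K, V> processor) {
            this.name = name;
            this.processor = processor;
        }

        void process(K key, V value) {
            long start = System.nanoTime();
            processor.accept(key, value);                        // delegate to the processor
            processTimeSensor.record(System.nanoTime() - start); // then record the elapsed time
        }
    }

    // Processes two records and returns how many timings were recorded.
    static int run(List<String> seen) {
        Node<String, String> node =
            new Node<>("upper", (k, v) -> seen.add(k + "=" + v.toUpperCase()));
        node.process("a", "x");
        node.process("b", "y");
        return node.processTimeSensor.recordedNanos.size();
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        System.out.println(run(seen) + " timings recorded for " + seen);
    }
}
```

One timing is recorded per processed record, which is the only property of the sensor this sketch tries to capture.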

kafka-streams-internals-StreamTask.adoc

+12-7
@@ -1,6 +1,10 @@
11
== [[StreamTask]] StreamTask
22

3-
`StreamTask` is a concrete <<kafka-streams-internals-AbstractTask.adoc#, stream processor task>> that uses a <<partitionGroup, PartitionGroup>> (with the <<partitions, partitions assigned>>) to <<process, process records>> (one at a time) ordered by partition timestamp.
3+
`StreamTask` is a concrete <<kafka-streams-internals-AbstractTask.adoc#, stream processor task>> that uses a <<partitionGroup, PartitionGroup>> (with the <<partitions, partitions assigned>>) to determine which record should be <<process, processed>> (as ordered by partition timestamp).
4+
5+
When requested to <<process, process a single record>>, `StreamTask` requests the <<partitionGroup, PartitionGroup>> for the <<kafka-streams-internals-PartitionGroup.adoc#nextRecord, next stamped record (record with timestamp) and the RecordQueue>>. `StreamTask` uses a <<recordInfo, RecordInfo>> to hold the <<kafka-streams-internals-RecordQueue.adoc#, RecordQueue>> (with the source <<kafka-streams-internals-ProcessorNode.adoc#, processor node>> and the partition) of the currently-processed stamped record. Eventually, `StreamTask` requests the source processor node (of the `RecordQueue` and the partition) to <<kafka-streams-internals-ProcessorNode.adoc#process, process the record>>.
6+
7+
NOTE: It is at the discretion of a processor node (incl. a source processor node) to <<kafka-streams-ProcessorContext.adoc#forward, forward the record downstream>> (to child processors if there are any).
48

59
`StreamTask` is <<creating-instance, created>> exclusively when `TaskCreator` is requested to <<kafka-streams-internals-TaskCreator.adoc#createTask, create one>>.
610

@@ -100,7 +104,7 @@ NOTE: `close` is part of link:kafka-streams-internals-Task.adoc#close[Task Contr
100104
`StreamTask` takes the following to be created:
101105

102106
* [[id]] <<kafka-streams-TaskId.adoc#, TaskId>>
103-
* [[partitions]] Topic partitions (Kafka `TopicPartition`)
107+
* [[partitions]] Partitions (Kafka https://kafka.apache.org/22/javadoc/org/apache/kafka/common/TopicPartition.html[TopicPartition])
104108
* [[topology]] <<kafka-streams-internals-ProcessorTopology.adoc#, ProcessorTopology>>
105109
* [[consumer]] Kafka https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html[Consumer] (`Consumer<byte[], byte[]>`)
106110
* [[changelogReader]] <<kafka-streams-ChangelogReader.adoc#, ChangelogReader>>
@@ -493,7 +497,6 @@ NOTE: `producerMetrics` is used when...FIXME
493497

494498
=== [[internal-properties]] Internal Properties
495499

496-
.StreamTask's Internal Properties (e.g. Registries, Counters and Flags)
497500
[cols="30m,70",options="header",width="100%"]
498501
|===
499502
| Name
@@ -506,10 +509,12 @@ NOTE: `producerMetrics` is used when...FIXME
506509
a| [[idleStartTime]]
507510

508511
| partitionGroup
509-
a| [[partitionGroup]] <<kafka-streams-internals-PartitionGroup.adoc#, PartitionGroup>> (with <<kafka-streams-internals-RecordQueue.adoc#, RecordQueues>> per <<partitions, partition assigned>>)
512+
a| [[partitionGroup]] <<kafka-streams-internals-PartitionGroup.adoc#, PartitionGroup>> (with <<kafka-streams-internals-RecordQueue.adoc#, RecordQueues>> per every <<partitions, partition assigned>>)
510513

511514
Used when `StreamTask` is requested for the following:
512515

516+
* <<addRecords, Buffer new records (from a partition)>>
517+
513518
* <<isProcessable, isProcessable>>
514519

515520
* <<process, Process a single record>>
@@ -518,8 +523,6 @@ Used when `StreamTask` is requested for the following:
518523

519524
* <<closeSuspended, closeSuspended>>
520525

521-
* <<addRecords, Buffer new records (from a partition)>>
522-
523526
* <<numBuffered, numBuffered>>
524527

525528
* <<maybePunctuateStreamTime, maybePunctuateStreamTime>>
@@ -543,7 +546,9 @@ Used for the following:
543546
* <<producerMetrics, producerMetrics>>
544547

545548
| recordInfo
546-
a| [[recordInfo]] <<kafka-streams-internals-RecordInfo.adoc#, RecordInfo>>
549+
a| [[recordInfo]] <<kafka-streams-internals-RecordInfo.adoc#, RecordInfo>> (that holds a <<kafka-streams-internals-RecordQueue.adoc#, RecordQueue>> with the source <<kafka-streams-internals-ProcessorNode.adoc#, processor node>> and the partition the <<process, currently-processed stamped record>> came from)
550+
551+
Created empty alongside the <<StreamTask, StreamTask>> and _"filled up"_ with the <<kafka-streams-internals-RecordQueue.adoc#, RecordQueue>> when requested to <<process, process a single record>>
547552

548553
| streamTimePunctuationQueue
549554
a| [[streamTimePunctuationQueue]] <<kafka-streams-PunctuationQueue.adoc#, PunctuationQueue>>
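
The record flow described in the `StreamTask` diff above (ask the `PartitionGroup` for the next stamped record, remember its `RecordQueue` in the `RecordInfo`, then hand the record to the source processor node) can be sketched as below. All classes are hypothetical, simplified stand-ins rather than the Kafka Streams internals, and picking the queue whose head record has the smallest timestamp is an assumption used here to illustrate "ordered by partition timestamp".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class StreamTaskSketch {
    static final class StampedRecord {
        final String key;
        final String value;
        final long timestamp;

        StampedRecord(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // One queue per partition, together with the source node of that partition.
    static final class RecordQueue {
        final String partition;
        final BiConsumer<String, String> sourceNode; // stands in for the source ProcessorNode
        final Deque<StampedRecord> buffer = new ArrayDeque<>();

        RecordQueue(String partition, BiConsumer<String, String> sourceNode) {
            this.partition = partition;
            this.sourceNode = sourceNode;
        }
    }

    // Holds the queue of the currently-processed record.
    static final class RecordInfo {
        RecordQueue queue;
    }

    static final class PartitionGroup {
        final Map<String, RecordQueue> partitionQueues = new LinkedHashMap<>();

        void addRecord(String partition, StampedRecord record) {
            partitionQueues.get(partition).buffer.addLast(record);
        }

        // Assumption: the non-empty queue with the smallest head timestamp goes first.
        StampedRecord nextRecord(RecordInfo info) {
            RecordQueue next = null;
            for (RecordQueue q : partitionQueues.values()) {
                if (q.buffer.isEmpty()) continue;
                if (next == null
                        || q.buffer.peekFirst().timestamp < next.buffer.peekFirst().timestamp) {
                    next = q;
                }
            }
            if (next == null) return null;
            info.queue = next; // "fill up" the RecordInfo
            return next.buffer.pollFirst();
        }
    }

    // Processes a single record; returns false when nothing is buffered.
    static boolean process(PartitionGroup group, RecordInfo info) {
        StampedRecord record = group.nextRecord(info);
        if (record == null) return false;
        info.queue.sourceNode.accept(record.key, record.value);
        return true;
    }

    static List<String> run() {
        List<String> processed = new ArrayList<>();
        PartitionGroup group = new PartitionGroup();
        for (String p : List.of("topic-0", "topic-1")) {
            group.partitionQueues.put(
                p, new RecordQueue(p, (k, v) -> processed.add(p + ":" + k + "=" + v)));
        }
        group.addRecord("topic-0", new StampedRecord("a", "1", 10L));
        group.addRecord("topic-0", new StampedRecord("b", "2", 30L));
        group.addRecord("topic-1", new StampedRecord("c", "3", 20L));
        RecordInfo info = new RecordInfo();
        while (process(group, info)) {
            // keep processing one record at a time until the queues are empty
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With the three sample records, the sketch interleaves the two partitions by timestamp (10, 20, 30) instead of draining one partition first.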
