Skip to content

Commit 7ebf70b

Browse files
Processor Contract -- Stream Processing Node
1 parent f99b1fe commit 7ebf70b

13 files changed

+117
-49
lines changed

SUMMARY.adoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676

7777
. link:kafka-streams-processor-api.adoc[Low-Level Processor API]
7878

79-
. link:kafka-streams-Processor.adoc[Processor Contract -- Stream Processing Nodes]
79+
. link:kafka-streams-Processor.adoc[Processor Contract -- Stream Processing Node]
8080
.. link:kafka-streams-AbstractProcessor.adoc[AbstractProcessor -- Base for Stream Processors]
8181

8282
. link:kafka-streams-ProcessorContext.adoc[ProcessorContext]
@@ -300,7 +300,7 @@
300300
.. link:kafka-streams-internals-StandbyTask.adoc[StandbyTask]
301301
.. link:kafka-streams-internals-StreamTask.adoc[StreamTask]
302302

303-
. link:kafka-streams-AssignedTasks.adoc[AssignedTasks]
303+
. link:kafka-streams-internals-AssignedTasks.adoc[AssignedTasks]
304304
.. link:kafka-streams-AssignedStandbyTasks.adoc[AssignedStandbyTasks -- AssignedTasks For StandbyTasks]
305305
.. link:kafka-streams-AssignedStreamsTasks.adoc[AssignedStreamsTasks -- AssignedTasks For StreamTasks]
306306

kafka-streams-AssignedStandbyTasks.adoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
== [[AssignedStandbyTasks]] AssignedStandbyTasks -- AssignedTasks For StandbyTasks
22

3-
`AssignedStandbyTasks` is a link:kafka-streams-AssignedTasks.adoc[AssignedTasks] for link:kafka-streams-internals-StandbyTask.adoc[StandbyTasks] that...FIXME
3+
`AssignedStandbyTasks` is a link:kafka-streams-internals-AssignedTasks.adoc[AssignedTasks] for link:kafka-streams-internals-StandbyTask.adoc[StandbyTasks] that...FIXME
44

55
`AssignedStandbyTasks` is <<creating-instance, created>> along with a link:kafka-streams-StreamThread.adoc#create[StreamThread] (when `KafkaStreams` is link:kafka-streams-KafkaStreams.adoc#creating-instance[created]).
66

77
[[logContext]]
88
[[creating-instance]]
99
`AssignedStandbyTasks` takes a `LogContext` when created.
1010

11-
`AssignedStandbyTasks` uses *standby task* for link:kafka-streams-AssignedTasks.adoc#taskTypeName[taskTypeName].
11+
`AssignedStandbyTasks` uses *standby task* for link:kafka-streams-internals-AssignedTasks.adoc#taskTypeName[taskTypeName].
1212

1313
[[logging]]
1414
[TIP]

kafka-streams-AssignedStreamsTasks.adoc

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
== [[AssignedStreamsTasks]] AssignedStreamsTasks -- AssignedTasks For StreamTasks
22

3-
`AssignedStreamsTasks` is a concrete <<kafka-streams-AssignedTasks.adoc#, AssignedTasks>> of <<kafka-streams-internals-StreamTask.adoc#, StreamTasks>> that...FIXME
3+
`AssignedStreamsTasks` is a concrete <<kafka-streams-internals-AssignedTasks.adoc#, AssignedTasks>> of <<kafka-streams-internals-StreamTask.adoc#, StreamTasks>> that...FIXME
44

55
`AssignedStreamsTasks` is <<creating-instance, created>> for a <<kafka-streams-StreamThread.adoc#create, StreamThread>> (when `KafkaStreams` is <<kafka-streams-KafkaStreams.adoc#creating-instance, created>>).
66

77
`AssignedStreamsTasks` is a `RestoringTasks` that...FIXME
88

9-
It _appears_ that `AssignedStreamsTasks` simply operates on the running tasks (i.e. the tasks that are in link:kafka-streams-AssignedTasks.adoc#running[running] internal registry). When requested to <<process, process>> or <<punctuate, punctuate>> `AssignedStreamsTasks` simply walks over the `running` internal registry and triggers execution of every task.
9+
It _appears_ that `AssignedStreamsTasks` simply operates on the running tasks (i.e. the tasks that are in link:kafka-streams-internals-AssignedTasks.adoc#running[running] internal registry). When requested to <<process, process>> or <<punctuate, punctuate>> `AssignedStreamsTasks` simply walks over the `running` internal registry and triggers execution of every task.
1010

1111
[[maybeCommitAction]]
1212
`AssignedStreamsTasks` uses the *maybeCommit* task action (`TaskAction<StreamTask>`) that is used in <<maybeCommit, maybeCommit>>. The task action takes a <<kafka-streams-internals-StreamTask.adoc#, stream task>> and checks if the task link:kafka-streams-internals-StreamTask.adoc#commitNeeded[needs a commit]. If so, the action does the following:
@@ -25,7 +25,7 @@ Committed active task [id] per user request in
2525
[[creating-instance]]
2626
`AssignedStreamsTasks` takes a `LogContext` when created.
2727

28-
`AssignedStreamsTasks` uses *stream task* for link:kafka-streams-AssignedTasks.adoc#taskTypeName[taskTypeName].
28+
`AssignedStreamsTasks` uses *stream task* for link:kafka-streams-internals-AssignedTasks.adoc#taskTypeName[taskTypeName].
2929

3030
[[internal-registries]]
3131
.AssignedStreamsTasks's Internal Properties (e.g. Registries, Counters and Flags)
@@ -62,7 +62,7 @@ Refer to link:kafka-logging.adoc#log4j.properties[Application Logging Using log4
6262
int process()
6363
----
6464

65-
`process` requests every link:kafka-streams-AssignedTasks.adoc#running[running] stream task to link:kafka-streams-internals-StreamTask.adoc#process[process a single record].
65+
`process` requests every link:kafka-streams-internals-AssignedTasks.adoc#running[running] stream task to link:kafka-streams-internals-StreamTask.adoc#process[process a single record].
6666

6767
In the end, `process` gives the number of stream tasks that processed a single record successfully.
6868

@@ -76,9 +76,9 @@ In case of a `TaskMigratedException`, `process` prints out the following INFO me
7676
Failed to process stream task [id] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.
7777
```
7878

79-
`process` then link:kafka-streams-AssignedTasks.adoc#closeZombieTask[closes the task] (considering the task a zombie). If this reports a `RuntimeException`, `process` re-throws it.
79+
`process` then link:kafka-streams-internals-AssignedTasks.adoc#closeZombieTask[closes the task] (considering the task a zombie). If this reports a `RuntimeException`, `process` re-throws it.
8080

81-
`process` removes the task from link:kafka-streams-AssignedTasks.adoc#running[running] and throws the `TaskMigratedException`.
81+
`process` removes the task from link:kafka-streams-internals-AssignedTasks.adoc#running[running] and throws the `TaskMigratedException`.
8282

8383
==== [[process-RuntimeException]] process and RuntimeException
8484

@@ -97,7 +97,7 @@ Failed to process stream task [id] due to the following error:
9797
int maybeCommit()
9898
----
9999

100-
`maybeCommit` resets the <<committed, committed>> internal counter (to `0`) and <<kafka-streams-AssignedTasks.adoc#applyToRunningTasks, executes>> the <<maybeCommitAction, maybeCommitAction>> task action to every <<kafka-streams-AssignedTasks.adoc#running, running task>> (that modifies <<committed, committed>>).
100+
`maybeCommit` resets the <<committed, committed>> internal counter (to `0`) and <<kafka-streams-internals-AssignedTasks.adoc#applyToRunningTasks, executes>> the <<maybeCommitAction, maybeCommitAction>> task action to every <<kafka-streams-internals-AssignedTasks.adoc#running, running task>> (that modifies <<committed, committed>>).
101101

102102
In the end, `maybeCommit` gives the number of running stream tasks that <<kafka-streams-internals-StreamTask.adoc#commitNeeded, needed a commit>>.
103103

kafka-streams-Processor.adoc

+15-6
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1-
== [[Processor]] Processor Contract -- Stream Processing Nodes
1+
== [[Processor]] Processor Contract -- Stream Processing Node
22

3-
`Processor` is the <<contract, contract>> for *record stream processors* (aka _stream processing nodes_) that <<process, process records>> (one at a time).
3+
`Processor` is the main <<contract, abstraction>> of the <<kafka-streams-processor-api.adoc#, Low-Level Processor API>> for *record stream processors* (aka _stream processing nodes_) that can <<process, process one record at a time>>.
44

5-
`Processor` is an integral part of link:kafka-streams-internals-ProcessorNode.adoc#processor[ProcessorNode].
5+
`Processor` can be added to a <<kafka-streams-Topology.adoc#, Topology>> using <<kafka-streams-Topology.adoc#addProcessor, Topology.addProcessor>> operator (via <<kafka-streams-ProcessorSupplier.adoc#, ProcessorSupplier>>).
6+
7+
NOTE: <<kafka-streams-streams-dsl.adoc#, Streams DSL -- High-Level Stream Processing DSL>> comes with the <<kafka-streams-KStream.adoc#process, KStream.process>> operator to add a custom `Processor` to a topology (via <<kafka-streams-ProcessorSupplier.adoc#, ProcessorSupplier>>).
8+
9+
The lifecycle of a `Processor` is fully controlled by a corresponding <<kafka-streams-internals-ProcessorNode.adoc#processor, ProcessorNode>>.
610

711
[[contract]]
812
.Processor Contract
@@ -21,6 +25,8 @@ void close()
2125

2226
Closes the processor
2327

28+
Used exclusively when `ProcessorNode` is requested to <<kafka-streams-internals-ProcessorNode.adoc#close, close>>.
29+
2430
| init
2531
a| [[init]]
2632

@@ -31,6 +37,8 @@ void init(ProcessorContext context)
3137

3238
Initializes the processor (with a <<kafka-streams-ProcessorContext.adoc#, ProcessorContext>>)
3339

40+
Used exclusively when `ProcessorNode` is requested to <<kafka-streams-internals-ProcessorNode.adoc#init, init>>.
41+
3442
| process
3543
a| [[process]]
3644

@@ -40,9 +48,10 @@ void process(K key, V value)
4048
----
4149

4250
Processes a single record
43-
|===
4451

45-
The lifecycle of a `Processor` starts at <<init, init>> when....FIXME
52+
Used exclusively when `ProcessorNode` is requested to <<kafka-streams-internals-ProcessorNode.adoc#process, process>>.
53+
54+
|===
4655

4756
TIP: Use <<AbstractProcessor, AbstractProcessor>> when you want to develop a custom `Processor`.
4857

@@ -54,7 +63,7 @@ TIP: Use <<AbstractProcessor, AbstractProcessor>> when you want to develop a cus
5463
| Description
5564

5665
| <<kafka-streams-AbstractProcessor.adoc#, AbstractProcessor>>
57-
| [[AbstractProcessor]] Abstract Processor that manages the `ProcessorContext` instance and provides a no-op <<close, close>> implementation
66+
| [[AbstractProcessor]] Abstract processor that manages a <<kafka-streams-ProcessorContext.adoc#, ProcessorContext>> instance and provides a no-op <<close, close>> implementation
5867

5968
| <<kafka-streams-internals-KStreamTransformValuesProcessor.adoc#, KStreamTransformValuesProcessor>>
6069
| [[KStreamTransformValuesProcessor]]

kafka-streams-ProcessorContextImpl.adoc

+19
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,22 @@ NOTE: `schedule` is part of link:kafka-streams-ProcessorContext.adoc#schedule[Pr
3939
* [[cache]] <<kafka-streams-ThreadCache.adoc#, ThreadCache>>
4040

4141
`ProcessorContextImpl` initializes the <<internal-registries, internal registries and counters>>.
42+
43+
=== [[forward]] `forward` Method
44+
45+
[source, scala]
46+
----
47+
void forward(final K key, final V value)
48+
void forward(final K key, final V value, final int childIndex)
49+
void forward(final K key, final V value, final String childName)
50+
void forward(final K key, final V value, final To to)
51+
void forward(
52+
final ProcessorNode child,
53+
final K key,
54+
final V value) // <1>
55+
----
56+
<1> Private API
57+
58+
NOTE: `forward` is part of the <<kafka-streams-ProcessorContext.adoc#forward, ProcessorContext Contract>> to...FIXME.
59+
60+
`forward`...FIXME

kafka-streams-AssignedTasks.adoc kafka-streams-internals-AssignedTasks.adoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ RuntimeException suspendTasks(final Collection<T> tasks)
118118

119119
`suspendTasks`...FIXME
120120

121-
NOTE: `suspendTasks` is used when...FIXME
121+
NOTE: `suspendTasks` is used exclusively when `AssignedTasks` is requested to <<suspend, suspend all active tasks>>.
122122

123123
=== [[hasRunningTasks]] Checking If There Is At Least One Running Task -- `hasRunningTasks` Method
124124

@@ -385,7 +385,7 @@ Close created [taskTypeName] [created]
385385

386386
In the end, `suspend` removes all entries from <<running, running>>, <<restoring, restoring>>, <<created, created>>, <<runningByPartition, runningByPartition>> and <<restoringByPartition, restoringByPartition>>.
387387

388-
NOTE: `suspend` is used exclusively when `TaskManager` is requested to link:kafka-streams-internals-TaskManager.adoc#suspendTasksAndState[suspend all active and standby stream tasks and state].
388+
NOTE: `suspend` is used exclusively when `TaskManager` is requested to <<kafka-streams-internals-TaskManager.adoc#suspendTasksAndState, suspend all active and standby stream tasks and state>>.
389389

390390
=== [[closeNonRunningTasks]] `closeNonRunningTasks` Internal Method
391391

kafka-streams-internals-GlobalProcessorContextImpl.adoc

+11
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,14 @@ StateStore getStateStore(final String name)
1212
NOTE: `getStateStore` is part of the <<kafka-streams-ProcessorContext.adoc#getStateStore, ProcessorContext Contract>> to...FIXME.
1313

1414
`getStateStore` simply requests the <<kafka-streams-AbstractProcessorContext.adoc#stateManager, StateManager>> to <<kafka-streams-internals-StateManager.adoc#getGlobalStore, get the global state store>> by the given name.
15+
16+
=== [[forward]] `forward` Method
17+
18+
[source, java]
19+
----
20+
void forward(final K key, final V value)
21+
----
22+
23+
NOTE: `forward` is part of the <<kafka-streams-ProcessorContext.adoc#forward, ProcessorContext Contract>> to...FIXME.
24+
25+
`forward`...FIXME

kafka-streams-internals-GlobalStateUpdateTask.adoc

+11
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,14 @@ NOTE: `close` is part of link:kafka-streams-internals-StateManager.adoc#close[St
8181
* [[logContext]] `LogContext`
8282

8383
`GlobalStateUpdateTask` initializes the <<offsets, offsets>> and <<deserializers, deserializers>> internal registries.
84+
85+
=== [[initTopology]] `initTopology` Internal Method
86+
87+
[source, java]
88+
----
89+
void initTopology()
90+
----
91+
92+
`initTopology`...FIXME
93+
94+
NOTE: `initTopology` is used exclusively when `GlobalStateUpdateTask` is requested to <<initialize, initialize>>.

kafka-streams-internals-ProcessorNode.adoc

+20-4
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,16 @@ void init(ProcessorContext context)
4444

4545
`init`...FIXME
4646

47-
NOTE: `init` is used when...FIXME
47+
[NOTE]
48+
====
49+
`init` is used when:
4850
49-
=== [[process]] `process` Method
51+
* `GlobalStateUpdateTask` is requested to <<kafka-streams-internals-GlobalStateUpdateTask.adoc#initTopology, initTopology>>
52+
53+
* `StreamTask` is requested to <<kafka-streams-internals-StreamTask.adoc#initTopology, initTopology>>
54+
====
55+
56+
=== [[process]] Processing Record -- `process` Method
5057

5158
[source, java]
5259
----
@@ -55,7 +62,16 @@ void process(final K key, final V value)
5562

5663
`process`...FIXME
5764

58-
NOTE: `process` is used when...FIXME
65+
[NOTE]
66+
====
67+
`process` is used when:
68+
69+
* <<kafka-streams-internals-GlobalProcessorContextImpl.adoc#forward, GlobalProcessorContextImpl>> and <<kafka-streams-ProcessorContextImpl.adoc#forward, ProcessorContextImpl>> are requested to forward
70+
71+
* `GlobalStateUpdateTask` is requested to <<kafka-streams-internals-GlobalStateUpdateTask.adoc#update, update>>
72+
73+
* `StreamTask` is requested to <<kafka-streams-internals-StreamTask.adoc#process, process>>
74+
====
5975

6076
=== [[creating-instance]] Creating ProcessorNode Instance
6177

@@ -87,7 +103,7 @@ void close()
87103

88104
`close`...FIXME
89105

90-
NOTE: `close` is used when...FIXME
106+
NOTE: `close` is used exclusively when `StreamTask` is requested to <<kafka-streams-internals-StreamTask.adoc#closeTopology, closeTopology>> (when `StreamTask` is requested to <<kafka-streams-internals-StreamTask.adoc#suspend, suspend>>).
91107

92108
=== [[toString]] Describing Itself (Textual Representation) -- `toString` Method
93109

kafka-streams-internals-StreamTask.adoc

+4-4
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,15 @@ NOTE: `closeTopology` is used exclusively when `StreamTask` is requested to <<su
7878
void suspend() // <1>
7979
8080
// PRIVATE API
81-
void suspend(final boolean clean)
81+
suspend(boolean clean, boolean isZombie)
8282
----
83-
<1> Uses `clean` flag enabled, i.e. `true`
83+
<1> Uses the private `suspend` with `clean` enabled and `isZombie` disabled
8484

8585
NOTE: `suspend` is part of <<kafka-streams-internals-Task.adoc#suspend, Task Contract>> to...FIXME
8686

8787
`suspend`...FIXME
8888

89-
NOTE: The private `suspend` is used exclusively when `StreamTask` is requested to <<close, close>>.
89+
NOTE: The private `suspend` is used when `StreamTask` is requested to <<close, close>>.
9090

9191
=== [[close]] `close` Method
9292

@@ -128,7 +128,7 @@ void initTopology()
128128

129129
`initTopology`...FIXME
130130

131-
NOTE: `initTopology` is used when...FIXME
131+
NOTE: `initTopology` is used exclusively when `StreamTask` is requested to <<initializeTopology, initializeTopology>>.
132132

133133
=== [[initializeTopology]] `initializeTopology` Method
134134

kafka-streams-internals-Task.adoc

+13-11
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ Changelog partitions associated with the task
6666

6767
Used when:
6868

69-
* `AssignedTasks` is requested to <<kafka-streams-AssignedTasks.adoc#updateRestored, updateRestored>>, <<kafka-streams-AssignedTasks.adoc#addToRestoring, addToRestoring>> and <<kafka-streams-AssignedTasks.adoc#transitionToRunning, transitionToRunning>>
69+
* `AssignedTasks` is requested to <<kafka-streams-internals-AssignedTasks.adoc#updateRestored, updateRestored>>, <<kafka-streams-internals-AssignedTasks.adoc#addToRestoring, addToRestoring>> and <<kafka-streams-internals-AssignedTasks.adoc#transitionToRunning, transitionToRunning>>
7070
7171
* `StreamTask` is requested to <<kafka-streams-internals-StreamTask.adoc#initializeStateStores, initialize state stores>>.
7272
@@ -84,15 +84,15 @@ Used when:
8484

8585
* `AssignedTasks` is requested to do the following:
8686
87-
** link:kafka-streams-AssignedTasks.adoc#close[close]
87+
** link:kafka-streams-internals-AssignedTasks.adoc#close[close]
8888

89-
** link:kafka-streams-AssignedTasks.adoc#closeNonRunningTasks[closeNonRunningTasks]
89+
** link:kafka-streams-internals-AssignedTasks.adoc#closeNonRunningTasks[closeNonRunningTasks]
9090

91-
** link:kafka-streams-AssignedTasks.adoc#closeUnclean[closeUnclean]
91+
** link:kafka-streams-internals-AssignedTasks.adoc#closeUnclean[closeUnclean]
9292

93-
** link:kafka-streams-AssignedTasks.adoc#closeZombieTask[closeZombieTask]
93+
** link:kafka-streams-internals-AssignedTasks.adoc#closeZombieTask[closeZombieTask]
9494

95-
** link:kafka-streams-AssignedTasks.adoc#suspendTasks[suspendTasks] (and suspending a task has failed)
95+
** link:kafka-streams-internals-AssignedTasks.adoc#suspendTasks[suspendTasks] (and suspending a task has failed)
9696

9797
* `StandbyTask` is requested to link:kafka-streams-internals-StandbyTask.adoc#closeSuspended[closeSuspended]
9898
@@ -121,7 +121,7 @@ Used when:
121121

122122
* `AssignedStreamsTasks` is requested to execute the <<kafka-streams-AssignedStreamsTasks.adoc#maybeCommitAction, maybeCommitAction>> action
123123
124-
* `AssignedTasks` is requested to execute the <<kafka-streams-AssignedTasks.adoc#commitAction, commitAction>> action
124+
* `AssignedTasks` is requested to execute the <<kafka-streams-internals-AssignedTasks.adoc#commitAction, commitAction>> action
125125
126126
* `StandbyTask` is requested to <<kafka-streams-internals-StandbyTask.adoc#close, close>>
127127
@@ -153,9 +153,9 @@ boolean hasStateStores()
153153

154154
Used when `AssignedTasks` is requested to:
155155

156-
* link:kafka-streams-AssignedTasks.adoc#transitionToRunning[Schedule a task for execution]
156+
* link:kafka-streams-internals-AssignedTasks.adoc#transitionToRunning[Schedule a task for execution]
157157
158-
* link:kafka-streams-AssignedTasks.adoc#uninitializedPartitions[Get the partitions of the new tasks with a state store]
158+
* link:kafka-streams-internals-AssignedTasks.adoc#uninitializedPartitions[Get the partitions of the new tasks with a state store]
159159
160160
---
161161

@@ -183,7 +183,7 @@ Enabled (`true`) if the task has no state stores that need restoring.
183183

184184
* `StandbyTask` always returns <<kafka-streams-internals-StandbyTask.adoc#initializeStateStores, true>>
185185

186-
Used exclusively when `AssignedTasks` is requested to <<kafka-streams-AssignedTasks.adoc#initializeNewTasks, initializeNewTasks>>.
186+
Used exclusively when `AssignedTasks` is requested to <<kafka-streams-internals-AssignedTasks.adoc#initializeNewTasks, initializeNewTasks>>.
187187

188188
| initializeTopology
189189
a| [[initializeTopology]]
@@ -193,7 +193,7 @@ a| [[initializeTopology]]
193193
void initializeTopology()
194194
----
195195

196-
Used exclusively when `AssignedTasks` is requested to link:kafka-streams-AssignedTasks.adoc#transitionToRunning[schedule a task for execution]
196+
Used exclusively when `AssignedTasks` is requested to link:kafka-streams-internals-AssignedTasks.adoc#transitionToRunning[schedule a task for execution]
197197

198198
| partitions
199199
a| [[partitions]]
@@ -219,6 +219,8 @@ a| [[suspend]]
219219
void suspend()
220220
----
221221

222+
Used exclusively when `AssignedTasks` is requested to <<kafka-streams-internals-AssignedTasks.adoc#suspendTasks, suspend tasks>>.
223+
222224
| topology
223225
a| [[topology]]
224226

0 commit comments

Comments
 (0)