Skip to content

Commit 22b320a

Browse files
StreamThread et al.
1 parent e53080d commit 22b320a

9 files changed

+330
-220
lines changed

SUMMARY.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
. link:kafka-streams-multi-instance-kafka-streams-application.adoc[Multi-Instance Kafka Streams Applications]
99

1010
. link:kafka-streams-exactly-once-support-eos.adoc[Exactly-Once Support (EOS)]
11-
. link:kafka-streams-StreamThreads-StreamTasks-and-StandbyTasks.adoc[StreamThreads, StreamTasks and StandbyTasks]
11+
. link:kafka-streams-StreamThreads-StreamTasks-and-StandbyTasks.adoc[KafkaStreams, StreamThreads, StreamTasks and StandbyTasks]
1212

1313
== Demos
1414

Binary file not shown.
Binary file not shown.

kafka-streams-StreamThread-RebalanceListener.adoc

+4-2
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ Refer to link:kafka-logging.adoc#log4j.properties[Application Logging Using log4
5252

5353
[source, java]
5454
----
55-
void onPartitionsAssigned(final Collection<TopicPartition> assignment)
55+
void onPartitionsAssigned(
56+
Collection<TopicPartition> assignment)
5657
----
5758

5859
NOTE: `onPartitionsAssigned` is part of `ConsumerRebalanceListener` Contract in Apache Kafka to...FIXME.
@@ -82,7 +83,8 @@ partition assignment took [duration] ms.
8283

8384
[source, java]
8485
----
85-
void onPartitionsRevoked(final Collection<TopicPartition> assignment)
86+
void onPartitionsRevoked(
87+
Collection<TopicPartition> assignment)
8688
----
8789

8890
NOTE: `onPartitionsRevoked` is part of `ConsumerRebalanceListener` Contract in Apache Kafka to...FIXME.

kafka-streams-StreamThreads-StreamTasks-and-StandbyTasks.adoc

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1-
== StreamThreads, StreamTasks and StandbyTasks
1+
== KafkaStreams, StreamThreads, StreamTasks and StandbyTasks
22

3-
<<kafka-streams-StreamsConfig.adoc#NUM_STREAM_THREADS_CONFIG, StreamsConfig.NUM_STREAM_THREADS_CONFIG>> (`num.stream.threads`) is the number of <<kafka-streams-internals-StreamThread.adoc#, StreamThreads>> to execute stream processing.
3+
<<kafka-streams-StreamsConfig.adoc#NUM_STREAM_THREADS_CONFIG, StreamsConfig.NUM_STREAM_THREADS_CONFIG>> (`num.stream.threads`) is the number of <<kafka-streams-internals-StreamThread.adoc#, StreamThreads>> that a single <<kafka-streams-KafkaStreams.adoc#, KafkaStreams>> instance creates for stream processing.
4+
5+
With that, there will always be at least two threads running for a Kafka Streams application - the main thread of the application and one or many `StreamThreads`.
6+
7+
Every `StreamThread` manages its own <<kafka-streams-internals-TaskManager.adoc#, TaskManager>> (with the factories of <<kafka-streams-internals-TaskCreator.adoc#, StreamTasks>> and <<kafka-streams-internals-StandbyTaskCreator.adoc#, StandbyTasks>>).
48

59
* Number of StreamTasks = # TaskCreator.createTask = # AbstractTaskCreator.createTasks = TaskManager.addStreamTasks
610
@@ -21,3 +25,5 @@ DEBUG Assigning tasks {} to clients {} with number of replicas {}
2125
Step 1. StreamsPartitionAssignor.onAssignment(final Assignment assignment) —> TaskManager.setAssignmentMetadata(Map<TaskId, Set<TopicPartition>> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks)
2226

2327
Step 2. RebalanceListener.onPartitionsAssigned(Collection<TopicPartition> assignment) —> TaskManager.createTasks(assignment)
28+
29+
`StreamsPartitionAssignor` makes sure that the <<kafka-streams-internals-StreamsPartitionAssignor.adoc#processVersionOneAssignment, number of assigned partitions to a Kafka Streams application instance is exactly the same as number of active tasks>>.

kafka-streams-internals-InternalTopologyBuilder.adoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
== [[InternalTopologyBuilder]] InternalTopologyBuilder
22

3-
`InternalTopologyBuilder` is used to <<build, build a topology of processor nodes>> (for <<kafka-streams-internals-StandbyTaskCreator.adoc#, StreamThread.StandbyTaskCreators>> and <<kafka-streams-internals-TaskCreator.adoc#, StreamThread.TaskCreator>>).
3+
`InternalTopologyBuilder` is used to <<build, build a topology of processor nodes>> (for <<kafka-streams-internals-StandbyTaskCreator.adoc#, StreamThread.StandbyTaskCreator>> and <<kafka-streams-internals-TaskCreator.adoc#, StreamThread.TaskCreator>> task factories and hence <<kafka-streams-internals-Task.adoc#, stream processor tasks>> themselves).
44

55
.InternalTopologyBuilder and ProcessorTopology
66
image::images/kafka-streams-InternalTopologyBuilder-build.png[align="center"]
77

8-
`InternalTopologyBuilder` is <<creating-instance, created>> exclusively for a <<kafka-streams-Topology.adoc#internalTopologyBuilder, Topology>>.
8+
`InternalTopologyBuilder` is <<creating-instance, created>> exclusively for a <<kafka-streams-Topology.adoc#internalTopologyBuilder, Topology>> (which is simply the frontend to the internals of topology development).
99

1010
.InternalTopologyBuilder and Topology
1111
image::images/kafka-streams-InternalTopologyBuilder-Topology.png[align="center"]

kafka-streams-internals-StreamThread.adoc

+276-210
Large diffs are not rendered by default.

kafka-streams-internals-StreamsPartitionAssignor.adoc

+29
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,35 @@ Map<String, Assignment> computeNewAssignment(
274274

275275
NOTE: `computeNewAssignment` is used exclusively when `StreamsPartitionAssignor` is requested to <<assign, perform partition assignment>>.
276276

277+
=== [[processVersionOneAssignment]] `processVersionOneAssignment` Internal Method
278+
279+
[source, java]
280+
----
281+
void processVersionOneAssignment(
282+
AssignmentInfo info,
283+
List<TopicPartition> partitions,
284+
Map<TaskId, Set<TopicPartition>> activeTasks)
285+
----
286+
287+
`processVersionOneAssignment`...FIXME
288+
289+
NOTE: `processVersionOneAssignment` is used when `StreamsPartitionAssignor` is requested to <<onAssignment, handle partition assignment from a group leader>> (for version 1) and <<processVersionTwoAssignment, process partition assignment (version 2)>>.
290+
291+
=== [[processVersionTwoAssignment]] `processVersionTwoAssignment` Internal Method
292+
293+
[source, java]
294+
----
295+
void processVersionTwoAssignment(
296+
AssignmentInfo info,
297+
List<TopicPartition> partitions,
298+
Map<TaskId, Set<TopicPartition>> activeTasks,
299+
Map<TopicPartition, PartitionInfo> topicToPartitionInfo)
300+
----
301+
302+
`processVersionTwoAssignment`...FIXME
303+
304+
NOTE: `processVersionTwoAssignment` is used when...FIXME
305+
277306
=== [[internal-properties]] Internal Properties
278307

279308
[cols="30m,70",options="header",width="100%"]

kafka-streams-internals-TaskManager.adoc

+10-3
Original file line numberDiff line numberDiff line change
@@ -398,16 +398,23 @@ int commitAll()
398398

399399
NOTE: `commitAll` is used exclusively when `StreamThread` is requested to <<kafka-streams-internals-StreamThread.adoc#maybeCommit, commit all tasks (when commit interval elapsed)>>.
400400

401-
=== [[activeTaskIds]] `activeTaskIds` Method
401+
=== [[activeTaskIds]] All Active Tasks -- `activeTaskIds` Method
402402

403403
[source, java]
404404
----
405405
Set<TaskId> activeTaskIds()
406406
----
407407

408-
`activeTaskIds`...FIXME
408+
`activeTaskIds` simply requests the <<active, AssignedStreamsTasks>> for the <<kafka-streams-internals-AssignedStreamsTasks.adoc#allAssignedTaskIds, assigned task IDs>>.
409409

410-
NOTE: `activeTaskIds` is used when...FIXME
410+
[NOTE]
411+
====
412+
`activeTaskIds` is used when:
413+
414+
* `StreamThread` is requested to <<kafka-streams-internals-StreamThread.adoc#maybeCommit, commit all active tasks (when commit interval elapsed)>>
415+
416+
* `RebalanceListener` is requested to handle <<kafka-streams-StreamThread-RebalanceListener.adoc#onPartitionsAssigned, onPartitionsAssigned>> and <<kafka-streams-StreamThread-RebalanceListener.adoc#onPartitionsRevoked, onPartitionsRevoked>> events
417+
====
411418

412419
=== [[standbyTaskIds]] `standbyTaskIds` Method
413420

0 commit comments

Comments
 (0)