Skip to content

Commit e53080d

Browse files
committedAug 14, 2019
Topology and InternalTopologyBuilder
1 parent 77a68c5 commit e53080d

3 files changed

+285
-250
lines changed
 

‎SUMMARY.adoc

+45-43
Original file line numberDiff line numberDiff line change
@@ -10,32 +10,6 @@
1010
. link:kafka-streams-exactly-once-support-eos.adoc[Exactly-Once Support (EOS)]
1111
. link:kafka-streams-StreamThreads-StreamTasks-and-StandbyTasks.adoc[StreamThreads, StreamTasks and StandbyTasks]
1212

13-
=== State (Store) Management
14-
15-
. link:kafka-streams-internals-StateManager.adoc[StateManager]
16-
.. link:kafka-streams-internals-AbstractStateManager.adoc[AbstractStateManager]
17-
18-
. link:kafka-streams-internals-ProcessorStateManager.adoc[ProcessorStateManager]
19-
20-
. link:kafka-streams-internals-GlobalStateManager.adoc[GlobalStateManager]
21-
.. link:kafka-streams-internals-GlobalStateManagerImpl.adoc[GlobalStateManagerImpl]
22-
23-
. link:kafka-streams-internals-Checkpointable.adoc[Checkpointable]
24-
25-
. link:kafka-streams-internals-OffsetCheckpoint.adoc[OffsetCheckpoint]
26-
27-
. link:kafka-streams-internals-ChangelogReader.adoc[ChangelogReader]
28-
.. link:kafka-streams-internals-StoreChangelogReader.adoc[StoreChangelogReader]
29-
30-
. link:kafka-streams-internals-StateRestorer.adoc[StateRestorer]
31-
32-
. link:kafka-streams-internals-RestoringTasks.adoc[RestoringTasks]
33-
34-
. link:kafka-streams-internals-RecordBatchingStateRestoreCallback.adoc[RecordBatchingStateRestoreCallback]
35-
.. link:kafka-streams-internals-CompositeRestoreListener.adoc[CompositeRestoreListener]
36-
37-
. link:kafka-streams-internals-StateRestoreCallbackAdapter.adoc[StateRestoreCallbackAdapter]
38-
3913
== Demos
4014

4115
. link:kafka-streams-demo-creating-topology-with-state-store-logging-enabled.adoc[Creating Topology with State Store with Logging Enabled]
@@ -188,6 +162,25 @@
188162

189163
== Internals of Kafka Streams
190164

165+
=== Low-Level Stream Processing Graph
166+
167+
. link:kafka-streams-internals-InternalTopologyBuilder.adoc[InternalTopologyBuilder]
168+
169+
. link:kafka-streams-internals-InternalTopologyBuilder-AbstractNode.adoc[AbstractNode]
170+
.. link:kafka-streams-internals-InternalTopologyBuilder-Processor.adoc[Processor]
171+
.. link:kafka-streams-internals-InternalTopologyBuilder-Sink.adoc[Sink]
172+
.. link:kafka-streams-internals-InternalTopologyBuilder-Source.adoc[Source]
173+
174+
. link:kafka-streams-internals-InternalTopologyBuilder-NodeFactory.adoc[NodeFactory]
175+
.. link:kafka-streams-internals-InternalTopologyBuilder-ProcessorNodeFactory.adoc[ProcessorNodeFactory]
176+
.. link:kafka-streams-internals-InternalTopologyBuilder-SinkNodeFactory.adoc[SinkNodeFactory]
177+
.. link:kafka-streams-internals-InternalTopologyBuilder-SourceNodeFactory.adoc[SourceNodeFactory]
178+
179+
. link:kafka-streams-internals-InternalTopologyBuilder-TopologyDescription.adoc[InternalTopologyBuilder.TopologyDescription]
180+
181+
. link:kafka-streams-internals-InternalTopologyBuilder-GlobalStore.adoc[GlobalStore]
182+
. link:kafka-streams-internals-InternalTopologyBuilder-StateStoreFactory.adoc[StateStoreFactory]
183+
191184
=== High-Level Stream Processing Graph
192185

193186
. link:kafka-streams-internals-InternalStreamsBuilder.adoc[InternalStreamsBuilder]
@@ -303,23 +296,6 @@
303296
.. link:kafka-streams-internals-SourceNode.adoc[SourceNode]
304297
.. link:kafka-streams-internals-SinkNode.adoc[SinkNode]
305298

306-
. link:kafka-streams-internals-InternalTopologyBuilder.adoc[InternalTopologyBuilder]
307-
308-
. link:kafka-streams-internals-InternalTopologyBuilder-AbstractNode.adoc[AbstractNode]
309-
.. link:kafka-streams-internals-InternalTopologyBuilder-Processor.adoc[Processor]
310-
.. link:kafka-streams-internals-InternalTopologyBuilder-Sink.adoc[Sink]
311-
.. link:kafka-streams-internals-InternalTopologyBuilder-Source.adoc[Source]
312-
313-
. link:kafka-streams-internals-InternalTopologyBuilder-NodeFactory.adoc[NodeFactory]
314-
.. link:kafka-streams-internals-InternalTopologyBuilder-ProcessorNodeFactory.adoc[ProcessorNodeFactory]
315-
.. link:kafka-streams-internals-InternalTopologyBuilder-SinkNodeFactory.adoc[SinkNodeFactory]
316-
.. link:kafka-streams-internals-InternalTopologyBuilder-SourceNodeFactory.adoc[SourceNodeFactory]
317-
318-
. link:kafka-streams-internals-InternalTopologyBuilder-TopologyDescription.adoc[InternalTopologyBuilder.TopologyDescription]
319-
320-
. link:kafka-streams-internals-InternalTopologyBuilder-GlobalStore.adoc[GlobalStore]
321-
. link:kafka-streams-internals-InternalTopologyBuilder-StateStoreFactory.adoc[StateStoreFactory]
322-
323299
. link:kafka-streams-internals-NodeMetrics.adoc[NodeMetrics]
324300

325301
. link:kafka-streams-internals-InternalTopicConfig.adoc[InternalTopicConfig]
@@ -444,6 +420,32 @@
444420

445421
. link:kafka-streams-internals-CopartitionedTopicsValidator.adoc[CopartitionedTopicsValidator]
446422

423+
=== State (Store) Management
424+
425+
. link:kafka-streams-internals-StateManager.adoc[StateManager]
426+
.. link:kafka-streams-internals-AbstractStateManager.adoc[AbstractStateManager]
427+
428+
. link:kafka-streams-internals-ProcessorStateManager.adoc[ProcessorStateManager]
429+
430+
. link:kafka-streams-internals-GlobalStateManager.adoc[GlobalStateManager]
431+
.. link:kafka-streams-internals-GlobalStateManagerImpl.adoc[GlobalStateManagerImpl]
432+
433+
. link:kafka-streams-internals-Checkpointable.adoc[Checkpointable]
434+
435+
. link:kafka-streams-internals-OffsetCheckpoint.adoc[OffsetCheckpoint]
436+
437+
. link:kafka-streams-internals-ChangelogReader.adoc[ChangelogReader]
438+
.. link:kafka-streams-internals-StoreChangelogReader.adoc[StoreChangelogReader]
439+
440+
. link:kafka-streams-internals-StateRestorer.adoc[StateRestorer]
441+
442+
. link:kafka-streams-internals-RestoringTasks.adoc[RestoringTasks]
443+
444+
. link:kafka-streams-internals-RecordBatchingStateRestoreCallback.adoc[RecordBatchingStateRestoreCallback]
445+
.. link:kafka-streams-internals-CompositeRestoreListener.adoc[CompositeRestoreListener]
446+
447+
. link:kafka-streams-internals-StateRestoreCallbackAdapter.adoc[StateRestoreCallbackAdapter]
448+
447449
== Deprecated
448450

449451
. link:kafka-streams-StoreFactory.adoc[StoreFactory]

‎kafka-streams-Topology.adoc

+136-124
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
1-
== [[Topology]] Topology -- Logical Processor Node Topology
1+
== [[Topology]] Topology -- Directed Acyclic Graph of Stream Processing Nodes
22

3-
`Topology` is an *directed acyclic graph of processors* (aka _DAG of stream processing nodes_) that represents the stream processing logic of a Kafka Streams application.
3+
`Topology` is a *directed acyclic graph of stream processing nodes* that represents the stream processing logic of a Kafka Streams application.
44

5-
`Topology` is a logical representation of a <<kafka-streams-internals-ProcessorTopology.adoc#, ProcessorTopology>>.
6-
7-
`Topology` provides the <<operators, fluent API>> to <<creating-instance, create a processing topology>> of <<addStateStore, local>> and <<addGlobalStore, global>> state stores, <<addSource, sources>>, <<addProcessor, processors>> and <<addSink, sinks>>.
5+
`Topology` can be <<creating-instance, created>> directly (as part of <<kafka-streams-processor-api.adoc#, Low-Level Processor API>>) or indirectly using <<kafka-streams-streams-dsl.adoc#, Streams DSL -- High-Level Stream Processing DSL>>.
86

9-
`Topology` can be <<creating-instance, created>> directly or indirectly using <<kafka-streams-streams-dsl.adoc#, Streams DSL -- High-Level Stream Processing DSL>>.
7+
`Topology` provides the <<operators, fluent API>> to add <<addStateStore, local>> and <<addGlobalStore, global>> state stores, <<addSource, sources>>, <<addProcessor, processors>> and <<addSink, sinks>> to build advanced stream processing graphs.
108

119
[[creating-instance]]
1210
`Topology` takes no arguments when created.
@@ -79,11 +77,7 @@ Topologies:
7977
<-- demo-source-processor
8078
----
8179

82-
[[AutoOffsetReset]]
83-
`Topology` defines *offset reset policy* (`AutoOffsetReset`) that can be one of the two possible values:
84-
85-
* [[EARLIEST]] `EARLIEST`
86-
* [[LATEST]] `LATEST`
80+
`Topology` is a logical representation of a <<kafka-streams-internals-ProcessorTopology.adoc#, ProcessorTopology>>.
8781

8882
[[operators]]
8983
.Topology API / Methods
@@ -98,169 +92,177 @@ a| [[addGlobalStore]]
9892
[source, java]
9993
----
10094
Topology addGlobalStore(
101-
final StoreBuilder storeBuilder,
102-
final String sourceName,
103-
final Deserializer keyDeserializer,
104-
final Deserializer valueDeserializer,
105-
final String topic,
106-
final String processorName,
107-
final ProcessorSupplier stateUpdateSupplier)
95+
StoreBuilder storeBuilder,
96+
String sourceName,
97+
Deserializer keyDeserializer,
98+
Deserializer valueDeserializer,
99+
String topic,
100+
String processorName,
101+
ProcessorSupplier stateUpdateSupplier)
108102
Topology addGlobalStore(
109-
final StoreBuilder storeBuilder,
110-
final String sourceName,
111-
final TimestampExtractor timestampExtractor,
112-
final Deserializer keyDeserializer,
113-
final Deserializer valueDeserializer,
114-
final String topic,
115-
final String processorName,
116-
final ProcessorSupplier stateUpdateSupplier)
103+
StoreBuilder storeBuilder,
104+
String sourceName,
105+
TimestampExtractor timestampExtractor,
106+
Deserializer keyDeserializer,
107+
Deserializer valueDeserializer,
108+
String topic,
109+
String processorName,
110+
ProcessorSupplier stateUpdateSupplier)
117111
----
118112

119113
Adds a global <<kafka-streams-StateStore.adoc#, StateStore>> (with the <<kafka-streams-StoreBuilder.adoc#, StoreBuilder>>, <<kafka-streams-ProcessorSupplier.adoc#, ProcessorSupplier>> and optional <<kafka-streams-TimestampExtractor.adoc#, TimestampExtractor>>) to the topology.
120114

115+
Internally, `addGlobalStore` simply requests the <<internalTopologyBuilder, InternalTopologyBuilder>> to <<kafka-streams-internals-InternalTopologyBuilder.adoc#addGlobalStore, add a global store>>.
116+
121117
| addProcessor
122118
a| [[addProcessor]]
123119

124120
[source, java]
125121
----
126122
Topology addProcessor(
127-
final String name,
128-
final ProcessorSupplier supplier,
129-
final String... parentNames)
123+
String name,
124+
ProcessorSupplier supplier,
125+
String... parentNames)
130126
----
131127

132128
Adds a new <<kafka-streams-Processor.adoc#, processor node>> (with the <<kafka-streams-ProcessorSupplier.adoc#, ProcessorSupplier>>) to the topology
133129

130+
Internally, `addProcessor` simply requests the <<internalTopologyBuilder, InternalTopologyBuilder>> to <<kafka-streams-internals-InternalTopologyBuilder.adoc#addProcessor, add a processor>>.
131+
134132
| addSink
135133
a| [[addSink]]
136134

137135
[source, java]
138136
----
139137
Topology addSink(
140-
final String name,
141-
final String topic,
142-
final Serializer<K> keySerializer,
143-
final Serializer<V> valueSerializer,
144-
final StreamPartitioner<? super K, ? super V> partitioner,
145-
final String... parentNames)
138+
String name,
139+
String topic,
140+
Serializer<K> keySerializer,
141+
Serializer<V> valueSerializer,
142+
StreamPartitioner<? super K, ? super V> partitioner,
143+
String... parentNames)
146144
Topology addSink(
147-
final String name,
148-
final String topic,
149-
final Serializer<K> keySerializer,
150-
final Serializer<V> valueSerializer,
151-
final String... parentNames)
145+
String name,
146+
String topic,
147+
Serializer<K> keySerializer,
148+
Serializer<V> valueSerializer,
149+
String... parentNames)
152150
Topology addSink(
153-
final String name,
154-
final String topic,
155-
final StreamPartitioner<? super K, ? super V> partitioner,
156-
final String... parentNames)
151+
String name,
152+
String topic,
153+
StreamPartitioner<? super K, ? super V> partitioner,
154+
String... parentNames)
157155
Topology addSink(
158-
final String name,
159-
final String topic,
160-
final String... parentNames)
156+
String name,
157+
String topic,
158+
String... parentNames)
161159
Topology addSink(
162-
final String name,
163-
final TopicNameExtractor<K, V> topicExtractor,
164-
final Serializer<K> keySerializer,
165-
final Serializer<V> valueSerializer,
166-
final StreamPartitioner<? super K, ? super V> partitioner,
167-
final String... parentNames)
160+
String name,
161+
TopicNameExtractor<K, V> topicExtractor,
162+
Serializer<K> keySerializer,
163+
Serializer<V> valueSerializer,
164+
StreamPartitioner<? super K, ? super V> partitioner,
165+
String... parentNames)
168166
Topology addSink(
169-
final String name,
170-
final TopicNameExtractor<K, V> topicExtractor,
171-
final Serializer<K> keySerializer,
172-
final Serializer<V> valueSerializer,
173-
final String... parentNames)
167+
String name,
168+
TopicNameExtractor<K, V> topicExtractor,
169+
Serializer<K> keySerializer,
170+
Serializer<V> valueSerializer,
171+
String... parentNames)
174172
Topology addSink(
175-
final String name,
176-
final TopicNameExtractor<K, V> topicExtractor,
177-
final StreamPartitioner<? super K, ? super V> partitioner,
178-
final String... parentNames)
173+
String name,
174+
TopicNameExtractor<K, V> topicExtractor,
175+
StreamPartitioner<? super K, ? super V> partitioner,
176+
String... parentNames)
179177
Topology addSink(
180-
final String name,
181-
final TopicNameExtractor<K, V> topicExtractor,
182-
final String... parentNames)
178+
String name,
179+
TopicNameExtractor<K, V> topicExtractor,
180+
String... parentNames)
183181
----
184182

185183
Adds a new <<kafka-streams-internals-SinkNode.adoc#, sink node>> (with the optional <<kafka-streams-TopicNameExtractor.adoc#, TopicNameExtractor>> and <<kafka-streams-StreamPartitioner.adoc#, StreamPartitioner>>) to the topology.
186184

185+
Internally, `addSink` simply requests the <<internalTopologyBuilder, InternalTopologyBuilder>> to <<kafka-streams-internals-InternalTopologyBuilder.adoc#addSink, add a sink>>.
186+
187187
| addSource
188188
a| [[addSource]]
189189

190190
[source, java]
191191
----
192192
Topology addSource(
193-
final AutoOffsetReset offsetReset,
194-
final String name,
195-
final Deserializer keyDeserializer,
196-
final Deserializer valueDeserializer,
197-
final Pattern topicPattern)
193+
AutoOffsetReset offsetReset,
194+
String name,
195+
Deserializer keyDeserializer,
196+
Deserializer valueDeserializer,
197+
Pattern topicPattern)
198198
Topology addSource(
199-
final AutoOffsetReset offsetReset,
200-
final String name,
201-
final Deserializer keyDeserializer,
202-
final Deserializer valueDeserializer,
203-
final String... topics)
199+
AutoOffsetReset offsetReset,
200+
String name,
201+
Deserializer keyDeserializer,
202+
Deserializer valueDeserializer,
203+
String... topics)
204204
Topology addSource(
205-
final AutoOffsetReset offsetReset,
206-
final String name,
207-
final Pattern topicPattern)
205+
AutoOffsetReset offsetReset,
206+
String name,
207+
Pattern topicPattern)
208208
Topology addSource(
209-
final AutoOffsetReset offsetReset,
210-
final String name,
211-
final String... topics)
209+
AutoOffsetReset offsetReset,
210+
String name,
211+
String... topics)
212212
Topology addSource(
213-
final AutoOffsetReset offsetReset,
214-
final String name,
215-
final TimestampExtractor timestampExtractor,
216-
final Deserializer keyDeserializer,
217-
final Deserializer valueDeserializer,
218-
final Pattern topicPattern)
213+
AutoOffsetReset offsetReset,
214+
String name,
215+
TimestampExtractor timestampExtractor,
216+
Deserializer keyDeserializer,
217+
Deserializer valueDeserializer,
218+
Pattern topicPattern)
219219
Topology addSource(
220-
final AutoOffsetReset offsetReset,
221-
final String name,
222-
final TimestampExtractor timestampExtractor,
223-
final Deserializer keyDeserializer,
224-
final Deserializer valueDeserializer,
225-
final String... topics)
220+
AutoOffsetReset offsetReset,
221+
String name,
222+
TimestampExtractor timestampExtractor,
223+
Deserializer keyDeserializer,
224+
Deserializer valueDeserializer,
225+
String... topics)
226226
Topology addSource(
227-
final AutoOffsetReset offsetReset,
228-
final TimestampExtractor timestampExtractor,
229-
final String name,
230-
final Pattern topicPattern)
227+
AutoOffsetReset offsetReset,
228+
TimestampExtractor timestampExtractor,
229+
String name,
230+
Pattern topicPattern)
231231
Topology addSource(
232-
final AutoOffsetReset offsetReset,
233-
final TimestampExtractor timestampExtractor,
234-
final String name,
235-
final String... topics)
232+
AutoOffsetReset offsetReset,
233+
TimestampExtractor timestampExtractor,
234+
String name,
235+
String... topics)
236236
Topology addSource(
237-
final String name,
238-
final Deserializer keyDeserializer,
239-
final Deserializer valueDeserializer,
240-
final Pattern topicPattern)
237+
String name,
238+
Deserializer keyDeserializer,
239+
Deserializer valueDeserializer,
240+
Pattern topicPattern)
241241
Topology addSource(
242-
final String name,
243-
final Deserializer keyDeserializer,
244-
final Deserializer valueDeserializer,
245-
final String... topics)
242+
String name,
243+
Deserializer keyDeserializer,
244+
Deserializer valueDeserializer,
245+
String... topics)
246246
Topology addSource(
247-
final String name,
248-
final Pattern topicPattern)
247+
String name,
248+
Pattern topicPattern)
249249
Topology addSource(
250-
final String name,
251-
final String... topics)
250+
String name,
251+
String... topics)
252252
Topology addSource(
253-
final TimestampExtractor timestampExtractor,
254-
final String name,
255-
final Pattern topicPattern)
253+
TimestampExtractor timestampExtractor,
254+
String name,
255+
Pattern topicPattern)
256256
Topology addSource(
257-
final TimestampExtractor timestampExtractor,
258-
final String name,
259-
final String... topics)
257+
TimestampExtractor timestampExtractor,
258+
String name,
259+
String... topics)
260260
----
261261

262262
Adds a new <<kafka-streams-internals-SourceNode.adoc#, source node>> (with the optional <<AutoOffsetReset, AutoOffsetReset>> and <<kafka-streams-TimestampExtractor.adoc#, TimestampExtractor>>) to the topology.
263263

264+
Internally, `addSource` simply requests the <<internalTopologyBuilder, InternalTopologyBuilder>> to <<kafka-streams-internals-InternalTopologyBuilder.adoc#addSource, add a source>>.
265+
264266
| addStateStore
265267
a| [[addStateStore]]
266268

@@ -273,20 +275,22 @@ Topology addStateStore(
273275

274276
Adds a new <<kafka-streams-StateStore.adoc#, state store>> (as a <<kafka-streams-StoreBuilder.adoc#, StoreBuilder>>) to the topology and associates it with processors
275277

276-
Internally, `addStateStore` simply requests the <<internalTopologyBuilder, InternalTopologyBuilder>> to <<kafka-streams-internals-InternalTopologyBuilder.adoc#addStateStore, add a state store (as a StoreBuilder)>>
278+
Internally, `addStateStore` simply requests the <<internalTopologyBuilder, InternalTopologyBuilder>> to <<kafka-streams-internals-InternalTopologyBuilder.adoc#addStateStore, add a state store>>.
277279

278280
| connectProcessorAndStateStores
279281
a| [[connectProcessorAndStateStores]]
280282

281283
[source, java]
282284
----
283285
Topology connectProcessorAndStateStores(
284-
final String processorName,
285-
final String... stateStoreNames)
286+
String processorName,
287+
String... stateStoreNames)
286288
----
287289

288290
Connects the <<kafka-streams-internals-ProcessorNode.adoc#, processor node>> with <<kafka-streams-StateStore.adoc#, state stores>> (by name).
289291

292+
Internally, `connectProcessorAndStateStores` simply requests the <<internalTopologyBuilder, InternalTopologyBuilder>> to <<kafka-streams-internals-InternalTopologyBuilder.adoc#connectProcessorAndStateStores, connect a processor with state stores>>.
293+
290294
| describe
291295
a| [[describe]]
292296

@@ -295,12 +299,20 @@ a| [[describe]]
295299
TopologyDescription describe()
296300
----
297301

298-
<<kafka-streams-TopologyDescription.adoc#, Meta representation>> of the topology
302+
Describes the topology via <<kafka-streams-TopologyDescription.adoc#, TopologyDescription>> (_meta representation_)
303+
304+
Internally, `describe` simply requests the <<internalTopologyBuilder, InternalTopologyBuilder>> to <<kafka-streams-internals-InternalTopologyBuilder.adoc#describe, describe a topology>>.
299305

300306
|===
301307

302308
[[internalTopologyBuilder]]
303-
Internally, `Topology` uses an <<kafka-streams-internals-InternalTopologyBuilder.adoc#, InternalTopologyBuilder>> in all <<operators, methods>> (and is therefore a thin layer atop).
309+
Internally, `Topology` uses an <<kafka-streams-internals-InternalTopologyBuilder.adoc#, InternalTopologyBuilder>> for all the <<operators, methods>> and is simply a thin layer atop (that aims at making Kafka Streams developers' life simpler).
304310

305311
.Topology and InternalTopologyBuilder
306312
image::images/kafka-streams-Topology-InternalTopologyBuilder.png[align="center"]
313+
314+
[[AutoOffsetReset]]
315+
`Topology` defines *offset reset policy* (`AutoOffsetReset`) that can be one of the following values:
316+
317+
* [[EARLIEST]] `EARLIEST`
318+
* [[LATEST]] `LATEST`

‎kafka-streams-internals-InternalTopologyBuilder.adoc

+104-83
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ Topologies:
155155
| [[earliestResetTopics]]
156156

157157
| globalStateStores
158-
a| [[globalStateStores]] Global link:kafka-streams-StateStore.adoc[StateStores] by name
158+
a| [[globalStateStores]] Global <<kafka-streams-StateStore.adoc#, state stores>> by name
159159

160160
* A new global link:kafka-streams-StateStore.adoc[StateStore] is added exclusively when `InternalTopologyBuilder` is requested to <<addGlobalStore, add a global state store to a topology>>
161161
@@ -296,11 +296,11 @@ String decorateTopic(final String topic)
296296
[source, java]
297297
----
298298
void buildSinkNode(
299-
final Map<String, ProcessorNode> processorMap,
300-
final Map<String, SinkNode> topicSinkMap,
301-
final Set<String> repartitionTopics,
302-
final SinkNodeFactory sinkNodeFactory,
303-
final SinkNode node)
299+
Map<String, ProcessorNode> processorMap,
300+
Map<String, SinkNode> topicSinkMap,
301+
Set<String> repartitionTopics,
302+
SinkNodeFactory sinkNodeFactory,
303+
SinkNode node)
304304
----
305305

306306
`buildSinkNode`...FIXME
@@ -344,14 +344,26 @@ NOTE: `resetTopicsPattern` is used when...FIXME
344344

345345
[source, java]
346346
----
347-
synchronized Collection<Set<String>> copartitionGroups()
347+
Collection<Set<String>> copartitionGroups()
348348
----
349349

350350
`copartitionGroups`...FIXME
351351

352-
NOTE: `copartitionGroups` is used when...FIXME
352+
NOTE: `copartitionGroups` is used exclusively when `StreamsPartitionAssignor` is requested to <<kafka-streams-internals-StreamsPartitionAssignor.adoc#assign, perform group assignment (assign tasks to consumer clients)>>.
353+
354+
=== [[copartitionSources]] `copartitionSources` Method
355+
356+
[source, java]
357+
----
358+
void copartitionSources(
359+
Collection<String> sourceNodes)
360+
----
361+
362+
`copartitionSources`...FIXME
363+
364+
NOTE: `copartitionSources` is used exclusively when `AbstractStream` is requested to <<kafka-streams-internals-AbstractStream.adoc#ensureJoinableWith, ensureJoinableWith>>.
353365

354-
=== [[addProcessor]] Registering Processor Node -- `addProcessor` Method
366+
=== [[addProcessor]] Adding Processor to Topology -- `addProcessor` Method
355367

356368
[source, java]
357369
----
@@ -404,34 +416,34 @@ NOTE: `buildProcessorNode` is used exclusively when `InternalTopologyBuilder` is
404416
[source, java]
405417
----
406418
void buildSourceNode(
407-
final Map<String, SourceNode> topicSourceMap,
408-
final Set<String> repartitionTopics,
409-
final SourceNodeFactory sourceNodeFactory,
410-
final SourceNode node)
419+
Map<String, SourceNode> topicSourceMap,
420+
Set<String> repartitionTopics,
421+
SourceNodeFactory sourceNodeFactory,
422+
SourceNode node)
411423
----
412424

413425
`buildSourceNode`...FIXME
414426

415-
NOTE: `buildSourceNode` is used exclusively when `InternalTopologyBuilder` is requested to link:kafka-streams-internals-InternalTopologyBuilder.adoc#build[build a topology of processor tasks] (aka *processor topology*).
427+
NOTE: `buildSourceNode` is used exclusively when `InternalTopologyBuilder` is requested to <<build, build a topology of processor nodes>>.
416428

417-
=== [[addSource]] Registering Source Processor Node -- `addSource` Method
429+
=== [[addSource]] Adding Source Node to Topology -- `addSource` Method
418430

419-
[source, scala]
431+
[source, java]
420432
----
421-
final void addSource(
422-
final Topology.AutoOffsetReset offsetReset,
423-
final String name,
424-
final TimestampExtractor timestampExtractor,
425-
final Deserializer keyDeserializer,
426-
final Deserializer valDeserializer,
427-
final Pattern topicPattern)
428-
final void addSource(
429-
final Topology.AutoOffsetReset offsetReset,
430-
final String name,
431-
final TimestampExtractor timestampExtractor,
432-
final Deserializer keyDeserializer,
433-
final Deserializer valDeserializer,
434-
final String... topics)
433+
void addSource(
434+
Topology.AutoOffsetReset offsetReset,
435+
String name,
436+
TimestampExtractor timestampExtractor,
437+
Deserializer keyDeserializer,
438+
Deserializer valDeserializer,
439+
Pattern topicPattern)
440+
void addSource(
441+
Topology.AutoOffsetReset offsetReset,
442+
String name,
443+
TimestampExtractor timestampExtractor,
444+
Deserializer keyDeserializer,
445+
Deserializer valDeserializer,
446+
String... topics)
435447
----
436448

437449
`addSource` simply registers a new <<kafka-streams-internals-InternalTopologyBuilder-SourceNodeFactory.adoc#, SourceNodeFactory>> by the given name in the <<nodeFactories, nodeFactories>> internal registry.
@@ -494,13 +506,13 @@ void validateTopicNotAlreadyRegistered(final String topic)
494506

495507
NOTE: `validateTopicNotAlreadyRegistered` is used when...FIXME
496508

497-
=== [[connectProcessorAndStateStores]] Connecting State Store with Processor Nodes -- `connectProcessorAndStateStores` Method
509+
=== [[connectProcessorAndStateStores]] Connecting Processor with State Stores -- `connectProcessorAndStateStores` Method
498510

499511
[source, java]
500512
----
501513
void connectProcessorAndStateStores(
502-
final String processorName,
503-
final String... stateStoreNames)
514+
String processorName,
515+
String... stateStoreNames)
504516
----
505517

506518
`connectProcessorAndStateStores` simply <<connectProcessorAndStateStore, connectProcessorAndStateStore>> with `processorName` and every state store name in `stateStoreNames`.
@@ -515,26 +527,24 @@ NOTE: `connectProcessorAndStateStores` (plural) is a public method that uses the
515527
====
516528
`connectProcessorAndStateStores` is used when:
517529
518-
* `KStreamImpl` is requested to link:kafka-streams-internals-KStreamImpl.adoc#doStreamTableJoin[doStreamTableJoin], link:kafka-streams-internals-KStreamImpl.adoc#process[process], link:kafka-streams-internals-KStreamImpl.adoc#transform[transform], link:kafka-streams-internals-KStreamImpl.adoc#transformValues[transformValues]
530+
* `Topology` is requested to <<kafka-streams-Topology.adoc#connectProcessorAndStateStores, connect a processor with state stores>>
519531
520-
* `KTableImpl` is requested to link:kafka-streams-internals-KTableImpl.adoc#buildJoin[buildJoin]
521-
522-
* `Topology` is requested to link:kafka-streams-Topology.adoc#connectProcessorAndStateStores[connectProcessorAndStateStores]
532+
* <<kafka-streams-internals-StreamsGraphNode.adoc#, StreamsGraphNodes>> (i.e. <<kafka-streams-internals-KTableKTableJoinNode.adoc#, KTableKTableJoinNode>>, <<kafka-streams-internals-StatefulProcessorNode.adoc#, StatefulProcessorNode>>, <<kafka-streams-internals-StreamTableJoinNode.adoc#, StreamTableJoinNode>>, and <<kafka-streams-internals-TableProcessorNode.adoc#, TableProcessorNode>>) are requested to <<kafka-streams-internals-StreamsGraphNode.adoc#writeToTopology, write to a topology>>
523533
====
524534

525-
=== [[addGlobalStore]] Adding Global Key-Value State Store (to Topology) -- `addGlobalStore` Method
535+
=== [[addGlobalStore]] Adding Global State Store to Topology -- `addGlobalStore` Method
526536

527537
[source, java]
528538
----
529539
void addGlobalStore(
530-
final StoreBuilder<KeyValueStore> storeBuilder,
531-
final String sourceName,
532-
final TimestampExtractor timestampExtractor,
533-
final Deserializer keyDeserializer,
534-
final Deserializer valueDeserializer,
535-
final String topic,
536-
final String processorName,
537-
final ProcessorSupplier stateUpdateSupplier)
540+
StoreBuilder storeBuilder,
541+
String sourceName,
542+
TimestampExtractor timestampExtractor,
543+
Deserializer keyDeserializer,
544+
Deserializer valueDeserializer,
545+
String topic,
546+
String processorName,
547+
ProcessorSupplier stateUpdateSupplier)
538548
----
539549

540550
`addGlobalStore` first <<validateGlobalStoreArguments, validateGlobalStoreArguments>> followed by <<validateTopicNotAlreadyRegistered, validateTopicNotAlreadyRegistered>>.
@@ -606,8 +616,8 @@ NOTE: `validateGlobalStoreArguments` is used exclusively when `InternalTopologyB
606616
[source, java]
607617
----
608618
void connectSourceStoreAndTopic(
609-
final String sourceStoreName,
610-
final String topic)
619+
String sourceStoreName,
620+
String topic)
611621
----
612622

613623
`connectSourceStoreAndTopic` adds the given `sourceStoreName` with the `topic` to <<storeToChangelogTopic, storeToChangelogTopic>> internal registry.
@@ -618,7 +628,14 @@ void connectSourceStoreAndTopic(
618628
Source store [sourceStoreName] is already added.
619629
```
620630

621-
NOTE: `connectSourceStoreAndTopic` is used when `InternalTopologyBuilder` is requested to <<addGlobalStore, add a global state store to a topology>> and <<adjust, adjust>>.
631+
[NOTE]
632+
====
633+
`connectSourceStoreAndTopic` is used when:
634+
635+
* `InternalTopologyBuilder` is requested to <<addGlobalStore, add a global state store to a topology>>
636+
637+
* `TableSourceNode` is requested to <<kafka-streams-internals-TableSourceNode.adoc#writeToTopology, write to a topology>>
638+
====
622639

623640
=== [[connectProcessorAndStateStore]] Connecting State Store with Processor Node -- `connectProcessorAndStateStore` Internal Method
624641

@@ -641,22 +658,22 @@ In the end, `connectProcessorAndStateStore` <<connectStateStoreNameToSourceTopic
641658

642659
`connectProcessorAndStateStore` reports a `TopologyException` when the input `stateStoreName` or `processorName` have not been registered yet or the `processorName` is the name of a source or sink node.
643660

644-
NOTE: `connectProcessorAndStateStore` is used when `InternalTopologyBuilder` is requested to <<addStateStore, addStateStore>> and <<connectProcessorAndStateStores, connectProcessorAndStateStores>>
661+
NOTE: `connectProcessorAndStateStore` is used when `InternalTopologyBuilder` is requested to <<addStateStore, add a state store>> and <<connectProcessorAndStateStores, connect a processor with state stores>>.
645662

646663
=== [[connectStateStoreNameToSourceTopicsOrPattern]] `connectStateStoreNameToSourceTopicsOrPattern` Internal Method
647664

648-
[source, scala]
665+
[source, java]
649666
----
650667
void connectStateStoreNameToSourceTopicsOrPattern(
651-
final String stateStoreName,
652-
final ProcessorNodeFactory processorNodeFactory)
668+
String stateStoreName,
669+
ProcessorNodeFactory processorNodeFactory)
653670
----
654671

655672
`connectStateStoreNameToSourceTopicsOrPattern`...FIXME
656673

657-
NOTE: `connectStateStoreNameToSourceTopicsOrPattern` is used when...FIXME
674+
NOTE: `connectStateStoreNameToSourceTopicsOrPattern` is used exclusively when `InternalTopologyBuilder` is requested to <<connectProcessorAndStateStore, connect a processor with a state store>>
658675

659-
=== [[addStateStore]] Adding State Store (As StoreBuilder) -- `addStateStore` Method
676+
=== [[addStateStore]] Adding State Store to Topology -- `addStateStore` Method
660677

661678
[source, java]
662679
----
@@ -736,7 +753,7 @@ ProcessorTopology buildGlobalStateTopology()
736753

737754
`buildGlobalStateTopology` returns `null` if <<globalNodeGroups, globalNodeGroups>> is empty.
738755

739-
NOTE: `buildGlobalStateTopology` is used exclusively when `KafkaStreams` is link:kafka-streams-KafkaStreams.adoc#globalStreamThread[created].
756+
NOTE: `buildGlobalStateTopology` is used exclusively when `KafkaStreams` is <<kafka-streams-KafkaStreams.adoc#globalStreamThread, created>>.
740757

741758
=== [[describeGlobalStore]] `describeGlobalStore` Internal Method
742759

@@ -779,7 +796,7 @@ Otherwise, `isGlobalSource` is negative (i.e. `false`).
779796

780797
NOTE: `isGlobalSource` is used when `InternalTopologyBuilder` is requested to <<describeGlobalStore, describeGlobalStore>>, <<globalNodeGroups, globalNodeGroups>> and <<nodeGroupContainsGlobalSourceNode, nodeGroupContainsGlobalSourceNode>>.
781798

782-
=== [[globalNodeGroups]] Collecting Global Node Groups -- `globalNodeGroups` Internal Method
799+
=== [[globalNodeGroups]] Global Node Groups -- `globalNodeGroups` Internal Method
783800

784801
[source, java]
785802
----
@@ -896,7 +913,7 @@ Topologies:
896913
<-- sourceName
897914
----
898915

899-
NOTE: `describe` is used exclusively when `Topology` is requested to link:kafka-streams-Topology.adoc#describe[describe].
916+
NOTE: `describe` is used exclusively when `Topology` is requested to <<kafka-streams-Topology.adoc#describe, describe>>.
900917

901918
==== [[describeSubtopology]] `describeSubtopology` Internal Method
902919

@@ -930,19 +947,19 @@ NOTE: `describeGlobalStore` is used exclusively when `InternalTopologyBuilder` i
930947
[source, java]
931948
----
932949
void addSink(
933-
final String name,
934-
final String topic,
935-
final Serializer<K> keySerializer,
936-
final Serializer<V> valSerializer,
937-
final StreamPartitioner<? super K, ? super V> partitioner,
938-
final String... predecessorNames) // <1>
950+
String name,
951+
String topic,
952+
Serializer<K> keySerializer,
953+
Serializer<V> valSerializer,
954+
StreamPartitioner<? super K, ? super V> partitioner,
955+
String... predecessorNames) // <1>
939956
void addSink(
940-
final String name,
941-
final TopicNameExtractor<K, V> topicExtractor,
942-
final Serializer<K> keySerializer,
943-
final Serializer<V> valSerializer,
944-
final StreamPartitioner<? super K, ? super V> partitioner,
945-
final String... predecessorNames)
957+
String name,
958+
TopicNameExtractor<K, V> topicExtractor,
959+
Serializer<K> keySerializer,
960+
Serializer<V> valSerializer,
961+
StreamPartitioner<? super K, ? super V> partitioner,
962+
String... predecessorNames)
946963
----
947964
<1> Uses <<kafka-streams-internals-StaticTopicNameExtractor.adoc#, StaticTopicNameExtractor>>
948965

@@ -963,14 +980,15 @@ void addSink(
963980
* `Topology` is requested to <<kafka-streams-Topology.adoc#addSink, add a sink>>
964981
====
965982

966-
=== [[addInternalTopic]] Registering Internal Topic Name -- `addInternalTopic` Method
983+
=== [[addInternalTopic]] Adding Internal Topic Name -- `addInternalTopic` Method
967984

968985
[source, java]
969986
----
970-
void addInternalTopic(final String topicName)
987+
void addInternalTopic(
988+
String topicName)
971989
----
972990

973-
`addInternalTopic` simply registers the input `topicName` (in the <<internalTopicNames, internalTopicNames>> internal registry).
991+
`addInternalTopic` simply adds the input `topicName` to the <<internalTopicNames, internalTopicNames>> internal registry.
974992

975993
[NOTE]
976994
====
@@ -986,10 +1004,12 @@ void addInternalTopic(final String topicName)
9861004
[source, java]
9871005
----
9881006
ProcessorTopology build() // <1>
989-
ProcessorTopology build(final Integer topicGroupId) // <2>
1007+
ProcessorTopology build(
1008+
Integer topicGroupId) // <2>
9901009
9911010
// PRIVATE
992-
private ProcessorTopology build(final Set<String> nodeGroup)
1011+
private ProcessorTopology build(
1012+
Set<String> nodeGroup)
9931013
----
9941014
<1> Uses <<build-topicGroupId, build>> with an undefined `topicGroupId` (i.e. `null`)
9951015
<2> Uses `build` with `nodeGroup` being the node names for a given `topicGroupId`
@@ -1018,13 +1038,14 @@ NOTE: `nodeGroup` can be either <<globalNodeGroups, global node groups>> (aka _g
10181038

10191039
NOTE: The private `build` is used when `InternalTopologyBuilder` is requested to <<build-topicGroupId, build a processor task topology>> (for a group ID) and <<buildGlobalStateTopology, build a global processor task topology>>.
10201040

1021-
NOTE: The parameter-less `build` is used exclusively when `KafkaStreams` is <<kafka-streams-KafkaStreams.adoc#, created>> (as a sanity check to fail-fast in case a `ProcessorTopology` could not be built due to some exception).
1041+
NOTE: The no-argument `build` is used exclusively when `KafkaStreams` is <<kafka-streams-KafkaStreams.adoc#, created>> (as a sanity check to fail-fast in case a `ProcessorTopology` could not be built due to some exception).
10221042

10231043
==== [[build-topicGroupId]] Building Processor Task Topology For Group ID -- `build` Factory Method
10241044

10251045
[source, java]
10261046
----
1027-
ProcessorTopology build(final Integer topicGroupId)
1047+
ProcessorTopology build(
1048+
Integer topicGroupId)
10281049
----
10291050

10301051
This variant of `build` takes either a group ID or `null` (see the parameter-less <<build, build()>>).
@@ -1037,7 +1058,7 @@ When the input `topicGroupId` is undefined (i.e. `null`), `build` takes the node
10371058
====
10381059
`build` (with a topic group ID) is used when:
10391060
1040-
* `InternalTopologyBuilder` is requested to <<build, build a processor task topology>> (with no group ID)
1061+
* `InternalTopologyBuilder` is requested to <<build, build a processor task topology>> (with no group ID specified)
10411062
10421063
* `StandbyTaskCreator` is requested to <<kafka-streams-internals-StandbyTaskCreator.adoc#createTask, create a standby task for a given task ID>>
10431064
@@ -1060,8 +1081,8 @@ NOTE: `allStateStoreName` is used exclusively when `TopologyTestDriver` is reque
10601081
[source, java]
10611082
----
10621083
InternalTopicConfig createChangelogTopicConfig(
1063-
final StateStoreFactory factory,
1064-
final String name)
1084+
StateStoreFactory factory,
1085+
String name)
10651086
----
10661087

10671088
`createChangelogTopicConfig` creates a <<kafka-streams-internals-UnwindowedChangelogTopicConfig.adoc#, UnwindowedChangelogTopicConfig>> or a <<kafka-streams-internals-WindowedChangelogTopicConfig.adoc#, WindowedChangelogTopicConfig>> per the <<kafka-streams-internals-InternalTopologyBuilder-StateStoreFactory.adoc#isWindowStore, isWindowStore>> flag of the input `StateStoreFactory`.
@@ -1107,13 +1128,13 @@ Before returning the <<topicPattern, source topics pattern>>, `sourceTopicPatter
11071128
[source, java]
11081129
----
11091130
Pattern buildPatternForOffsetResetTopics(
1110-
final Collection<String> sourceTopics,
1111-
final Collection<Pattern> sourcePatterns)
1131+
Collection<String> sourceTopics,
1132+
Collection<Pattern> sourcePatterns)
11121133
----
11131134

11141135
`buildPatternForOffsetResetTopics`...FIXME
11151136

1116-
NOTE: `buildPatternForOffsetResetTopics` is used when...FIXME
1137+
NOTE: `buildPatternForOffsetResetTopics` is used when `InternalTopologyBuilder` is requested to <<resetTopicsPattern, resetTopicsPattern>> and <<sourceTopicPattern, sourceTopicPattern>>.
11171138

11181139
=== [[setRegexMatchedTopicsToSourceNodes]] `setRegexMatchedTopicsToSourceNodes` Internal Method
11191140

0 commit comments

Comments
 (0)
Please sign in to comment.