Skip to content

Commit b09a1e4

Browse files
StatefulProcessorNode
1 parent 6ed9b74 commit b09a1e4

3 files changed

+103
-24
lines changed

Diff for: kafka-streams-internals-GroupedStreamAggregateBuilder.adoc

+4-4
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
[source, java]
1818
----
1919
<T> KTable<K, T> build(
20-
final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
21-
final String functionName,
22-
final StoreBuilder storeBuilder,
23-
final boolean isQueryable)
20+
KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
21+
String functionName,
22+
StoreBuilder storeBuilder,
23+
boolean isQueryable)
2424
----
2525

2626
`build`...FIXME

Diff for: kafka-streams-internals-KStreamImpl.adoc

+79-7
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ NOTE: `transformValues` is part of link:kafka-streams-KStream.adoc#transformValu
7777
[source, java]
7878
----
7979
void process(
80-
final ProcessorSupplier<? super K, ? super V> processorSupplier,
81-
final String... stateStoreNames)
80+
ProcessorSupplier<? super K, ? super V> processorSupplier,
81+
String... stateStoreNames)
8282
----
8383

8484
NOTE: `process` is part of link:kafka-streams-KStream.adoc#process[KStream Contract] for...FIXME
@@ -90,8 +90,11 @@ NOTE: `process` is part of link:kafka-streams-KStream.adoc#process[KStream Contr
9090
[source, java]
9191
----
9292
KStream<K1, V1> transform(
93-
final TransformerSupplier<? super K, ? super V, KeyValue<K1, V1>> transformerSupplier,
94-
final String... stateStoreNames)
93+
TransformerSupplier<
94+
? super K,
95+
? super V,
96+
KeyValue<K1, V1>> transformerSupplier,
97+
String... stateStoreNames)
9598
----
9699

97100
NOTE: `transform` is part of <<kafka-streams-KStream.adoc#transform, KStream API>> to transform a record into zero or more output records with state.
@@ -331,16 +334,53 @@ NOTE: `filterNot` is part of the <<kafka-streams-KStream.adoc#filterNot, KStream
331334

332335
`filterNot`...FIXME
333336

337+
=== [[flatTransform]] `flatTransform` Method
338+
339+
[source, java]
340+
----
341+
KStream<K1, V1> flatTransform(
342+
TransformerSupplier<
343+
? super K,
344+
? super V,
345+
Iterable<KeyValue<K1, V1>>> transformerSupplier,
346+
String... stateStoreNames)
347+
----
348+
349+
NOTE: `flatTransform` is part of the <<kafka-streams-KStream.adoc#flatTransform, KStream Contract>> to...FIXME.
350+
351+
`flatTransform`...FIXME
352+
353+
=== [[flatTransformValues]] `flatTransformValues` Method
354+
355+
[source, java]
356+
----
357+
KStream<K, VR> flatTransformValues(
358+
ValueTransformerSupplier<
359+
? super V,
360+
Iterable<VR>> valueTransformerSupplier,
361+
String... stateStoreNames)
362+
KStream<K, VR> flatTransformValues(
363+
ValueTransformerWithKeySupplier<
364+
? super K,
365+
? super V,
366+
Iterable<VR>> valueTransformerSupplier,
367+
String... stateStoreNames)
368+
----
369+
370+
NOTE: `flatTransformValues` is part of the <<kafka-streams-KStream.adoc#flatTransformValues, KStream Contract>> to...FIXME.
371+
372+
`flatTransformValues`...FIXME
373+
334374
=== [[doTransformValues]] `doTransformValues` Internal Method
335375

336376
[source, java]
337377
----
338378
KStream<K, VR> doTransformValues(
339-
final ValueTransformerWithKeySupplier<
379+
ValueTransformerWithKeySupplier<
340380
? super K,
341381
? super V,
342382
? extends VR> valueTransformerWithKeySupplier,
343-
final String... stateStoreNames)
383+
String... stateStoreNames)
344384
----
345385

346386
`doTransformValues` requests the <<builder, InternalStreamsBuilder>> for a <<kafka-streams-internals-InternalStreamsBuilder.adoc#newProcessorName, new processor name>> with <<TRANSFORMVALUES_NAME, KSTREAM-TRANSFORMVALUES>> prefix.
@@ -353,7 +393,7 @@ KStream<K, VR> doTransformValues(
353393

354394
In the end, `doTransformValues` creates a new <<creating-instance, KStreamImpl>> (with the new processor name, the <<sourceNodes, sourceNodes>>, the <<repartitionRequired, repartitionRequired>> flag, the `StatefulProcessorNode` itself and the <<builder, InternalStreamsBuilder>>).
355395

356-
NOTE: `doTransformValues` is used when `KStreamImpl` is requested to <<transformValues, transformValues>>.
396+
NOTE: `doTransformValues` is used exclusively when `KStreamImpl` is requested to <<transformValues, transformValues>>.
357397

358398
=== [[internalSelectKey]] `internalSelectKey` Internal Method
359399

@@ -366,3 +406,35 @@ ProcessorGraphNode<K, V> internalSelectKey(
366406
`internalSelectKey`...FIXME
367407

368408
NOTE: `internalSelectKey` is used when `KStreamImpl` is requested to <<selectKey, selectKey>> and <<groupBy, groupBy>>.
409+
410+
=== [[doFlatTransform]] `doFlatTransform` Internal Method
411+
412+
[source, java]
413+
----
414+
KStream<K1, V1> doFlatTransform(
415+
TransformerSupplier<
416+
? super K,
417+
? super V,
418+
Iterable<KeyValue<K1, V1>>> transformerSupplier,
419+
String... stateStoreNames)
420+
----
421+
422+
`doFlatTransform`...FIXME
423+
424+
NOTE: `doFlatTransform` is used when `KStreamImpl` is requested to <<transform, transform>> and <<flatTransform, flatTransform>>.
425+
426+
=== [[doFlatTransformValues]] `doFlatTransformValues` Internal Method
427+
428+
[source, java]
429+
----
430+
KStream<K, VR> doFlatTransformValues(
431+
ValueTransformerWithKeySupplier<
432+
? super K,
433+
? super V,
434+
Iterable<VR>> valueTransformerWithKeySupplier,
435+
String... stateStoreNames)
436+
----
437+
438+
`doFlatTransformValues`...FIXME
439+
440+
NOTE: `doFlatTransformValues` is used exclusively when `KStreamImpl` is requested to <<flatTransformValues, flatTransformValues>>.

Diff for: kafka-streams-internals-StatefulProcessorNode.adoc

+20-13
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,48 @@
11
== [[StatefulProcessorNode]] StatefulProcessorNode
22

3-
`StatefulProcessorNode` is a <<kafka-streams-internals-StreamsGraphNode.adoc#, graph node>> for stateful operators in <<kafka-streams-internals-KGroupedStreamImpl.adoc#, KGroupedStreamImpl>>, <<kafka-streams-internals-KGroupedTableImpl.adoc#, KGroupedTableImpl>>, <<kafka-streams-internals-KStreamImpl.adoc#, KStreamImpl>>, and <<kafka-streams-internals-KTableImpl.adoc#, KTableImpl>>.
3+
`StatefulProcessorNode` is a concrete <<kafka-streams-internals-StreamsGraphNode.adoc#, StreamsGraphNode>> (as a <<kafka-streams-internals-ProcessorGraphNode.adoc#, ProcessorGraphNode>>) that represents stateful operators of the following:
44

5-
[[storeBuilder]]
6-
`StatefulProcessorNode` extends the parent <<kafka-streams-internals-ProcessorGraphNode.adoc#, ProcessorGraphNode>> with the two state-related parameters, i.e. <<storeNames, state store names>> and <<materializedKTableStoreBuilder, StoreBuilder>>.
5+
* <<kafka-streams-internals-KGroupedStreamImpl.adoc#, KGroupedStreamImpl>>
6+
7+
* <<kafka-streams-internals-KGroupedTableImpl.adoc#, KGroupedTableImpl>>
78
8-
When requested to <<writeToTopology, writeToTopology>>, `StatefulProcessorNode` simply requests the given <<kafka-streams-internals-InternalTopologyBuilder.adoc#, InternalTopologyBuilder>> to <<kafka-streams-internals-InternalTopologyBuilder.adoc#addProcessor, add a processor>> (as a <<kafka-streams-internals-ProcessorGraphNode.adoc#, ProcessorGraphNode>>) and to associate the state stores (by the <<storeNames, names>> or the <<materializedKTableStoreBuilder, StoreBuilder>>).
9+
* <<kafka-streams-internals-KStreamImpl.adoc#, KStreamImpl>>
10+
11+
* <<kafka-streams-internals-KTableImpl.adoc#, KTableImpl>>
912
1013
`StatefulProcessorNode` is <<creating-instance, created>> when:
1114

12-
* `GroupedStreamAggregateBuilder` is requested to <<kafka-streams-internals-GroupedStreamAggregateBuilder.adoc#build, build>> (for <<kafka-streams-internals-KStreamImpl.adoc#groupBy, KStream.groupBy>> and <<kafka-streams-internals-KStreamImpl.adoc#groupByKey, KStream.groupByKey>> streaming operators, incl. `windowedBy` operator with a <<kafka-streams-internals-KGroupedStreamImpl.adoc#windowedBy-Windows, Windows>> or a <<kafka-streams-internals-KGroupedStreamImpl.adoc#windowedBy-SessionWindows, SessionWindows>>)
15+
* `KStreamImpl` is requested to <<kafka-streams-internals-KStreamImpl.adoc#transform, KStreamImpl.transform>> and <<kafka-streams-internals-KStreamImpl.adoc#flatTransform, KStreamImpl.flatTransform>>, <<kafka-streams-internals-KStreamImpl.adoc#transformValues, KStreamImpl.transformValues>>, <<kafka-streams-internals-KStreamImpl.adoc#flatTransformValues, KStreamImpl.flatTransformValues>>, <<kafka-streams-internals-KStreamImpl.adoc#process, KStreamImpl.process>>
1316
14-
* `KGroupedTableImpl` is requested to <<kafka-streams-internals-KGroupedTableImpl.adoc#doAggregate, doAggregate>> (for <<kafka-streams-internals-KGroupedTableImpl.adoc#reduce, reduce>>, <<kafka-streams-internals-KGroupedTableImpl.adoc#count, count>> and <<kafka-streams-internals-KGroupedTableImpl.adoc#aggregate, aggregate>> operators)
17+
* `GroupedStreamAggregateBuilder` is requested to <<kafka-streams-internals-GroupedStreamAggregateBuilder.adoc#build, build>> (for <<kafka-streams-internals-KStreamImpl.adoc#groupBy, KStream.groupBy>> and <<kafka-streams-internals-KStreamImpl.adoc#groupByKey, KStream.groupByKey>> streaming operators, incl. `windowedBy` operator with a <<kafka-streams-internals-KGroupedStreamImpl.adoc#windowedBy-Windows, Windows>> or a <<kafka-streams-internals-KGroupedStreamImpl.adoc#windowedBy-SessionWindows, SessionWindows>>)
1518
16-
* `KStreamImpl` is requested to <<kafka-streams-internals-KStreamImpl.adoc#transform, transform>>, <<kafka-streams-internals-KStreamImpl.adoc#doTransformValues, doTransformValues>> (for <<transformValues, transformValues>> operator), and <<kafka-streams-internals-KStreamImpl.adoc#process, process>>
19+
* `KGroupedTableImpl` is requested to <<kafka-streams-internals-KGroupedTableImpl.adoc#doAggregate, doAggregate>> (for <<kafka-streams-internals-KGroupedTableImpl.adoc#aggregate, KGroupedTableImpl.aggregate>>, <<kafka-streams-internals-KGroupedTableImpl.adoc#count, KGroupedTableImpl.count>>, and <<kafka-streams-internals-KGroupedTableImpl.adoc#reduce, KGroupedTableImpl.reduce>> operators)
1720
1821
* `KTableImpl` is requested to <<kafka-streams-internals-KTableImpl.adoc#suppress, suppress>>
1922
20-
[[creating-instance]]
23+
[[storeBuilder]]
24+
`StatefulProcessorNode` extends the parent <<kafka-streams-internals-ProcessorGraphNode.adoc#, ProcessorGraphNode>> with the two state-related parameters - the <<storeNames, state store names>> and a <<kafka-streams-StoreBuilder.adoc#, StoreBuilder>>.
25+
26+
=== [[creating-instance]] Creating StatefulProcessorNode Instance
27+
2128
`StatefulProcessorNode` takes the following to be created:
2229

2330
* [[nodeName]] Node name
2431
* [[processorParameters]] `ProcessorParameters<K, V>`
2532
* [[materializedKTableStoreBuilder]][[storeNames]] Names of the state stores or a <<kafka-streams-StoreBuilder.adoc#, StoreBuilder>>
26-
* [[repartitionRequired]] `repartitionRequired` flag
2733

2834
=== [[writeToTopology]] `writeToTopology` Method
2935

3036
[source, java]
3137
----
32-
void writeToTopology(final InternalTopologyBuilder topologyBuilder)
38+
void writeToTopology(
39+
InternalTopologyBuilder topologyBuilder)
3340
----
3441

3542
NOTE: `writeToTopology` is part of the <<kafka-streams-internals-StreamsGraphNode.adoc#writeToTopology, StreamsGraphNode Contract>> to...FIXME.
3643

37-
`writeToTopology` requests the given `InternalTopologyBuilder` to <<kafka-streams-internals-InternalTopologyBuilder.adoc#addProcessor, add a processor>> with the name and the <<kafka-streams-ProcessorSupplier.adoc#, ProcessorSupplier>> as defined by the <<processorParameters, ProcessorParameters>>.
44+
`writeToTopology` requests the given <<kafka-streams-internals-InternalTopologyBuilder.adoc#, InternalTopologyBuilder>> to <<kafka-streams-internals-InternalTopologyBuilder.adoc#addProcessor, add a processor>> (with the name and the <<kafka-streams-ProcessorSupplier.adoc#, ProcessorSupplier>> as defined by the <<processorParameters, ProcessorParameters>>).
3845

39-
With <<storeNames, state store name>> given, `writeToTopology` requests the given `InternalTopologyBuilder` to <<kafka-streams-internals-InternalTopologyBuilder.adoc#connectProcessorAndStateStores, connect them with the processor>>.
46+
With <<storeNames, state store names>> given, `writeToTopology` requests the given `InternalTopologyBuilder` to <<kafka-streams-internals-InternalTopologyBuilder.adoc#connectProcessorAndStateStores, connect them with the processor>>.
4047

41-
With a <<storeBuilder, StoreBuilder>> given, `writeToTopology` requests the given `InternalTopologyBuilder` to <<kafka-streams-internals-InternalTopologyBuilder.adoc#addStateStore, addStateStore>> with the `StoreBuilder` and the processor.
48+
With a <<storeBuilder, StoreBuilder>> given, `writeToTopology` requests the given `InternalTopologyBuilder` to <<kafka-streams-internals-InternalTopologyBuilder.adoc#addStateStore, add a state store>> (with the `StoreBuilder` and the processor).

0 commit comments

Comments
 (0)