Skip to content

Commit 261d596

Browse files
ProcessorGraphNode (and direct StreamsGraphNodes)
1 parent 6bd592a commit 261d596

8 files changed

+167
-51
lines changed

Diff for: SUMMARY.adoc

+17-16
Original file line numberDiff line numberDiff line change
@@ -153,22 +153,23 @@
153153

154154
=== Logical Streams Graph
155155

156-
. link:kafka-streams-internals-StreamsGraphNode.adoc[StreamsGraphNode]
157-
.. link:kafka-streams-internals-StreamSinkNode.adoc[StreamSinkNode]
158-
.. link:kafka-streams-internals-StreamSourceNode.adoc[StreamSourceNode]
159-
.. link:kafka-streams-internals-StateStoreNode.adoc[StateStoreNode]
160-
.. link:kafka-streams-internals-GlobalStoreNode.adoc[GlobalStoreNode]
161-
.. link:kafka-streams-internals-TableSourceNode.adoc[TableSourceNode]
162-
.. link:kafka-streams-internals-TableProcessorNode.adoc[TableProcessorNode]
163-
.. link:kafka-streams-internals-ProcessorGraphNode.adoc[ProcessorGraphNode]
164-
.. link:kafka-streams-internals-StatefulProcessorNode.adoc[StatefulProcessorNode]
165-
166-
.. link:kafka-streams-internals-GroupedTableOperationRepartitionNode.adoc[GroupedTableOperationRepartitionNode]
167-
... link:kafka-streams-internals-GroupedTableOperationRepartitionNodeBuilder.adoc[GroupedTableOperationRepartitionNodeBuilder]
168-
169-
.. link:kafka-streams-internals-OptimizableRepartitionNode.adoc[OptimizableRepartitionNode]
170-
171-
. link:kafka-streams-internals-BaseRepartitionNode.adoc[BaseRepartitionNode]
156+
. link:kafka-streams-internals-StreamsGraphNode.adoc[StreamsGraphNode Contract]
157+
.. link:kafka-streams-internals-BaseJoinProcessorNode.adoc[BaseJoinProcessorNode Contract]
158+
.. link:kafka-streams-internals-BaseRepartitionNode.adoc[BaseRepartitionNode Contract]
159+
160+
. link:kafka-streams-internals-GlobalStoreNode.adoc[GlobalStoreNode]
161+
. link:kafka-streams-internals-GroupedTableOperationRepartitionNode.adoc[GroupedTableOperationRepartitionNode]
162+
.. link:kafka-streams-internals-GroupedTableOperationRepartitionNodeBuilder.adoc[GroupedTableOperationRepartitionNodeBuilder]
163+
. link:kafka-streams-internals-OptimizableRepartitionNode.adoc[OptimizableRepartitionNode]
164+
. link:kafka-streams-internals-ProcessorGraphNode.adoc[ProcessorGraphNode]
165+
. link:kafka-streams-internals-StatefulProcessorNode.adoc[StatefulProcessorNode]
166+
. link:kafka-streams-internals-StateStoreNode.adoc[StateStoreNode]
167+
. link:kafka-streams-internals-StreamSinkNode.adoc[StreamSinkNode]
168+
. link:kafka-streams-internals-StreamSourceNode.adoc[StreamSourceNode]
169+
. link:kafka-streams-internals-StreamTableJoinNode.adoc[StreamTableJoinNode]
170+
. link:kafka-streams-internals-TableProcessorNode.adoc[TableProcessorNode]
171+
. link:kafka-streams-internals-TableSourceNode.adoc[TableSourceNode]
172+
172173

173174
=== Processors and ProcessorSuppliers
174175

Diff for: kafka-streams-KTable.adoc

+86-27
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,49 @@
44

55
[[operators]]
66
.KTable Operators
7-
[cols="1,2",options="header",width="100%"]
7+
[cols="1m,3",options="header",width="100%"]
88
|===
99
| Operator
1010
| Description
1111

12-
| [[groupBy]] `groupBy`
13-
a|
12+
| filter
13+
a| [[filter]]
14+
15+
[source, java]
16+
----
17+
KTable<K, V> filter(
18+
final Predicate<? super K, ? super V> predicate)
19+
KTable<K, V> filter(
20+
final Predicate<? super K, ? super V> predicate,
21+
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
22+
----
23+
24+
| filterNot
25+
a| [[filterNot]]
26+
27+
[source, java]
28+
----
29+
KTable<K, V> filterNot(
30+
final Predicate<? super K, ? super V> predicate)
31+
KTable<K, V> filterNot(
32+
final Predicate<? super K, ? super V> predicate,
33+
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
34+
----
35+
36+
| groupBy
37+
a| [[groupBy]]
1438

1539
[source, java]
1640
----
1741
KGroupedTable<KR, VR> groupBy(
18-
final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
42+
final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector)
1943
KGroupedTable<KR, VR> groupBy(
2044
final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
21-
final Serialized<KR, VR> serialized);
45+
final Grouped<KR, VR> grouped)
2246
----
2347

24-
| [[join]] `join`
25-
a|
48+
| join
49+
a| [[join]]
2650

2751
[source, java]
2852
----
@@ -35,8 +59,8 @@ KTable<K, VR> join(
3559
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)
3660
----
3761

38-
| [[leftJoin]] `leftJoin`
39-
a|
62+
| leftJoin
63+
a| [[leftJoin]]
4064

4165
[source, java]
4266
----
@@ -49,8 +73,25 @@ KTable<K, VR> leftJoin(
4973
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)
5074
----
5175

52-
| [[outerJoin]] `outerJoin`
53-
a|
76+
| mapValues
77+
a| [[mapValues]]
78+
79+
[source, java]
80+
----
81+
KTable<K, VR> mapValues(
82+
final ValueMapper<? super V, ? extends VR> mapper)
83+
KTable<K, VR> mapValues(
84+
final ValueMapper<? super V, ? extends VR> mapper,
85+
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)
86+
KTable<K, VR> mapValues(
87+
final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper)
88+
KTable<K, VR> mapValues(
89+
final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
90+
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)
91+
----
92+
93+
| outerJoin
94+
a| [[outerJoin]]
5495

5596
[source, java]
5697
----
@@ -63,29 +104,47 @@ KTable<K, VR> outerJoin(
63104
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)
64105
----
65106

66-
| [[mapValues]] `mapValues`
67-
a| Stateful record-by-record value transformation
107+
| queryableStoreName
108+
a| [[queryableStoreName]]
68109

69110
[source, java]
70111
----
71-
KTable<K, VR> mapValues(
72-
final ValueMapper<? super V, ? extends VR> mapper)
73-
KTable<K, VR> mapValues(
74-
final ValueMapper<? super V, ? extends VR> mapper,
75-
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)
76-
KTable<K, VR> mapValues(
77-
final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper)
78-
KTable<K, VR> mapValues(
79-
final ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper,
80-
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)
112+
String queryableStoreName()
81113
----
82114

83-
| [[toStream]] `toStream`
84-
a|
115+
| suppress
116+
a| [[suppress]]
85117

86118
[source, java]
87119
----
88-
KStream<K, V> toStream();
89-
KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
120+
KTable<K, V> suppress(
121+
final Suppressed<? super K> suppressed)
90122
----
123+
124+
| toStream
125+
a| [[toStream]]
126+
127+
[source, java]
128+
----
129+
KStream<K, V> toStream()
130+
KStream<KR, V> toStream(
131+
final KeyValueMapper<? super K, ? super V, ? extends KR> mapper)
132+
----
133+
134+
| transformValues
135+
a| [[transformValues]]
136+
137+
[source, java]
138+
----
139+
KTable<K, VR> transformValues(
140+
final ValueTransformerWithKeySupplier<
141+
? super K, ? super V, ? extends VR> transformerSupplier,
142+
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
143+
final String... stateStoreNames)
144+
KTable<K, VR> transformValues(
145+
final ValueTransformerWithKeySupplier<
146+
? super K, ? super V, ? extends VR> transformerSupplier,
147+
final String... stateStoreNames)
148+
----
149+
91150
|===

Diff for: kafka-streams-internals-BaseJoinProcessorNode.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
== [[BaseJoinProcessorNode]] BaseJoinProcessorNode Contract
2+
3+
`BaseJoinProcessorNode` is...FIXME

Diff for: kafka-streams-internals-BaseRepartitionNode.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
== [[BaseRepartitionNode]] BaseRepartitionNode
1+
== [[BaseRepartitionNode]] BaseRepartitionNode Contract
22

33
`BaseRepartitionNode` is the base of <<kafka-streams-internals-StreamsGraphNode.adoc#, StreamsGraphNodes>> that...FIXME

Diff for: kafka-streams-internals-KTableImpl.adoc

+15-3
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ NOTE: `enableSendingOldValues` is used when...FIXME
181181

182182
=== [[filter]] `filter` Method
183183

184-
[source, scala]
184+
[source, java]
185185
----
186186
KTable<K, V> filter(
187187
final Predicate<? super K, ? super V> predicate)
@@ -196,7 +196,7 @@ NOTE: `filter` is part of the <<kafka-streams-KTable.adoc#filter, KTable Contrac
196196

197197
=== [[filterNot]] `filterNot` Method
198198

199-
[source, scala]
199+
[source, java]
200200
----
201201
KTable<K, V> filterNot(
202202
final Predicate<? super K, ? super V> predicate)
@@ -211,7 +211,7 @@ NOTE: `filterNot` is part of the <<kafka-streams-KTable.adoc#filterNot, KTable C
211211

212212
=== [[transformValues]] `transformValues` Method
213213

214-
[source, scala]
214+
[source, java]
215215
----
216216
KTable<K, VR> transformValues(
217217
final ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> transformerSupplier,
@@ -225,3 +225,15 @@ KTable<K, VR> transformValues(
225225
NOTE: `transformValues` is part of the <<kafka-streams-KTable.adoc#transformValues, KTable Contract>> to...FIXME.
226226

227227
`transformValues`...FIXME
228+
229+
=== [[suppress]] `suppress` Method
230+
231+
[source, java]
232+
----
233+
KTable<K, V> suppress(
234+
final Suppressed<? super K> suppressed)
235+
----
236+
237+
NOTE: `suppress` is part of the <<kafka-streams-KTable.adoc#suppress, KTable Contract>> to...FIXME.
238+
239+
`suppress`...FIXME

Diff for: kafka-streams-internals-StatefulProcessorNode.adoc

+39-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,41 @@
11
== [[StatefulProcessorNode]] StatefulProcessorNode
22

3-
`StatefulProcessorNode` is...FIXME
3+
`StatefulProcessorNode` is a <<kafka-streams-internals-ProcessorGraphNode.adoc#, ProcessorGraphNode>> with the two additional parameters:
4+
5+
* <<storeNames, Names of the state stores>>
6+
7+
* [[storeBuilder]] <<materializedKTableStoreBuilder, StoreBuilder>>
8+
9+
`StatefulProcessorNode` is <<creating-instance, created>> when:
10+
11+
* `GroupedStreamAggregateBuilder` is requested to <<kafka-streams-internals-GroupedStreamAggregateBuilder.adoc#build, build>>
12+
13+
* `KGroupedTableImpl` is requested to <<kafka-streams-internals-KGroupedTableImpl.adoc#doAggregate, doAggregate>>
14+
15+
* `KStreamImpl` is requested to <<kafka-streams-internals-KStreamImpl.adoc#transform, transform>>, <<kafka-streams-internals-KStreamImpl.adoc#doTransformValues, doTransformValues>>, and <<kafka-streams-internals-KStreamImpl.adoc#process, process>>
16+
17+
* `KTableImpl` is requested to <<kafka-streams-internals-KTableImpl.adoc#suppress, suppress>>
18+
19+
=== [[creating-instance]] Creating StatefulProcessorNode Instance
20+
21+
`StatefulProcessorNode` takes the following to be created:
22+
23+
* [[nodeName]] Node name
24+
* [[processorParameters]] `ProcessorParameters<K, V>`
25+
* [[materializedKTableStoreBuilder]][[storeNames]] Names of the state stores or a <<kafka-streams-StoreBuilder.adoc#, StoreBuilder>>
26+
* [[repartitionRequired]] `repartitionRequired` flag
27+
28+
=== [[writeToTopology]] `writeToTopology` Method
29+
30+
[source, java]
31+
----
32+
void writeToTopology(final InternalTopologyBuilder topologyBuilder)
33+
----
34+
35+
NOTE: `writeToTopology` is part of the <<kafka-streams-internals-StreamsGraphNode.adoc#writeToTopology, StreamsGraphNode Contract>> to...FIXME.
36+
37+
`writeToTopology` requests the given `InternalTopologyBuilder` to <<kafka-streams-internals-InternalTopologyBuilder.adoc#addProcessor, addProcessor>> with the name and the <<kafka-streams-ProcessorSupplier.adoc#, ProcessorSupplier>> as defined by the <<processorParameters, ProcessorParameters>>.
38+
39+
If there were <<storeNames, state store names>> given, `writeToTopology` requests the given `InternalTopologyBuilder` to <<kafka-streams-internals-InternalTopologyBuilder.adoc#connectProcessorAndStateStores, connect them with the processor>>.
40+
41+
If there was a <<storeBuilder, StoreBuilder>> given, `writeToTopology` requests the given `InternalTopologyBuilder` to <<kafka-streams-internals-InternalTopologyBuilder.adoc#addStateStore, addStateStore>> with the `StoreBuilder` and the processor.

Diff for: kafka-streams-internals-StreamTableJoinNode.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
== [[StreamTableJoinNode]] StreamTableJoinNode
2+
3+
`StreamTableJoinNode` is...FIXME

Diff for: kafka-streams-internals-StreamsGraphNode.adoc

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
== [[StreamsGraphNode]] StreamsGraphNode
1+
== [[StreamsGraphNode]] StreamsGraphNode Contract
22

33
`StreamsGraphNode` is the <<contract, abstraction>> of <<implementations, graph nodes>> that can <<writeToTopology, writeToTopology>>.
44

@@ -30,7 +30,7 @@ NOTE: `StreamsGraphNode` is a Java abstract class and cannot be <<creating-insta
3030
| StreamsGraphNode
3131
| Description
3232

33-
| BaseJoinProcessorNode
33+
| <<kafka-streams-internals-BaseJoinProcessorNode.adoc#, BaseJoinProcessorNode>>
3434
| [[BaseJoinProcessorNode]]
3535

3636
| <<kafka-streams-internals-BaseRepartitionNode.adoc#, BaseRepartitionNode>>
@@ -48,7 +48,7 @@ NOTE: `StreamsGraphNode` is a Java abstract class and cannot be <<creating-insta
4848
| <<kafka-streams-internals-StreamSourceNode.adoc#, StreamSourceNode>>
4949
| [[StreamSourceNode]]
5050

51-
| StreamTableJoinNode
51+
| <<kafka-streams-internals-StreamTableJoinNode.adoc#, StreamTableJoinNode>>
5252
| [[StreamTableJoinNode]]
5353

5454
| <<kafka-streams-internals-TableProcessorNode.adoc#, TableProcessorNode>>

0 commit comments

Comments
 (0)