Skip to content

Commit 6bd592a

Browse files
KStreamImpl.doTransformValues
1 parent ee6fb20 commit 6bd592a

6 files changed

+61
-12
lines changed

SUMMARY.adoc

+2
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@
161161
.. link:kafka-streams-internals-TableSourceNode.adoc[TableSourceNode]
162162
.. link:kafka-streams-internals-TableProcessorNode.adoc[TableProcessorNode]
163163
.. link:kafka-streams-internals-ProcessorGraphNode.adoc[ProcessorGraphNode]
164+
.. link:kafka-streams-internals-StatefulProcessorNode.adoc[StatefulProcessorNode]
164165

165166
.. link:kafka-streams-internals-GroupedTableOperationRepartitionNode.adoc[GroupedTableOperationRepartitionNode]
166167
... link:kafka-streams-internals-GroupedTableOperationRepartitionNodeBuilder.adoc[GroupedTableOperationRepartitionNodeBuilder]
@@ -237,6 +238,7 @@
237238
.. link:kafka-streams-internals-ConsumedInternal.adoc[ConsumedInternal -- Internal Accessors to Consumed Metadata]
238239
.. link:kafka-streams-internals-ProducedInternal.adoc[ProducedInternal -- Internal Accessors to Produced Metadata]
239240
.. link:kafka-streams-internals-QuickUnion.adoc[QuickUnion]
241+
.. link:kafka-streams-internals-TopicsInfo.adoc[TopicsInfo]
240242

241243
. link:kafka-streams-NodeFactory.adoc[NodeFactory]
242244
.. link:kafka-streams-ProcessorNodeFactory.adoc[ProcessorNodeFactory]

kafka-streams-internals-InternalTopologyBuilder.adoc

+14-12
Original file line numberDiff line numberDiff line change
@@ -487,24 +487,15 @@ void connectSourceStoreAndTopic(
487487
final String topic)
488488
----
489489

490-
`connectSourceStoreAndTopic` registers the `sourceStoreName` with the `topic` in <<storeToChangelogTopic, storeToChangelogTopic>>.
490+
`connectSourceStoreAndTopic` adds the given `sourceStoreName` with the `topic` to <<storeToChangelogTopic, storeToChangelogTopic>> internal registry.
491491

492-
`connectSourceStoreAndTopic` reports a `TopologyException` when <<storeToChangelogTopic, storeToChangelogTopic>> has `sourceStoreName` already been registered.
492+
`connectSourceStoreAndTopic` reports a `TopologyException` when <<storeToChangelogTopic, storeToChangelogTopic>> has `sourceStoreName` already registered.
493493

494494
```
495495
Source store [sourceStoreName] is already added.
496496
```
497497

498-
[NOTE]
499-
====
500-
`connectSourceStoreAndTopic` is used when:
501-
502-
* `InternalStreamsBuilder` is requested to link:kafka-streams-internals-InternalStreamsBuilder.adoc#table[create a KTable for a topic]
503-
504-
* `InternalTopologyBuilder` is requested to <<addGlobalStore, add a global state store to a topology>>
505-
506-
* *(deprecated)* `TopologyBuilder` is requested to `connectSourceStoreAndTopic`
507-
====
498+
NOTE: `connectSourceStoreAndTopic` is used when `InternalTopologyBuilder` is requested to <<addGlobalStore, add a global state store to a topology>> and <<adjust, adjust>>.
508499

509500
=== [[connectProcessorAndStateStore]] Connecting State Store with Processor Node -- `connectProcessorAndStateStore` Internal Method
510501

@@ -1027,3 +1018,14 @@ InternalTopologyBuilder rewriteTopology(final StreamsConfig config)
10271018
`rewriteTopology`...FIXME
10281019

10291020
NOTE: `rewriteTopology` is used exclusively when `KafkaStreams` is <<kafka-streams-KafkaStreams.adoc#creating-instance, created>>.
1021+
1022+
=== [[adjust]] `adjust` Internal Method
1023+
1024+
[source, java]
1025+
----
1026+
void adjust(final StreamsConfig config)
1027+
----
1028+
1029+
`adjust`...FIXME
1030+
1031+
NOTE: `adjust` is used exclusively when `InternalTopologyBuilder` is requested to <<rewriteTopology, rewriteTopology>>.

kafka-streams-internals-KStreamImpl.adoc

+27
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@
1010
[[SINK_NAME]]
1111
`KStreamImpl` uses *KSTREAM-SINK-* as the prefix for...FIXME
1212

13+
[[TRANSFORMVALUES_NAME]]
14+
`KStreamImpl` uses *KSTREAM-TRANSFORMVALUES-* as the prefix for...FIXME
15+
1316
[[internal-registries]]
1417
.KStreamImpl's Internal Properties (e.g. Registries, Counters and Flags)
1518
[cols="1,2",options="header",width="100%"]
@@ -327,3 +330,27 @@ KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate)
327330
NOTE: `filterNot` is part of the <<kafka-streams-KStream.adoc#filterNot, KStream Contract>> to...FIXME.
328331

329332
`filterNot`...FIXME
333+
334+
=== [[doTransformValues]] `doTransformValues` Internal Method
335+
336+
[source, java]
337+
----
338+
KStream<K, VR> doTransformValues(
339+
final ValueTransformerWithKeySupplier<
340+
? super K,
341+
? super V,
342+
? extends VR> valueTransformerWithKeySupplier,
343+
final String... stateStoreNames)
344+
----
345+
346+
`doTransformValues` requests the <<builder, InternalStreamsBuilder>> for a <<kafka-streams-internals-InternalStreamsBuilder.adoc#newProcessorName, new processor name>> with <<TRANSFORMVALUES_NAME, KSTREAM-TRANSFORMVALUES>> prefix.
347+
348+
`doTransformValues` creates a new <<kafka-streams-internals-StatefulProcessorNode.adoc#, StatefulProcessorNode>> with the new processor name, the given `stateStoreNames` and the <<repartitionRequired, repartitionRequired>> flag.
349+
350+
`doTransformValues` requests the `StatefulProcessorNode` to <<kafka-streams-internals-StreamsGraphNode.adoc#setValueChangingOperation, setValueChangingOperation>>.
351+
352+
`doTransformValues` requests the <<builder, InternalStreamsBuilder>> to <<kafka-streams-internals-InternalStreamsBuilder.adoc#addGraphNode, add>> the `StatefulProcessorNode` (with the <<kafka-streams-AbstractStream.adoc#streamsGraphNode, StreamsGraphNode>> as the parent).
353+
354+
In the end, `doTransformValues` creates a new <<creating-instance, KStreamImpl>> (with the new processor name, the <<sourceNodes, sourceNodes>>, the <<repartitionRequired, repartitionRequired>> flag, the `StatefulProcessorNode` itself and the <<builder, InternalStreamsBuilder>>).
355+
356+
NOTE: `doTransformValues` is used when `KStreamImpl` is requested to <<transformValues, transformValues>>.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
== [[StatefulProcessorNode]] StatefulProcessorNode
2+
3+
`StatefulProcessorNode` is...FIXME

kafka-streams-internals-StreamsGraphNode.adoc

+4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ void writeToTopology(final InternalTopologyBuilder topologyBuilder)
1919
2020
NOTE: `StreamsGraphNode` is a Java abstract class and cannot be <<creating-instance, created>> directly. It is created indirectly for the <<implementations, concrete StreamsGraphNodes>> and as the <<kafka-streams-internals-InternalStreamsBuilder.adoc#root, root node of an InternalStreamsBuilder>>.
2121

22+
[[valueChangingOperation]]
23+
[[setValueChangingOperation]]
24+
`StreamsGraphNode` uses the `valueChangingOperation` flag for...FIXME
25+
2226
[[implementations]]
2327
.StreamsGraphNodes (Direct Implementations)
2428
[cols="1,2",options="header",width="100%"]
+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
== [[TopicsInfo]] TopicsInfo
2+
3+
[[creating-instance]]
4+
`TopicsInfo` is a simple "container" with the following:
5+
6+
* [[sinkTopics]] Names of the sink topics
7+
* [[sourceTopics]] Names of the source topics
8+
* [[repartitionSourceTopics]] `Map<String, InternalTopicConfig>`
9+
* [[stateChangelogTopics]] `Map<String, InternalTopicConfig>`
10+
11+
`TopicsInfo` is <<creating-instance, created>> exclusively when `InternalTopologyBuilder` is requested for the <<kafka-streams-internals-InternalTopologyBuilder.adoc#topicGroups, topic groups>> (when a node group has at least one source topic, incl. repartition or state changelog topics).

0 commit comments

Comments
 (0)