Skip to content

Commit f83565a

Browse files
AbstractStream
1 parent db65d40 commit f83565a

6 files changed

+70
-84
lines changed

kafka-streams-ValueTransformerSupplier.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ interface ValueTransformerSupplier<V, VR> {
2020
FIXME Example of KStream.transformValues
2121
----
2222

23-
`get` is used exclusively when `AbstractStream` is requested for link:kafka-streams-internals-AbstractStream.adoc#toInternalValueTransformerSupplier[toInternalValueTransformerSupplier]
23+
`get`...FIXME
+54-69
Original file line numberDiff line numberDiff line change
@@ -1,124 +1,109 @@
11
== [[AbstractStream]] AbstractStream
22

3-
`AbstractStream` is a base abstraction for the <<implementations, different concrete streams>>.
4-
5-
[[creating-instance]]
6-
Every `AbstractStream` has the following internal properties:
7-
8-
* [[builder]] link:kafka-streams-internals-InternalStreamsBuilder.adoc[InternalStreamsBuilder]
9-
* [[name]] Name
10-
* [[sourceNodes]] One or more names of the source nodes
3+
`AbstractStream` is the base abstraction of the <<implementations, streams>>.
114

125
[[implementations]]
136
.AbstractStreams
14-
[cols="1,2",options="header",width="100%"]
7+
[cols="30,70",options="header",width="100%"]
158
|===
169
| AbstractStream
1710
| Description
1811

19-
| [[KTableImpl]] link:kafka-streams-internals-KTableImpl.adoc[KTableImpl]
20-
|
12+
| <<kafka-streams-internals-KGroupedStreamImpl.adoc#, KGroupedStreamImpl>>
13+
| [[KGroupedStreamImpl]]
2114

22-
| [[KGroupedTableImpl]] link:kafka-streams-internals-KGroupedTableImpl.adoc[KGroupedTableImpl]
23-
|
15+
| <<kafka-streams-internals-KGroupedTableImpl.adoc#, KGroupedTableImpl>>
16+
| [[KGroupedTableImpl]]
2417

25-
| [[TimeWindowedKStreamImpl]] link:kafka-streams-internals-TimeWindowedKStreamImpl.adoc[TimeWindowedKStreamImpl]
26-
|
18+
| <<kafka-streams-internals-KStreamImpl.adoc#, KStreamImpl>>
19+
| [[KStreamImpl]]
2720

28-
| [[SessionWindowedKStreamImpl]] link:kafka-streams-internals-SessionWindowedKStreamImpl.adoc[SessionWindowedKStreamImpl]
29-
|
21+
| <<kafka-streams-internals-KTableImpl.adoc#, KTableImpl>>
22+
| [[KTableImpl]]
3023

31-
| [[KStreamImpl]] link:kafka-streams-internals-KStreamImpl.adoc[KStreamImpl]
32-
|
24+
| <<kafka-streams-internals-SessionWindowedKStreamImpl.adoc#, SessionWindowedKStreamImpl>>
25+
| [[SessionWindowedKStreamImpl]]
26+
27+
| <<kafka-streams-internals-TimeWindowedKStreamImpl.adoc#, TimeWindowedKStreamImpl>>
28+
| [[TimeWindowedKStreamImpl]]
3329

34-
| [[KGroupedStreamImpl]] link:kafka-streams-internals-KGroupedStreamImpl.adoc[KGroupedStreamImpl]
35-
|
3630
|===
3731

38-
=== [[storeFactory]] `storeFactory` Static Method
32+
[[creating-instance]]
33+
Every `AbstractStream` has the following properties:
34+
35+
* [[name]] Name
36+
* [[keySerde]] Key Serde (`Serde<K>`)
37+
* [[valSerde]] Value Serde (`Serde<V>`)
38+
* [[sourceNodes]] Names of the source nodes (`Set<String>`)
39+
* [[streamsGraphNode]] <<kafka-streams-internals-StreamsGraphNode.adoc#, StreamsGraphNode>>
40+
* [[builder]] <<kafka-streams-internals-InternalStreamsBuilder.adoc#, InternalStreamsBuilder>>
41+
42+
=== [[ensureJoinableWith]] `ensureJoinableWith` Method
3943

4044
[source, java]
4145
----
42-
static <T, K> Stores.PersistentKeyValueFactory<K, T> storeFactory(
43-
final Serde<K> keySerde,
44-
final Serde<T> aggValueSerde,
45-
final String storeName)
46+
Set<String> ensureJoinableWith(
47+
AbstractStream<K, ?> other)
4648
----
4749

48-
`storeFactory` simply requests `Stores` to link:kafka-streams-Stores.adoc#create[create a StoreFactory] with the input parameters.
50+
`ensureJoinableWith`...FIXME
4951

5052
[NOTE]
5153
====
52-
`storeFactory` is used when:
54+
`ensureJoinableWith` is used when:
5355
54-
* `AbstractStream` is requested for <<keyValueStore, keyValueStore>> and <<windowedStore, windowedStore>>
56+
* `KStreamImpl` is requested to <<kafka-streams-internals-KStreamImpl.adoc#join, join>>, <<kafka-streams-internals-KStreamImpl.adoc#outerJoin, outerJoin>> and <<kafka-streams-internals-KStreamImpl.adoc#leftJoin, leftJoin>>
5557
56-
* `KGroupedStreamImpl` is requested for link:kafka-streams-internals-KGroupedStreamImpl.adoc#aggregate[aggregate] and link:kafka-streams-internals-KGroupedStreamImpl.adoc#reduce[reduce]
58+
* `KTableImpl` is requested to <<kafka-streams-internals-KTableImpl.adoc#join, join>>, <<kafka-streams-internals-KTableImpl.adoc#leftJoin, leftJoin>> and <<kafka-streams-internals-KTableImpl.adoc#outerJoin, outerJoin>>
5759
====
5860

59-
=== [[keyValueStore]] `keyValueStore` Static Method
61+
=== [[toValueTransformerWithKeySupplier]] `toValueTransformerWithKeySupplier` Static Method
6062

6163
[source, java]
6264
----
63-
static <T, K> org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore>
64-
keyValueStore(
65-
final Serde<K> keySerde,
66-
final Serde<T> aggValueSerde,
67-
final String storeName)
65+
ValueTransformerWithKeySupplier<K, V, VR> toValueTransformerWithKeySupplier(
66+
ValueTransformerSupplier<V, VR> valueTransformerSupplier)
6867
----
6968

70-
`keyValueStore`...FIXME
69+
`toValueTransformerWithKeySupplier`...FIXME
7170

72-
NOTE: `keyValueStore` is used when...FIXME
71+
NOTE: `toValueTransformerWithKeySupplier` is used when `KStreamImpl` is requested to <<kafka-streams-internals-KStreamImpl.adoc#transformValues, transformValues>> and <<kafka-streams-internals-KStreamImpl.adoc#flatTransformValues, flatTransformValues>>.
7372

74-
=== [[windowedStore]] `windowedStore` Static Method
73+
=== [[reverseJoiner]] `reverseJoiner` Static Method
7574

7675
[source, java]
7776
----
78-
static <W extends Window, T, K> org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore>
79-
windowedStore(
80-
final Serde<K> keySerde,
81-
final Serde<T> aggValSerde,
82-
final Windows<W> windows,
83-
final String storeName)
77+
ValueJoiner<T2, T1, R> reverseJoiner(
78+
ValueJoiner<T1, T2, R> joiner)
8479
----
8580

86-
`windowedStore`...FIXME
87-
88-
NOTE: `windowedStore` is used when...FIXME
81+
`reverseJoiner`...FIXME
8982

90-
=== [[toInternalValueTransformerSupplier]] Converting ValueTransformerSupplier or ValueTransformerWithKeySupplier to InternalValueTransformerWithKeySupplier -- `toInternalValueTransformerSupplier` Method
91-
92-
[source, java]
93-
----
94-
static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR>
95-
toInternalValueTransformerSupplier(final ValueTransformerSupplier<V, VR> valueTransformerSupplier)
96-
static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR>
97-
toInternalValueTransformerSupplier(final ValueTransformerWithKeySupplier<K, V, VR> valueTransformerWithKeySupplier)
98-
----
99-
100-
`toInternalValueTransformerSupplier` requests a `ValueTransformer` or `ValueTransformerWithKey` from the link:kafka-streams-ValueTransformerSupplier.adoc#get[ValueTransformerSupplier] or `ValueTransformerWithKeySupplier`, respectively.
101-
102-
`toInternalValueTransformerSupplier` then creates a `InternalValueTransformerWithKeySupplier` that can produce a `InternalValueTransformerWithKey` that passes all calls to the `ValueTransformerSupplier` or `ValueTransformerWithKeySupplier`, respectively.
83+
[NOTE]
84+
====
85+
`reverseJoiner` is used when:
10386
104-
`toInternalValueTransformerSupplier` reports an `NullPointerException` when `ValueTransformerSupplier` or `ValueTransformerWithKeySupplier` are `null`.
87+
* `KStreamImplJoin` is requested to <<kafka-streams-internals-KStreamImpl-KStreamImplJoin.adoc#join, join>> (for <<kafka-streams-internals-KStreamImpl.adoc#join, KStreamImpl.join>>, <<kafka-streams-internals-KStreamImpl.adoc#outerJoin, KStreamImpl.outerJoin>> and <<kafka-streams-internals-KStreamImpl.adoc#leftJoin, KStreamImpl.leftJoin>> operators)
10588
106-
NOTE: `toInternalValueTransformerSupplier` is used exclusively when `KStreamImpl` is requested for link:kafka-streams-internals-KStreamImpl.adoc#transformValues[transforming values with state].
89+
* `KTableImpl` is requested to <<kafka-streams-internals-KTableImpl.adoc#doJoin, doJoin>> (for <<kafka-streams-internals-KTableImpl.adoc#join, KTableImpl.join>>, <<kafka-streams-internals-KTableImpl.adoc#leftJoin, KTableImpl.leftJoin>> and <<kafka-streams-internals-KTableImpl.adoc#outerJoin, KTableImpl.outerJoin>> operators)
90+
====
10791

108-
=== [[ensureJoinableWith]] `ensureJoinableWith` Method
92+
=== [[withKey]] `withKey` Static Method
10993

11094
[source, java]
11195
----
112-
Set<String> ensureJoinableWith(final AbstractStream<K> other)
96+
ValueMapperWithKey<K, V, VR> withKey(
97+
ValueMapper<V, VR> valueMapper)
11398
----
11499

115-
`ensureJoinableWith`...FIXME
100+
`withKey`...FIXME
116101

117102
[NOTE]
118103
====
119-
`ensureJoinableWith` is used when:
104+
`withKey` is used when:
120105
121-
* `KStreamImpl` is requested to link:kafka-streams-internals-KStreamImpl.adoc#doJoin[doJoin] and link:kafka-streams-internals-KStreamImpl.adoc#doStreamTableJoin[doStreamTableJoin]
106+
* `KStreamImpl` is requested to <<kafka-streams-internals-KStreamImpl.adoc#mapValues, mapValues>> and <<kafka-streams-internals-KStreamImpl.adoc#flatMapValues, flatMapValues>>
122107
123-
* `KTableImpl` is requested to link:kafka-streams-internals-KTableImpl.adoc#buildJoin[buildJoin]
108+
* `KTableImpl` is requested to <<kafka-streams-internals-KTableImpl.adoc#mapValues, mapValues>>
124109
====

kafka-streams-internals-KStreamImpl-KStreamImplJoin.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ KStream<K1, R> join(
1616

1717
`join`...FIXME
1818

19-
NOTE: `join` is used exclusively when `KStreamImpl` is requested to <<kafka-streams-internals-KStreamImpl.adoc#doJoin, doJoin>> (for <<kafka-streams-internals-KStreamImpl.adoc#join, join>>, <<kafka-streams-internals-KStreamImpl.adoc#outerJoin, outerJoin>> and <<kafka-streams-internals-KStreamImpl.adoc#leftJoin, leftJoin>> join operators).
19+
NOTE: `join` is used exclusively when `KStreamImpl` is requested to <<kafka-streams-internals-KStreamImpl.adoc#doJoin, doJoin>> (for <<kafka-streams-internals-KStreamImpl.adoc#join, KStreamImpl.join>>, <<kafka-streams-internals-KStreamImpl.adoc#outerJoin, KStreamImpl.outerJoin>> and <<kafka-streams-internals-KStreamImpl.adoc#leftJoin, KStreamImpl.leftJoin>> operators).

kafka-streams-internals-KStreamImpl.adoc

+11-10
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@
2828

2929
[source, java]
3030
----
31-
KStream<K, R> doStreamTableJoin(
32-
KTable<K, V1> other,
33-
ValueJoiner<? super V, ? super V1, ? extends R> joiner,
31+
KStream<K, VR> doStreamTableJoin(
32+
KTable<K, VO> other,
33+
ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
34+
Joined<K, V, VO> joined,
3435
boolean leftJoin)
3536
----
3637

@@ -42,12 +43,12 @@ NOTE: `doStreamTableJoin` is used when `KStreamImpl` is requested to <<join, joi
4243

4344
[source, java]
4445
----
45-
<V1, R> KStream<K, R> doJoin(
46-
final KStream<K, V1> other,
47-
final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
48-
final JoinWindows windows,
49-
final Joined<K, V, V1> joined,
50-
final KStreamImplJoin join)
46+
KStream<K, VR> doJoin(
47+
KStream<K, VO> other,
48+
ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
49+
JoinWindows windows,
50+
Joined<K, V, VO> joined,
51+
KStreamImplJoin join)
5152
----
5253

5354
`doJoin`...FIXME
@@ -68,7 +69,7 @@ KStream<K, VR> transformValues(
6869

6970
NOTE: `transformValues` is part of link:kafka-streams-KStream.adoc#transformValues[KStream Contract] for a stateful record-by-record value transformation.
7071

71-
`transformValues` link:kafka-streams-internals-AbstractStream.adoc#toInternalValueTransformerSupplier[converts ValueTransformerSupplier or ValueTransformerWithKeySupplier to InternalValueTransformerWithKeySupplier] followed by the internal <<transformValues-private, transformValues>>.
72+
`transformValues`...FIXME
7273

7374
`transformValues` reports a `NullPointerException` when either `ValueTransformerSupplier` or `ValueTransformerWithKeySupplier` is `null`.
7475

kafka-streams-internals-KTableImpl.adoc

+2-2
Original file line numberDiff line numberDiff line change
@@ -151,10 +151,10 @@ NOTE: `buildJoin` is used exclusively when `KTableImpl` is requested to <<doJoin
151151

152152
[source, java]
153153
----
154-
<VO, VR> KTable<K, VR> doJoin(
154+
KTable<K, VR> doJoin(
155155
KTable<K, VO> other,
156156
ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
157-
MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
157+
MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal,
158158
boolean leftOuter,
159159
boolean rightOuter)
160160
----

kafka-streams-internals-TimeWindowedKStreamImpl.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
== [[TimeWindowedKStreamImpl]] TimeWindowedKStreamImpl
22

3-
`TimeWindowedKStreamImpl` is a <<kafka-streams-internals-AbstractStream.adoc#, stream>> that allows for <<kafka-streams-TimeWindowedKStream.adoc#, time-windowed streaming aggregations>>.
3+
`TimeWindowedKStreamImpl` is a <<kafka-streams-internals-AbstractStream.adoc#, stream>> for <<kafka-streams-TimeWindowedKStream.adoc#, time-windowed streaming aggregations>>.
44

55
`TimeWindowedKStreamImpl` is <<creating-instance, created>> exclusively when `KGroupedStreamImpl` is requested to <<kafka-streams-internals-KGroupedStreamImpl.adoc#windowedBy-Windows, windowedBy (with a Windows)>>.
66

0 commit comments

Comments
 (0)