Skip to content

Commit 2f8f27c

Browse files
Transformer -- Record-by-Record Stateful Transformation
1 parent 238d6f3 commit 2f8f27c

5 files changed

+126
-24
lines changed

SUMMARY.adoc

+3-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
.. link:kafka-streams-StateRestoreCallback.adoc[StateRestoreCallback]
3737
.. link:kafka-streams-To.adoc[To]
3838

39-
. link:kafka-streams-Transformer.adoc[Transformer]
39+
. link:kafka-streams-Transformer.adoc[Transformer -- Record-by-Record Stateful Transformation]
40+
.. link:kafka-streams-TransformerSupplier.adoc[TransformerSupplier]
4041

4142
. link:kafka-streams-ValueTransformer.adoc[ValueTransformer -- Stateful Record-by-Record Value Transformation]
4243
.. link:kafka-streams-ValueTransformerSupplier.adoc[ValueTransformerSupplier -- ValueTransformers Object Factory]
@@ -136,6 +137,7 @@
136137
. link:kafka-streams-Processor.adoc[Processor Contract -- Stream Processing Nodes]
137138
.. link:kafka-streams-AbstractProcessor.adoc[AbstractProcessor -- Base for Stream Processors]
138139
.. link:kafka-streams-KStreamSessionWindowAggregateProcessor.adoc[KStreamSessionWindowAggregateProcessor]
140+
.. link:kafka-streams-KStreamTransformProcessor.adoc[KStreamTransformProcessor]
139141
.. link:kafka-streams-KStreamTransformValuesProcessor.adoc[KStreamTransformValuesProcessor]
140142

141143
. link:kafka-streams-ProcessorNodePunctuator.adoc[ProcessorNodePunctuator]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
== [[KStreamTransformProcessor]] KStreamTransformProcessor
2+
3+
`KStreamTransformProcessor` is...FIXME
4+
5+
=== [[close]] Closing Processor -- `close` Method
6+
7+
[source, java]
8+
----
9+
void close()
10+
----
11+
12+
NOTE: `close` is part of the <<kafka-streams-Processor.adoc#close, Processor Contract>> to close the <<kafka-streams-Processor.adoc#, processor>>.
13+
14+
`close`...FIXME
15+
16+
=== [[init]] Initializing Processor -- `init` Method
17+
18+
[source, java]
19+
----
20+
void init(ProcessorContext context)
21+
----
22+
23+
NOTE: `init` is part of the <<kafka-streams-Processor.adoc#init, Processor Contract>> to initialize the <<kafka-streams-Processor.adoc#, processor>>.
24+
25+
`init`...FIXME
26+
27+
=== [[process]] Processing Record -- `process` Method
28+
29+
[source, java]
30+
----
31+
void process(K1 key, V1 value)
32+
----
33+
34+
NOTE: `process` is part of the <<kafka-streams-Processor.adoc#process, Processor Contract>> to process a single record.
35+
36+
`process`...FIXME

kafka-streams-Processor.adoc

+26-21
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,41 @@
55
`Processor` is an integral part of link:kafka-streams-ProcessorNode.adoc#processor[ProcessorNode].
66

77
[[contract]]
8+
.Processor Contract
9+
[cols="1m,2",options="header",width="100%"]
10+
|===
11+
| Method
12+
| Description
13+
14+
| close
15+
a| [[close]]
16+
817
[source, java]
918
----
10-
package org.apache.kafka.streams.processor;
11-
12-
interface Processor<K, V> {
13-
void close();
14-
void init(ProcessorContext context);
15-
void process(K key, V value);
16-
}
19+
void close()
1720
----
1821

19-
NOTE: `Processor` is an `Evolving` contract which means that compatibility may be broken at a minor release.
22+
Closes the processor
2023

21-
.Processor Contract
22-
[cols="1,2",options="header",width="100%"]
23-
|===
24-
| Method
25-
| Description
24+
| init
25+
a| [[init]]
2626

27-
| `close`
28-
| [[close]]
27+
[source, java]
28+
----
29+
void init(ProcessorContext context)
30+
----
2931

30-
| `init`
31-
| [[init]] Initializing a stream processor
32+
Initializes the processor
3233

33-
| `process`
34-
| [[process]] Processing a single record
34+
| process
35+
a| [[process]]
36+
37+
[source, java]
38+
----
39+
void process(K key, V value)
40+
----
3541

36-
| `punctuate`
37-
| [[punctuate]] *DEPRECATED*
42+
Processes a single record
3843
|===
3944

4045
The lifecycle of a `Processor` starts at <<init, init>> when....FIXME

kafka-streams-Transformer.adoc

+58-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,59 @@
1-
== [[Transformer]] Transformer
1+
== [[Transformer]] Transformer -- Record-by-Record Stateful Transformation
22

3-
`Transformer` is...FIXME
3+
`Transformer` is the <<contract, contract>> that allows for a <<transform, stateful transformation>> of every record in the input stream into zero or more output records (both key and value type can be altered arbitrarily).
4+
5+
In order to use a state, a state store has to be registered beforehand (using `addStateStore` or `addGlobalStore`) before they can be connected to the `Transformer`.
6+
7+
You can access a state store using the <<kafka-streams-ProcessorContext.adoc#, ProcessorContext>> that is given when a transformer is <<init, initialized>>.
8+
9+
[source, scala]
10+
----
11+
// FIXME: Demo
12+
----
13+
14+
`Transformer` is created (_supplied_) using a corresponding <<kafka-streams-TransformerSupplier.adoc#, TransformerSupplier>>.
15+
16+
[[contract]]
17+
[cols="1m,2",options="header",width="100%"]
18+
|===
19+
| Property
20+
| Description
21+
22+
| close
23+
a| [[close]]
24+
25+
[source, java]
26+
----
27+
void close()
28+
----
29+
30+
Closes the transformer
31+
32+
Used exclusively when `KStreamTransformProcessor` is requested to <<kafka-streams-KStreamTransformProcessor.adoc#close, close>>
33+
34+
| init
35+
a| [[init]]
36+
37+
[source, java]
38+
----
39+
void init(final ProcessorContext context)
40+
----
41+
42+
Initializes the transformer in the given <<kafka-streams-ProcessorContext.adoc#, ProcessorContext>>
43+
44+
Used exclusively when `KStreamTransformProcessor` is requested to <<kafka-streams-KStreamTransformProcessor.adoc#init, initialize itself>>
45+
46+
| transform
47+
a| [[transform]]
48+
49+
[source, java]
50+
----
51+
R transform(final K key, final V value)
52+
----
53+
54+
Transforms a single record (into a `KeyValue` object)
55+
56+
Used exclusively when `KStreamTransformProcessor` is requested to <<kafka-streams-KStreamTransformProcessor.adoc#process, process a record>>
57+
|===
58+
59+
`Transformer` is used to create a <<kafka-streams-KStreamTransformProcessor.adoc#, KStreamTransformProcessor>>.
+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
== [[TransformerSupplier]] TransformerSupplier
2+
3+
`TransformerSupplier` is...FIXME

0 commit comments

Comments
 (0)