Skip to content

Commit 943a79f

Browse files
Windows
1 parent 8ee6eb7 commit 943a79f

9 files changed

+59
-30
lines changed

SUMMARY.adoc

+6-2
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@
5252
. link:kafka-streams-TransformerSupplier.adoc[TransformerSupplier]
5353

5454
. link:kafka-streams-Windows.adoc[Windows -- Window Specification]
55+
.. link:kafka-streams-JoinWindows.adoc[JoinWindows -- Window Specification for Streaming Joins]
5556
.. link:kafka-streams-TimeWindows.adoc[TimeWindows -- Time-Bound Window Specification]
57+
.. link:kafka-streams-UnlimitedWindows.adoc[UnlimitedWindows]
5658

5759
. link:kafka-streams-Window.adoc[Window]
58-
.. link:kafka-streams-SessionWindow.adoc[SessionWindow]
59-
.. link:kafka-streams-TimeWindow.adoc[TimeWindow]
6060

6161
. link:kafka-streams-WindowedSerdes.adoc[WindowedSerdes -- SessionWindowedSerde and TimeWindowedSerde]
6262
.. link:kafka-streams-Windowed.adoc[Windowed]
@@ -250,6 +250,10 @@
250250

251251
. link:kafka-streams-internals-DefaultKafkaClientSupplier.adoc[DefaultKafkaClientSupplier]
252252

253+
. link:kafka-streams-internals-SessionWindow.adoc[SessionWindow]
254+
. link:kafka-streams-internals-TimeWindow.adoc[TimeWindow]
255+
. link:kafka-streams-internals-UnlimitedWindow.adoc[UnlimitedWindow]
256+
253257
=== Execution Environment
254258

255259
. link:kafka-streams-StreamsPartitionAssignor.adoc[StreamsPartitionAssignor -- Partition Assignment Strategy]

kafka-streams-JoinWindows.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
== [[JoinWindows]] JoinWindows -- Window Specification for Streaming Joins
2+
3+
`JoinWindows` is a <<kafka-streams-Windows.adoc#, window specification>> that is used for streaming joins.

kafka-streams-TimeWindows.adoc

+42-26
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,71 @@
11
== [[TimeWindows]] TimeWindows -- Time-Bound Window Specification
22

3-
`TimeWindows` is a concrete <<kafka-streams-Windows.adoc#, window specification>> of <<kafka-streams-TimeWindow.adoc#, time windows>>.
3+
`TimeWindows` is a <<kafka-streams-Windows.adoc#, window specification>> of <<kafka-streams-internals-TimeWindow.adoc#, time windows>>.
44

55
[[creating-instance]]
66
`TimeWindows` is described by the following properties:
77

8-
* [[sizeMs]] *Duration* (aka _size_) (in millis)
8+
* [[sizeMs]] *Window duration* (aka _window size_) (in millis)
99
* [[advanceMs]] *Advance interval* (in millis)
10+
* [[grace]] *Grace period* for late events
11+
* [[maintainDurationMs]] *Maintain duration* (in millis)
12+
13+
NOTE: <<maintainDurationMs, Maintain duration>> is no longer in use.
1014

1115
`TimeWindows` can be created only using the <<of, of>> factory method.
1216

1317
[source, java]
1418
----
15-
TimeWindows of(final long sizeMs) throws IllegalArgumentException
19+
static TimeWindows of(final Duration size)
1620
----
1721

18-
`of` is used to create a time specification of *tumbling windows* which are fixed-sized, gap-less, non-overlapping windows (and simply sets the <<sizeMs, sizeMs>> and <<advanceMs, advanceMs>> internal properties to the same value).
22+
`of` is used to create a time specification of *tumbling windows* which are fixed-sized, gap-less, non-overlapping windows (and simply sets the <<sizeMs, sizeMs>> and <<advanceMs, advanceMs>> internal properties to the given value).
1923

20-
`TimeWindows` can be further configured using the <<advanceBy, advanceBy>> method.
24+
[source, scala]
25+
----
26+
import org.apache.kafka.streams.kstream.TimeWindows
27+
import java.time.Duration
28+
val timeWindows = TimeWindows.of(Duration.ofMinutes(1))
29+
scala> println(timeWindows)
30+
TimeWindows{maintainDurationMs=86400000, sizeMs=60000, advanceMs=60000, grace=null, segments=3}
31+
----
32+
33+
`TimeWindows` can be further configured using the <<advanceBy, advanceBy>> and <<grace-method, grace>> methods.
2134

2235
[source, java]
2336
----
24-
TimeWindows advanceBy(final long advanceMs)
37+
TimeWindows advanceBy(final Duration advance)
2538
----
2639

2740
`advanceBy` allows for a time specification of *hopping windows* which are fixed-sized, overlapping windows (and simply sets the <<advanceMs, advanceMs>> internal property).
2841

2942
[source, scala]
3043
----
3144
import org.apache.kafka.streams.kstream.TimeWindows
32-
import scala.concurrent.duration._
45+
import java.time.Duration
46+
val timeWindows = TimeWindows
47+
.of(Duration.ofMinutes(1))
48+
.advanceBy(Duration.ofSeconds(30))
49+
scala> println(timeWindows)
50+
TimeWindows{maintainDurationMs=86400000, sizeMs=60000, advanceMs=30000, grace=null, segments=3}
51+
----
52+
53+
[source, java]
54+
----
55+
TimeWindows grace(final Duration afterWindowEnd)
56+
----
57+
58+
`grace` specifies how long to wait for *late events* to be included in a time window (and simply sets the <<grace, grace>> internal property).
59+
60+
[source, scala]
61+
----
62+
import org.apache.kafka.streams.kstream.TimeWindows
63+
import java.time.Duration
3364
val timeWindows = TimeWindows
34-
.of(1.minute.toMillis)
35-
.advanceBy(0.5.minutes.toMillis)
65+
.of(Duration.ofMinutes(1))
66+
.grace(Duration.ofMinutes(2)) // 1 minute late after a time window has elapsed
67+
scala> println(timeWindows)
68+
TimeWindows{maintainDurationMs=86400000, sizeMs=60000, advanceMs=60000, grace=PT2M, segments=3}
3669
----
3770

3871
=== [[windowsFor]] `windowsFor` Method
@@ -54,20 +87,3 @@ scala> windows.foreach(println)
5487
(30000,Window{start=30000, end=90000})
5588
(60000,Window{start=60000, end=120000})
5689
----
57-
58-
=== [[until]] Setting Window Maintain Duration (Retention Time) -- `until` Method
59-
60-
[source, java]
61-
----
62-
TimeWindows until(final long durationMs)
63-
----
64-
65-
NOTE: `until` is part of the <<kafka-streams-Windows.adoc#until, Windows Contract>> to set the window maintain duration (retention time).
66-
67-
`until` simply makes sure that the input `durationMs` is at least the <<sizeMs, size>> of the window and executes the parent's <<kafka-streams-Windows.adoc#until, until>>.
68-
69-
`until` throws an `IllegalArgumentException` when the input `durationMs` is smaller than the <<sizeMs, size>>:
70-
71-
```
72-
Window retention time (durationMs) cannot be smaller than the window size.
73-
```

kafka-streams-UnlimitedWindows.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
== [[UnlimitedWindows]] UnlimitedWindows
2+
3+
`UnlimitedWindows` is a <<kafka-streams-Windows.adoc#, window specification>> of <<kafka-streams-internals-UnlimitedWindow.adoc#, UnlimitedWindows>>.

kafka-streams-Window.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ Window{start=[startMs], end=[endMs]}
5353
| Window
5454
| Description
5555

56-
| link:kafka-streams-SessionWindow.adoc[SessionWindow]
56+
| link:kafka-streams-internals-SessionWindow.adoc[SessionWindow]
5757
| [[SessionWindow]]
5858

5959
| `TimeWindow`

kafka-streams-Windows.adoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ The default window maintain duration is `1 day`.
6262
| Windows
6363
| Description
6464

65-
| JoinWindows
65+
| <<kafka-streams-JoinWindows.adoc#, JoinWindows>>
6666
| [[JoinWindows]]
6767

6868
| <<kafka-streams-TimeWindows.adoc#, TimeWindows>>
File renamed without changes.
File renamed without changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
== [[UnlimitedWindow]] UnlimitedWindow
2+
3+
`UnlimitedWindow` is...FIXME

0 commit comments

Comments
 (0)