-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathstream_join.py
79 lines (64 loc) · 1.67 KB
/
stream_join.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""stream_join example: windowed join of temperature and humidity streams.
docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest
"""
import json
import pprint as pp
import signal
import sys
from denormalized import Context
from denormalized.datafusion import col
from denormalized.datafusion import functions as f
def signal_handler(_sig, _frame):
    """Terminate the streaming job cleanly when SIGINT (Ctrl-C) arrives."""
    raise SystemExit(0)
# Register the Ctrl-C handler so the (otherwise blocking) stream job exits cleanly.
signal.signal(signal.SIGINT, signal_handler)
# Kafka broker address (see the docker command in the module docstring).
bootstrap_server = "localhost:9092"
# Event-time column used for windowing; the name suggests epoch milliseconds
# — presumably set by the producer, confirm against the emitting container.
timestamp_column = "occurred_at_ms"
# Sample payload passed to from_topic(); denormalized infers the topic's
# schema from this JSON shape, so field names/types here define the columns.
sample_event = {
    "occurred_at_ms": 100,
    "sensor_name": "foo",
    "reading": 0.0,
}
def print_batch(rb):
    """Sink callback: pretty-print a record batch as a plain Python dict."""
    batch_dict = rb.to_pydict()
    pp.pprint(batch_dict)
# Streaming context used to build both Kafka-backed source streams.
ctx = Context()
# Raw temperature readings from the "temperature" topic; sample_event supplies
# the schema and timestamp_column is the event-time column for windowing.
temperature_ds = ctx.from_topic(
    "temperature", json.dumps(sample_event), bootstrap_server, timestamp_column
)
# Humidity stream: windowed per sensor, with its key and window-bound columns
# renamed so they can sit beside the temperature columns after the join.
humidity_ds = (
    ctx.from_topic(
        "humidity",
        json.dumps(sample_event),
        bootstrap_server,
        timestamp_column,
    )
    # Rename the join key so it does not collide with the temperature
    # stream's "sensor_name" column in the joined output.
    .with_column("humidity_sensor", col("sensor_name"))
    .drop_columns(["sensor_name"])
    .window(
        [col("humidity_sensor")],
        [
            # BUG FIX: this was f.count(...) aliased as "avg_humidity" —
            # inconsistent with both the alias and the temperature stream's
            # f.avg aggregate. Use avg so the column means what it says.
            f.avg(col("reading")).alias("avg_humidity"),
        ],
        4000,  # window length — presumably ms, matching occurred_at_ms; confirm
        None,  # no slide interval
    )
    # Rename the window bounds so they survive the join alongside the
    # temperature stream's own window_start/end columns.
    .with_column("humidity_window_start_time", col("window_start_time"))
    .with_column("humidity_window_end_time", col("window_end_time"))
    .drop_columns(["window_start_time", "window_end_time"])
)
# Aggregate temperature per sensor over 4000-unit tumbling windows, join each
# window with the matching humidity window, and print every joined batch.
_temperature_windowed = temperature_ds.window(
    [col("sensor_name")],
    [f.avg(col("reading")).alias("avg_temperature")],
    4000,
    None,
)
joined_ds = _temperature_windowed.join(
    humidity_ds,
    "inner",
    ["sensor_name", "window_start_time"],
    ["humidity_sensor", "humidity_window_start_time"],
).sink(print_batch)