Skip to content

Commit 016e341

Browse files
authored
Merge pull request #87 from powersync-ja/fix/throttle-improvements
Add tests for update stream throttling
2 parents 2938232 + 3a0b54a commit 016e341

File tree

3 files changed

+175
-11
lines changed

3 files changed

+175
-11
lines changed

packages/sqlite_async/lib/src/update_notification.dart

+20-11
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,8 @@ class UpdateNotification {
6262
static StreamTransformer<UpdateNotification, UpdateNotification>
6363
filterTablesTransformer(Iterable<String> tables) {
6464
Set<String> normalized = {for (var table in tables) table.toLowerCase()};
65-
return StreamTransformer<UpdateNotification,
66-
UpdateNotification>.fromHandlers(handleData: (data, sink) {
67-
if (data.containsAny(normalized)) {
68-
sink.add(data);
69-
}
70-
});
65+
return StreamTransformer.fromBind(
66+
(source) => source.where((data) => data.containsAny(normalized)));
7167
}
7268
}
7369

@@ -77,20 +73,27 @@ class UpdateNotification {
7773
/// Behaviour:
7874
/// If there was no event in "timeout", and one comes in, it is pushed immediately.
7975
/// Otherwise, we wait until the timeout is over.
80-
Stream<T> _throttleStream<T>(Stream<T> input, Duration timeout,
76+
Stream<T> _throttleStream<T extends Object>(Stream<T> input, Duration timeout,
8177
{bool throttleFirst = false, T Function(T, T)? add, T? addOne}) async* {
8278
var nextPing = Completer<void>();
79+
var done = false;
8380
T? lastData;
8481

8582
var listener = input.listen((data) {
86-
if (lastData is T && add != null) {
87-
lastData = add(lastData as T, data);
83+
if (lastData != null && add != null) {
84+
lastData = add(lastData!, data);
8885
} else {
8986
lastData = data;
9087
}
9188
if (!nextPing.isCompleted) {
9289
nextPing.complete();
9390
}
91+
}, onDone: () {
92+
if (!nextPing.isCompleted) {
93+
nextPing.complete();
94+
}
95+
96+
done = true;
9497
});
9598

9699
try {
@@ -100,10 +103,12 @@ Stream<T> _throttleStream<T>(Stream<T> input, Duration timeout,
100103
if (throttleFirst) {
101104
await Future.delayed(timeout);
102105
}
103-
while (true) {
106+
while (!done) {
104107
// If a value is available now, we'll use it immediately.
105108
// If not, this waits for it.
106109
await nextPing.future;
110+
if (done) break;
111+
107112
// Capture any new values coming in while we wait.
108113
nextPing = Completer<void>();
109114
T data = lastData as T;
@@ -114,6 +119,10 @@ Stream<T> _throttleStream<T>(Stream<T> input, Duration timeout,
114119
await Future.delayed(timeout);
115120
}
116121
} finally {
117-
listener.cancel();
122+
if (lastData case final data?) {
123+
yield data;
124+
}
125+
126+
await listener.cancel();
118127
}
119128
}

packages/sqlite_async/pubspec.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ dev_dependencies:
3434
stream_channel: ^2.1.2
3535
path: ^1.9.0
3636
test_descriptor: ^2.0.2
37+
fake_async: ^1.3.3
3738

3839
platforms:
3940
android:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
import 'dart:async';
2+
3+
import 'package:fake_async/fake_async.dart';
4+
import 'package:sqlite_async/src/update_notification.dart';
5+
import 'package:test/test.dart';
6+
7+
void main() {
8+
group('Update notifications', () {
9+
const timeout = Duration(seconds: 10);
10+
const halfTimeout = Duration(seconds: 5);
11+
12+
group('throttle', () {
13+
test('can add initial', () {
14+
fakeAsync((control) {
15+
final source = StreamController<UpdateNotification>(sync: true);
16+
final events = <UpdateNotification>[];
17+
18+
UpdateNotification.throttleStream(source.stream, timeout,
19+
addOne: UpdateNotification({'a'})).listen(events.add);
20+
21+
control.flushMicrotasks();
22+
expect(events, hasLength(1));
23+
control.elapse(halfTimeout);
24+
25+
source.add(UpdateNotification({'b'}));
26+
expect(events, hasLength(1)); // Still a delay from the initial one
27+
28+
control.elapse(halfTimeout);
29+
expect(events, hasLength(2));
30+
});
31+
});
32+
33+
test('sends events after initial throttle', () {
34+
fakeAsync((control) {
35+
final source = StreamController<UpdateNotification>(sync: true);
36+
final events = <UpdateNotification>[];
37+
38+
UpdateNotification.throttleStream(source.stream, timeout)
39+
.listen(events.add);
40+
41+
source.add(UpdateNotification({'a'}));
42+
control.elapse(halfTimeout);
43+
expect(events, isEmpty);
44+
45+
control.elapse(halfTimeout);
46+
expect(events, hasLength(1));
47+
});
48+
});
49+
50+
test('merges events', () {
51+
fakeAsync((control) {
52+
final source = StreamController<UpdateNotification>(sync: true);
53+
final events = <UpdateNotification>[];
54+
55+
UpdateNotification.throttleStream(source.stream, timeout)
56+
.listen(events.add);
57+
58+
source.add(UpdateNotification({'a'}));
59+
control.elapse(halfTimeout);
60+
expect(events, isEmpty);
61+
62+
source.add(UpdateNotification({'b'}));
63+
control.elapse(halfTimeout);
64+
expect(events, [
65+
UpdateNotification({'a', 'b'})
66+
]);
67+
});
68+
});
69+
70+
test('forwards cancellations', () {
71+
fakeAsync((control) {
72+
var cancelled = false;
73+
final source = StreamController<UpdateNotification>(sync: true)
74+
..onCancel = () => cancelled = true;
75+
76+
final sub = UpdateNotification.throttleStream(source.stream, timeout)
77+
.listen((_) => fail('unexpected event'),
78+
onDone: () => fail('unexpected done'));
79+
80+
source.add(UpdateNotification({'a'}));
81+
control.elapse(halfTimeout);
82+
83+
sub.cancel();
84+
control.flushTimers();
85+
86+
expect(cancelled, isTrue);
87+
expect(control.pendingTimers, isEmpty);
88+
});
89+
});
90+
91+
test('closes when source closes', () {
92+
fakeAsync((control) {
93+
final source = StreamController<UpdateNotification>(sync: true)
94+
..onCancel = () => Future.value();
95+
final events = <UpdateNotification>[];
96+
var done = false;
97+
98+
UpdateNotification.throttleStream(source.stream, timeout)
99+
.listen(events.add, onDone: () => done = true);
100+
101+
source
102+
// These two are combined due to throttleFirst
103+
..add(UpdateNotification({'a'}))
104+
..add(UpdateNotification({'b'}))
105+
..close();
106+
107+
control.flushTimers();
108+
expect(events, [
109+
UpdateNotification({'a', 'b'})
110+
]);
111+
expect(done, isTrue);
112+
expect(control.pendingTimers, isEmpty);
113+
});
114+
});
115+
116+
test('closes when source closes after delay', () {
117+
fakeAsync((control) {
118+
final source = StreamController<UpdateNotification>(sync: true)
119+
..onCancel = () => Future.value();
120+
final events = <UpdateNotification>[];
121+
var done = false;
122+
123+
UpdateNotification.throttleStream(source.stream, timeout)
124+
.listen(events.add, onDone: () => done = true);
125+
126+
control.elapse(const Duration(hours: 1));
127+
source.close();
128+
129+
control.flushTimers();
130+
expect(events, isEmpty);
131+
expect(done, isTrue);
132+
expect(control.pendingTimers, isEmpty);
133+
});
134+
});
135+
});
136+
137+
test('filter tables', () async {
138+
final source = StreamController<UpdateNotification>(sync: true);
139+
final events = <UpdateNotification>[];
140+
final subscription = UpdateNotification.filterTablesTransformer(['a'])
141+
.bind(source.stream)
142+
.listen(events.add);
143+
144+
source.add(UpdateNotification({'a', 'b'}));
145+
expect(events, hasLength(1));
146+
147+
source.add(UpdateNotification({'b'}));
148+
expect(events, hasLength(1));
149+
150+
await subscription.cancel();
151+
expect(source.hasListener, isFalse);
152+
});
153+
});
154+
}

0 commit comments

Comments
 (0)