Skip to content

Commit 996beeb

Browse files
committed
Add tests for stream throttling
1 parent 2938232 commit 996beeb

File tree

3 files changed

+148
-12
lines changed

3 files changed

+148
-12
lines changed

packages/sqlite_async/lib/src/update_notification.dart

+13-12
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,8 @@ class UpdateNotification {
6262
static StreamTransformer<UpdateNotification, UpdateNotification>
6363
filterTablesTransformer(Iterable<String> tables) {
6464
Set<String> normalized = {for (var table in tables) table.toLowerCase()};
65-
return StreamTransformer<UpdateNotification,
66-
UpdateNotification>.fromHandlers(handleData: (data, sink) {
67-
if (data.containsAny(normalized)) {
68-
sink.add(data);
69-
}
70-
});
65+
return StreamTransformer.fromBind(
66+
(source) => source.where((data) => data.containsAny(normalized)));
7167
}
7268
}
7369

@@ -77,21 +73,22 @@ class UpdateNotification {
7773
/// Behaviour:
7874
/// If there was no event in "timeout", and one comes in, it is pushed immediately.
7975
/// Otherwise, we wait until the timeout is over.
80-
Stream<T> _throttleStream<T>(Stream<T> input, Duration timeout,
76+
Stream<T> _throttleStream<T extends Object>(Stream<T> input, Duration timeout,
8177
{bool throttleFirst = false, T Function(T, T)? add, T? addOne}) async* {
8278
var nextPing = Completer<void>();
79+
var done = false;
8380
T? lastData;
8481

8582
var listener = input.listen((data) {
86-
if (lastData is T && add != null) {
87-
lastData = add(lastData as T, data);
83+
if (lastData != null && add != null) {
84+
lastData = add(lastData!, data);
8885
} else {
8986
lastData = data;
9087
}
9188
if (!nextPing.isCompleted) {
9289
nextPing.complete();
9390
}
94-
});
91+
}, onDone: () => done = true);
9592

9693
try {
9794
if (addOne != null) {
@@ -100,7 +97,7 @@ Stream<T> _throttleStream<T>(Stream<T> input, Duration timeout,
10097
if (throttleFirst) {
10198
await Future.delayed(timeout);
10299
}
103-
while (true) {
100+
while (!done) {
104101
// If a value is available now, we'll use it immediately.
105102
// If not, this waits for it.
106103
await nextPing.future;
@@ -114,6 +111,10 @@ Stream<T> _throttleStream<T>(Stream<T> input, Duration timeout,
114111
await Future.delayed(timeout);
115112
}
116113
} finally {
117-
listener.cancel();
114+
if (lastData case final data?) {
115+
yield data;
116+
}
117+
118+
await listener.cancel();
118119
}
119120
}

packages/sqlite_async/pubspec.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ dev_dependencies:
3434
stream_channel: ^2.1.2
3535
path: ^1.9.0
3636
test_descriptor: ^2.0.2
37+
fake_async: ^1.3.3
3738

3839
platforms:
3940
android:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import 'dart:async';
2+
3+
import 'package:fake_async/fake_async.dart';
4+
import 'package:sqlite_async/src/update_notification.dart';
5+
import 'package:test/test.dart';
6+
7+
void main() {
8+
group('Update notifications', () {
9+
const timeout = Duration(seconds: 10);
10+
const halfTimeout = Duration(seconds: 5);
11+
12+
group('throttle', () {
13+
test('can add initial', () {
14+
fakeAsync((control) {
15+
final source = StreamController<UpdateNotification>(sync: true);
16+
final events = <UpdateNotification>[];
17+
18+
UpdateNotification.throttleStream(source.stream, timeout,
19+
addOne: UpdateNotification({'a'})).listen(events.add);
20+
21+
control.flushMicrotasks();
22+
expect(events, hasLength(1));
23+
control.elapse(halfTimeout);
24+
25+
source.add(UpdateNotification({'b'}));
26+
expect(events, hasLength(1)); // Still a delay from the initial one
27+
28+
control.elapse(halfTimeout);
29+
expect(events, hasLength(2));
30+
});
31+
});
32+
33+
test('sends events after initial throttle', () {
34+
fakeAsync((control) {
35+
final source = StreamController<UpdateNotification>(sync: true);
36+
final events = <UpdateNotification>[];
37+
38+
UpdateNotification.throttleStream(source.stream, timeout)
39+
.listen(events.add);
40+
41+
source.add(UpdateNotification({'a'}));
42+
control.elapse(halfTimeout);
43+
expect(events, isEmpty);
44+
45+
control.elapse(halfTimeout);
46+
expect(events, hasLength(1));
47+
});
48+
});
49+
50+
test('merges events', () {
51+
fakeAsync((control) {
52+
final source = StreamController<UpdateNotification>(sync: true);
53+
final events = <UpdateNotification>[];
54+
55+
UpdateNotification.throttleStream(source.stream, timeout)
56+
.listen(events.add);
57+
58+
source.add(UpdateNotification({'a'}));
59+
control.elapse(halfTimeout);
60+
expect(events, isEmpty);
61+
62+
source.add(UpdateNotification({'b'}));
63+
control.elapse(halfTimeout);
64+
expect(events, [
65+
UpdateNotification({'a', 'b'})
66+
]);
67+
});
68+
});
69+
70+
test('forwards cancellations', () {
71+
fakeAsync((control) {
72+
var cancelled = false;
73+
final source = StreamController<UpdateNotification>(sync: true)
74+
..onCancel = () => cancelled = true;
75+
76+
final sub = UpdateNotification.throttleStream(source.stream, timeout)
77+
.listen((_) => fail('unexpected event'),
78+
onDone: () => fail('unexpected done'));
79+
80+
source.add(UpdateNotification({'a'}));
81+
control.elapse(halfTimeout);
82+
83+
sub.cancel();
84+
control.flushTimers();
85+
86+
expect(cancelled, isTrue);
87+
expect(control.pendingTimers, isEmpty);
88+
});
89+
});
90+
91+
test('closes when source closes', () {
92+
fakeAsync((control) {
93+
final source = StreamController<UpdateNotification>(sync: true)
94+
..onCancel = () => Future.value();
95+
final events = <UpdateNotification>[];
96+
var done = false;
97+
98+
UpdateNotification.throttleStream(source.stream, timeout)
99+
.listen(events.add, onDone: () => done = true);
100+
101+
source
102+
// These two are combined due to throttleFirst
103+
..add(UpdateNotification({'a'}))
104+
..add(UpdateNotification({'b'}))
105+
..close();
106+
107+
control.flushTimers();
108+
expect(events, [
109+
UpdateNotification({'a', 'b'})
110+
]);
111+
expect(done, isTrue);
112+
expect(control.pendingTimers, isEmpty);
113+
});
114+
});
115+
});
116+
117+
test('filter tables', () async {
118+
final source = StreamController<UpdateNotification>(sync: true);
119+
final events = <UpdateNotification>[];
120+
final subscription = UpdateNotification.filterTablesTransformer(['a'])
121+
.bind(source.stream)
122+
.listen(events.add);
123+
124+
source.add(UpdateNotification({'a', 'b'}));
125+
expect(events, hasLength(1));
126+
127+
source.add(UpdateNotification({'b'}));
128+
expect(events, hasLength(1));
129+
130+
await subscription.cancel();
131+
expect(source.hasListener, isFalse);
132+
});
133+
});
134+
}

0 commit comments

Comments
 (0)