Skip to content

Commit 3a0b54a

Browse files
committed
Fix closing after delay
1 parent 996beeb commit 3a0b54a

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

packages/sqlite_async/lib/src/update_notification.dart

+9-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,13 @@ Stream<T> _throttleStream<T extends Object>(Stream<T> input, Duration timeout,
8888
if (!nextPing.isCompleted) {
8989
nextPing.complete();
9090
}
91-
}, onDone: () => done = true);
91+
}, onDone: () {
92+
if (!nextPing.isCompleted) {
93+
nextPing.complete();
94+
}
95+
96+
done = true;
97+
});
9298

9399
try {
94100
if (addOne != null) {
@@ -101,6 +107,8 @@ Stream<T> _throttleStream<T extends Object>(Stream<T> input, Duration timeout,
101107
// If a value is available now, we'll use it immediately.
102108
// If not, this waits for it.
103109
await nextPing.future;
110+
if (done) break;
111+
104112
// Capture any new values coming in while we wait.
105113
nextPing = Completer<void>();
106114
T data = lastData as T;

packages/sqlite_async/test/update_notification_test.dart

+20
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,26 @@ void main() {
112112
expect(control.pendingTimers, isEmpty);
113113
});
114114
});
115+
116+
test('closes when source closes after delay', () {
117+
fakeAsync((control) {
118+
final source = StreamController<UpdateNotification>(sync: true)
119+
..onCancel = () => Future.value();
120+
final events = <UpdateNotification>[];
121+
var done = false;
122+
123+
UpdateNotification.throttleStream(source.stream, timeout)
124+
.listen(events.add, onDone: () => done = true);
125+
126+
control.elapse(const Duration(hours: 1));
127+
source.close();
128+
129+
control.flushTimers();
130+
expect(events, isEmpty);
131+
expect(done, isTrue);
132+
expect(control.pendingTimers, isEmpty);
133+
});
134+
});
115135
});
116136

117137
test('filter tables', () async {

0 commit comments

Comments
 (0)