Skip to content

Commit 11ef604

Browse files
committed
Fix race condition for checkpoints during uploads
1 parent 773cbba commit 11ef604

File tree

3 files changed

+186
-58
lines changed

3 files changed

+186
-58
lines changed

packages/powersync_core/lib/src/streaming_sync.dart

+88-56
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class StreamingSyncImplementation implements StreamingSync {
6868
bool _safeToClose = true;
6969

7070
final Mutex syncMutex, crudMutex;
71+
Completer<void>? _activeCrudUpload;
7172

7273
final Map<String, String> _userAgentHeaders;
7374

@@ -177,7 +178,7 @@ class StreamingSyncImplementation implements StreamingSync {
177178
}
178179

179180
Future<void> crudLoop() async {
180-
await uploadAllCrud();
181+
await _uploadAllCrud();
181182

182183
// Trigger a CRUD upload whenever the upstream trigger fires
183184
// as-well-as whenever the sync stream reconnects.
@@ -187,11 +188,13 @@ class StreamingSyncImplementation implements StreamingSync {
187188
// The stream here is closed on abort.
188189
await for (var _ in mergeStreams(
189190
[crudUpdateTriggerStream, _internalCrudTriggerController.stream])) {
190-
await uploadAllCrud();
191+
await _uploadAllCrud();
191192
}
192193
}
193194

194-
Future<void> uploadAllCrud() async {
195+
Future<void> _uploadAllCrud() {
196+
assert(_activeCrudUpload == null);
197+
final completer = _activeCrudUpload = Completer();
195198
return crudMutex.lock(() async {
196199
// Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration.
197200
CrudEntry? checkedCrudItem;
@@ -244,7 +247,11 @@ class StreamingSyncImplementation implements StreamingSync {
244247
_updateStatus(uploading: false);
245248
}
246249
}
247-
}, timeout: retryDelay);
250+
}, timeout: retryDelay).whenComplete(() {
251+
assert(identical(_activeCrudUpload, completer));
252+
_activeCrudUpload = null;
253+
completer.complete();
254+
});
248255
}
249256

250257
Future<String> getWriteCheckpoint() async {
@@ -336,7 +343,7 @@ class StreamingSyncImplementation implements StreamingSync {
336343
return (initialRequests, localDescriptions);
337344
}
338345

339-
Future<bool> streamingSyncIteration(
346+
Future<void> streamingSyncIteration(
340347
{AbortController? abortController}) async {
341348
adapter.startSession();
342349

@@ -379,51 +386,27 @@ class StreamingSyncImplementation implements StreamingSync {
379386
await adapter.removeBuckets([...bucketsToDelete]);
380387
_updateStatus(downloading: true);
381388
case StreamingSyncCheckpointComplete():
382-
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
383-
if (!result.checkpointValid) {
384-
// This means checksums failed. Start again with a new checkpoint.
385-
// TODO: better back-off
386-
// await new Promise((resolve) => setTimeout(resolve, 50));
387-
return false;
388-
} else if (!result.ready) {
389-
// Checksums valid, but need more data for a consistent checkpoint.
390-
// Continue waiting.
391-
} else {
392-
appliedCheckpoint = targetCheckpoint;
393-
394-
final now = DateTime.now();
395-
_updateStatus(
396-
downloading: false,
397-
downloadError: _noError,
398-
lastSyncedAt: now,
399-
priorityStatusEntries: [
400-
if (appliedCheckpoint.checksums.isNotEmpty)
401-
(
402-
hasSynced: true,
403-
lastSyncedAt: now,
404-
priority: maxBy(
405-
appliedCheckpoint.checksums
406-
.map((cs) => BucketPriority(cs.priority)),
407-
(priority) => priority,
408-
compare: BucketPriority.comparator,
409-
)!,
410-
)
411-
],
412-
);
389+
final result =
390+
await _applyCheckpoint(targetCheckpoint!, abortController);
391+
if (result.abort) {
392+
return;
413393
}
414-
415394
validatedCheckpoint = targetCheckpoint;
395+
if (result.didApply) {
396+
appliedCheckpoint = targetCheckpoint;
397+
}
416398
case StreamingSyncCheckpointPartiallyComplete(:final bucketPriority):
417399
final result = await adapter.syncLocalDatabase(targetCheckpoint!,
418400
forPriority: bucketPriority);
419401
if (!result.checkpointValid) {
420402
// This means checksums failed. Start again with a new checkpoint.
421403
// TODO: better back-off
422404
// await new Promise((resolve) => setTimeout(resolve, 50));
423-
return false;
405+
return;
424406
} else if (!result.ready) {
425-
// Checksums valid, but need more data for a consistent checkpoint.
426-
// Continue waiting.
407+
// If we have pending uploads, we can't complete new checkpoints
408+
// outside of priority 0. We'll resolve this for a complete
409+
// checkpoint later.
427410
} else {
428411
_updateStatusForPriority((
429412
priority: BucketPriority(bucketPriority),
@@ -494,22 +477,13 @@ class StreamingSyncImplementation implements StreamingSync {
494477
downloadError: _noError,
495478
lastSyncedAt: DateTime.now());
496479
} else if (validatedCheckpoint == targetCheckpoint) {
497-
final result = await adapter.syncLocalDatabase(targetCheckpoint!);
498-
if (!result.checkpointValid) {
499-
// This means checksums failed. Start again with a new checkpoint.
500-
// TODO: better back-off
501-
// await new Promise((resolve) => setTimeout(resolve, 50));
502-
return false;
503-
} else if (!result.ready) {
504-
// Checksums valid, but need more data for a consistent checkpoint.
505-
// Continue waiting.
506-
} else {
480+
final result =
481+
await _applyCheckpoint(targetCheckpoint!, abortController);
482+
if (result.abort) {
483+
return;
484+
}
485+
if (result.didApply) {
507486
appliedCheckpoint = targetCheckpoint;
508-
509-
_updateStatus(
510-
downloading: false,
511-
downloadError: _noError,
512-
lastSyncedAt: DateTime.now());
513487
}
514488
}
515489
}
@@ -519,7 +493,65 @@ class StreamingSyncImplementation implements StreamingSync {
519493
break;
520494
}
521495
}
522-
return true;
496+
}
497+
498+
Future<({bool abort, bool didApply})> _applyCheckpoint(
499+
Checkpoint targetCheckpoint, AbortController? abortController) async {
500+
var result = await adapter.syncLocalDatabase(targetCheckpoint);
501+
final pendingUpload = _activeCrudUpload;
502+
503+
if (!result.checkpointValid) {
504+
// This means checksums failed. Start again with a new checkpoint.
505+
// TODO: better back-off
506+
// await new Promise((resolve) => setTimeout(resolve, 50));
507+
return const (abort: true, didApply: false);
508+
} else if (!result.ready && pendingUpload != null) {
509+
// We have pending entries in the local upload queue or are waiting to
510+
// confirm a write checkpoint, which prevented this checkpoint from
511+
// applying. Wait for that to complete and try again.
512+
isolateLogger.fine('Could not apply checkpoint due to local data. '
513+
'Waiting for in-progress upload before retrying...');
514+
await Future.any([
515+
pendingUpload.future,
516+
if (abortController case final controller?) controller.onAbort,
517+
]);
518+
519+
if (abortController?.aborted == true) {
520+
return const (abort: true, didApply: false);
521+
}
522+
523+
// Try again now that uploads have completed.
524+
result = await adapter.syncLocalDatabase(targetCheckpoint);
525+
}
526+
527+
if (result.checkpointValid && result.ready) {
528+
isolateLogger.fine('validated checkpoint: $targetCheckpoint');
529+
final now = DateTime.now();
530+
_updateStatus(
531+
downloading: false,
532+
downloadError: _noError,
533+
lastSyncedAt: now,
534+
priorityStatusEntries: [
535+
if (targetCheckpoint.checksums.isNotEmpty)
536+
(
537+
hasSynced: true,
538+
lastSyncedAt: now,
539+
priority: maxBy(
540+
targetCheckpoint.checksums
541+
.map((cs) => BucketPriority(cs.priority)),
542+
(priority) => priority,
543+
compare: BucketPriority.comparator,
544+
)!,
545+
)
546+
],
547+
);
548+
549+
return const (abort: false, didApply: true);
550+
} else {
551+
isolateLogger.fine(
552+
'Could not apply checkpoint. Waiting for next sync complete line');
553+
return const (abort: false, didApply: false);
554+
}
523555
}
524556

525557
Stream<StreamingSyncLine> streamingSyncRequest(

packages/powersync_core/test/in_memory_sync_test.dart

+92-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import 'dart:async';
2+
13
import 'package:async/async.dart';
24
import 'package:logging/logging.dart';
35
import 'package:powersync_core/powersync_core.dart';
@@ -22,6 +24,7 @@ void main() {
2224
late MockSyncService syncService;
2325
late StreamingSync syncClient;
2426
var credentialsCallbackCount = 0;
27+
Future<void> Function(PowerSyncDatabase) uploadData = (db) async {};
2528

2629
setUp(() async {
2730
credentialsCallbackCount = 0;
@@ -42,7 +45,7 @@ void main() {
4245
token: 'token$credentialsCallbackCount',
4346
expiresAt: DateTime.now(),
4447
);
45-
}),
48+
}, uploadData: (db) => uploadData(db)),
4649
);
4750
});
4851

@@ -329,6 +332,94 @@ void main() {
329332
expect(nextRequest.headers['Authorization'], 'Token token2');
330333
expect(credentialsCallbackCount, 2);
331334
});
335+
336+
test('handles checkpoints during the upload process', () async {
337+
final status = await waitForConnection();
338+
339+
Future<void> expectCustomerRows(dynamic matcher) async {
340+
final rows = await database.getAll('SELECT * FROM customers');
341+
expect(rows, matcher);
342+
}
343+
344+
final uploadStarted = Completer<void>();
345+
final uploadFinished = Completer<void>();
346+
347+
uploadData = (db) async {
348+
if (await db.getCrudBatch() case final batch?) {
349+
uploadStarted.complete();
350+
await uploadFinished.future;
351+
batch.complete();
352+
}
353+
};
354+
355+
// Trigger an upload
356+
await database.execute(
357+
'INSERT INTO customers (id, name, email) VALUES (uuid(), ?, ?)',
358+
['local', '[email protected]']);
359+
await expectCustomerRows(hasLength(1));
360+
await uploadStarted.future;
361+
362+
// Pretend that the connector takes forever in uploadData, but the data
363+
// gets uploaded before the method returns.
364+
syncService.addLine({
365+
'checkpoint': Checkpoint(
366+
writeCheckpoint: '1',
367+
lastOpId: '2',
368+
checksums: [BucketChecksum(bucket: 'a', priority: 3, checksum: 0)],
369+
)
370+
});
371+
await expectLater(status, emitsThrough(isSyncStatus(downloading: true)));
372+
373+
syncService
374+
..addLine({
375+
'data': {
376+
'bucket': 'a',
377+
'data': [
378+
{
379+
'checksum': 0,
380+
'data': {'name': 'from local', 'email': '[email protected]'},
381+
'op': 'PUT',
382+
'op_id': '1',
383+
'object_id': '1',
384+
'object_type': 'customers'
385+
},
386+
{
387+
'checksum': 0,
388+
'data': {'name': 'additional', 'email': ''},
389+
'op': 'PUT',
390+
'op_id': '2',
391+
'object_id': '2',
392+
'object_type': 'customers'
393+
}
394+
]
395+
}
396+
})
397+
..addLine({
398+
'checkpoint_complete': {'last_op_id': '2'}
399+
});
400+
401+
// Despite receiving a valid checkpoint with two rows, it should not be
402+
// visible because we have local data pending.
403+
await expectCustomerRows(hasLength(1));
404+
405+
// Mark the upload as completed, this should trigger a write_checkpoint
406+
// request.
407+
final sentCheckpoint = Completer<void>();
408+
syncService.writeCheckpoint = () {
409+
sentCheckpoint.complete();
410+
return {
411+
'data': {'write_checkpoint': '1'}
412+
};
413+
};
414+
uploadFinished.complete();
415+
await sentCheckpoint.future;
416+
417+
// This should apply the checkpoint.
418+
await expectLater(status, emitsThrough(isSyncStatus(downloading: false)));
419+
420+
// Meaning that the two rows are now visible.
421+
await expectCustomerRows(hasLength(2));
422+
});
332423
});
333424
}
334425

packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart

+6-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ final class MockSyncService {
1010
Completer<Request> _listener = Completer();
1111

1212
final router = Router();
13+
Object? Function() writeCheckpoint = () {
14+
return {
15+
'data': {'write_checkpoint': '10'}
16+
};
17+
};
1318

1419
MockSyncService() {
1520
router
@@ -27,7 +32,7 @@ final class MockSyncService {
2732
});
2833
})
2934
..get('/write-checkpoint2.json', (request) {
30-
return Response.ok('{"data": {"write_checkpoint": "10"}}', headers: {
35+
return Response.ok(json.encode(writeCheckpoint()), headers: {
3136
'Content-Type': 'application/json',
3237
});
3338
});

0 commit comments

Comments
 (0)