Skip to content

Commit cadd682

Browse files
committed
Increment counters when receiving sync lines
1 parent bd01bfd commit cadd682

File tree

5 files changed

+65
-48
lines changed

5 files changed

+65
-48
lines changed

packages/powersync_core/lib/powersync_core.dart

+1-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,5 @@ export 'src/exceptions.dart';
1010
export 'src/log.dart';
1111
export 'src/open_factory.dart';
1212
export 'src/schema.dart';
13-
export 'src/sync_status.dart'
14-
hide InternalSyncDownloadProgress, OperationCounter;
13+
export 'src/sync_status.dart' hide InternalSyncDownloadProgress;
1514
export 'src/uuid.dart';

packages/powersync_core/lib/src/sync/mutable_sync_status.dart

+13
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ final class MutableSyncStatus {
5858

5959
void applyCheckpointStarted(Checkpoint target) {
6060
downloading = true;
61+
// TODO: Include pending ops from interrupted download, if any...
6162
downloadProgress = InternalSyncDownloadProgress.fromZero(target);
6263
}
6364

@@ -66,6 +67,18 @@ final class MutableSyncStatus {
6667
uploadError = error;
6768
}
6869

70+
void applyBatchReceived(
71+
Map<String, BucketDescription?> currentBuckets, SyncDataBatch batch) {
72+
downloading = true;
73+
if (downloadProgress case final previousProgress?) {
74+
downloadProgress = previousProgress.incrementDownloaded([
75+
for (final bucket in batch.buckets)
76+
if (currentBuckets[bucket.bucket] case final knownBucket?)
77+
(BucketPriority(knownBucket.priority), bucket.data.length),
78+
]);
79+
}
80+
}
81+
6982
SyncStatus immutableSnapsot() {
7083
return SyncStatus(
7184
connected: connected,

packages/powersync_core/lib/src/sync/streaming_sync.dart

+3
Original file line numberDiff line numberDiff line change
@@ -391,6 +391,9 @@ class StreamingSyncImplementation implements StreamingSync {
391391
await adapter.removeBuckets(diff.removedBuckets);
392392
adapter.setTargetCheckpoint(targetCheckpoint);
393393
case SyncDataBatch():
394+
// TODO: This increments the counters before actually saving sync
395+
// data. Might be fine though?
396+
_state.updateStatus((s) => s.applyBatchReceived(bucketMap, line));
394397
_state.updateStatus((s) => s.downloading = true);
395398
await adapter.saveSyncData(line);
396399
case StreamingSyncKeepalive(:final tokenExpiresIn):

packages/powersync_core/lib/src/sync_status.dart

+37-28
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import 'dart:math';
2+
13
import 'package:collection/collection.dart';
24
import 'package:meta/meta.dart';
35

@@ -161,11 +163,11 @@ final class SyncStatus {
161163
String toString() {
162164
return "SyncStatus<connected: $connected connecting: $connecting downloading: $downloading uploading: $uploading lastSyncedAt: $lastSyncedAt, hasSynced: $hasSynced, error: $anyError>";
163165
}
164-
}
165166

166-
// This should be a ListEquality<SyncPriorityStatus>, but that appears to
167-
// cause weird type errors with DDC (but only after hot reloads?!)
168-
const _statusEquality = ListEquality<Object?>();
167+
// This should be a ListEquality<SyncPriorityStatus>, but that appears to
168+
// cause weird type errors with DDC (but only after hot reloads?!)
169+
static const _statusEquality = ListEquality<Object?>();
170+
}
169171

170172
/// The priority of a PowerSync bucket.
171173
extension type const BucketPriority._(int priorityNumber) {
@@ -214,34 +216,24 @@ class UploadQueueStats {
214216
}
215217
}
216218

217-
@internal
218-
typedef OperationCounter = ({BucketPriority priority, int opCount});
219-
220219
@internal
221220
final class InternalSyncDownloadProgress {
222-
final List<OperationCounter> downloaded;
223-
final List<OperationCounter> target;
221+
final Map<BucketPriority, int> downloaded;
222+
final Map<BucketPriority, int> target;
224223

225224
final int _totalDownloaded;
226225
final int _totalTarget;
227226

228227
InternalSyncDownloadProgress(this.downloaded, this.target)
229-
: _totalDownloaded = downloaded.map((e) => e.opCount).sum,
230-
_totalTarget = target.map((e) => e.opCount).sum;
228+
: _totalDownloaded = target.values.sum,
229+
_totalTarget = target.values.sum;
231230

232231
factory InternalSyncDownloadProgress.fromZero(Checkpoint target) {
233-
final totalOpsPerPriority =
234-
target.checksums.groupFoldBy<BucketPriority, int>(
232+
final targetOps = target.checksums.groupFoldBy<BucketPriority, int>(
235233
(cs) => BucketPriority(cs.priority),
236234
(prev, cs) => (prev ?? 0) + (cs.count ?? 0),
237235
);
238-
final downloaded = [
239-
for (final involvedPriority in totalOpsPerPriority.keys)
240-
(priority: involvedPriority, opCount: 0),
241-
];
242-
final targetOps = totalOpsPerPriority.entries
243-
.map((e) => (priority: e.key, opCount: e.value))
244-
.toList();
236+
final downloaded = targetOps.map((k, v) => MapEntry(k, 0));
245237

246238
return InternalSyncDownloadProgress(downloaded, targetOps);
247239
}
@@ -251,20 +243,35 @@ final class InternalSyncDownloadProgress {
251243
}
252244

253245
static int sumInPriority(
254-
List<OperationCounter> counters, BucketPriority priority) {
255-
return counters
256-
.where((e) => e.priority >= priority)
257-
.map((e) => e.opCount)
246+
Map<BucketPriority, int> counters, BucketPriority priority) {
247+
return counters.entries
248+
.where((e) => e.key >= priority)
249+
.map((e) => e.value)
258250
.sum;
259251
}
260252

253+
InternalSyncDownloadProgress incrementDownloaded(
254+
List<(BucketPriority, int)> opsInPriority) {
255+
var downloadedOps = {...downloaded};
256+
257+
for (final (priority, addedOps) in opsInPriority) {
258+
assert(downloaded.containsKey(priority));
259+
assert(target.containsKey(priority));
260+
261+
downloadedOps[priority] =
262+
max(downloadedOps[priority]! + addedOps, target[priority]!);
263+
}
264+
265+
return InternalSyncDownloadProgress(downloadedOps, target);
266+
}
267+
261268
SyncDownloadProgress get asSyncDownloadProgress =>
262269
SyncDownloadProgress._(this);
263270

264271
@override
265272
int get hashCode => Object.hash(
266-
_statusEquality.hash(downloaded),
267-
_statusEquality.hash(target),
273+
_mapEquality.hash(downloaded),
274+
_mapEquality.hash(target),
268275
);
269276

270277
@override
@@ -274,9 +281,11 @@ final class InternalSyncDownloadProgress {
274281
// them first helps find a difference faster.
275282
_totalDownloaded == other._totalDownloaded &&
276283
_totalTarget == other._totalTarget &&
277-
_statusEquality.equals(downloaded, other.downloaded) &&
278-
_statusEquality.equals(target, other.target);
284+
_mapEquality.equals(downloaded, other.downloaded) &&
285+
_mapEquality.equals(target, other.target);
279286
}
287+
288+
static const _mapEquality = MapEquality<Object?, Object?>();
280289
}
281290

282291
/// Provides realtime progress about how PowerSync is downloading rows.

packages/powersync_core/lib/src/web/sync_worker_protocol.dart

+11-18
Original file line numberDiff line numberDiff line change
@@ -153,22 +153,8 @@ extension type SerializedOperationCounter._(JSObject _) implements JSObject {
153153
required int opCount,
154154
});
155155

156-
factory SerializedOperationCounter.fromDart(OperationCounter progress) {
157-
return SerializedOperationCounter(
158-
priority: progress.priority.priorityNumber,
159-
opCount: progress.opCount,
160-
);
161-
}
162-
163156
external JSNumber get priority;
164157
external JSNumber get opCount;
165-
166-
OperationCounter get toDart {
167-
return (
168-
priority: BucketPriority(priority.toDartInt),
169-
opCount: opCount.toDartInt
170-
);
171-
}
172158
}
173159

174160
@anonymous
@@ -197,15 +183,22 @@ extension type SerializedDownloadProgress._(JSObject _) implements JSObject {
197183
}
198184

199185
static JSArray<SerializedOperationCounter> _serializeCounters(
200-
List<OperationCounter> counters) {
186+
Map<BucketPriority, int> counters) {
201187
return [
202-
for (final entry in counters) SerializedOperationCounter.fromDart(entry)
188+
for (final MapEntry(:key, :value) in counters.entries)
189+
SerializedOperationCounter(
190+
priority: key.priorityNumber,
191+
opCount: value,
192+
)
203193
].toJS;
204194
}
205195

206-
static List<OperationCounter> _deserializeCounters(
196+
static Map<BucketPriority, int> _deserializeCounters(
207197
JSArray<SerializedOperationCounter> counters) {
208-
return [for (final entry in counters.toDart) entry.toDart];
198+
return {
199+
for (final entry in counters.toDart)
200+
BucketPriority(entry.priority.toDartInt): entry.opCount.toDartInt,
201+
};
209202
}
210203
}
211204

0 commit comments

Comments
 (0)