Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] MongoDB Resume Tokens #196

Merged
merged 10 commits into from
Feb 10, 2025
7 changes: 7 additions & 0 deletions .changeset/polite-goats-eat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-errors': minor
'@powersync/service-module-mongodb': minor
'@powersync/service-image': minor
---

Added support for MongoDB resume tokens. This should help detect Change Stream error edge cases such as changing the replication connection details after replication has begun.
74 changes: 74 additions & 0 deletions modules/module-mongodb/src/common/MongoLSN.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { storage } from '@powersync/service-core';

/**
 * Component parts of a MongoDB Logical Sequence Number (LSN):
 * the cluster timestamp plus an optional Change Stream resume token.
 */
export type MongoLSNSpecification = {
  // Cluster time of the change event (BSON Timestamp).
  timestamp: mongo.Timestamp;
  /**
   * The ResumeToken type here is an alias for `unknown`.
   * The docs mention the contents should be of the form:
   * ```typescript
   * {
   *   "_data" : <BinData|string>
   * }
   * ```
   * We use BSON serialization to store the resume token.
   */
  resume_token?: mongo.ResumeToken;
};

// Serialized LSN representing "nothing replicated yet": Timestamp(0, 0), no resume token.
export const ZERO_LSN = '0000000000000000';

// Separates the 16-hex-char timestamp from the optional base64 resume token in
// the serialized form.
// NOTE(review): "DELIMINATOR" is a misspelling of "DELIMITER"; renaming is a
// wider refactor since it would touch every usage.
const DELIMINATOR = '|';

/**
 * Represent a Logical Sequence Number (LSN) for MongoDB replication sources.
 * This stores a combination of the cluster timestamp and optional Change Stream resume token.
 */
export class MongoLSN {
  /** Parse a serialized LSN string produced by {@link MongoLSN.comparable}. */
  static fromSerialized(comparable: string): MongoLSN {
    return new MongoLSN(MongoLSN.deserialize(comparable));
  }

  /**
   * Decode `<16 hex chars>[|<base64 BSON {resumeToken}>]` into its components.
   * A missing or empty resume segment yields a `null` resume token.
   */
  private static deserialize(comparable: string): MongoLSNSpecification {
    const [timestampString, resumeString] = comparable.split(DELIMINATOR);

    // First 8 hex chars are the Timestamp "high" bits, next 8 are "low".
    const high = parseInt(timestampString.substring(0, 8), 16);
    const low = parseInt(timestampString.substring(8, 16), 16);

    return {
      timestamp: mongo.Timestamp.fromBits(low, high),
      resume_token: resumeString ? storage.deserializeBson(Buffer.from(resumeString, 'base64')).resumeToken : null
    };
  }

  /** LSN representing "nothing replicated yet". */
  static ZERO = MongoLSN.fromSerialized(ZERO_LSN);

  constructor(protected options: MongoLSNSpecification) {}

  get timestamp() {
    return this.options.timestamp;
  }

  get resumeToken() {
    return this.options.resume_token;
  }

  /**
   * Serialized, lexicographically-comparable form of this LSN.
   *
   * The timestamp is rendered as 16 zero-padded hex characters so that plain
   * string comparison matches timestamp ordering. The resume token, if present,
   * is BSON-serialized and base64-encoded after a `|` delimiter.
   */
  get comparable() {
    const { timestamp, resumeToken } = this;

    // `high`/`low` on a BSON Timestamp (a Long) are *signed* 32-bit ints.
    // Coerce to unsigned with `>>> 0` before hex-formatting: a negative value
    // would otherwise render with a leading '-', breaking the fixed-width,
    // comparable encoding (e.g. epoch seconds past 2038, or a counter with the
    // top bit set).
    const a = (timestamp.high >>> 0).toString(16).padStart(8, '0');
    const b = (timestamp.low >>> 0).toString(16).padStart(8, '0');

    const segments = [`${a}${b}`];

    if (resumeToken) {
      segments.push(storage.serializeBson({ resumeToken }).toString('base64'));
    }

    return segments.join(DELIMINATOR);
  }

  toString() {
    return this.comparable;
  }
}
84 changes: 59 additions & 25 deletions modules/module-mongodb/src/replication/ChangeStream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { mongo } from '@powersync/lib-service-mongodb';
import {
container,
DatabaseConnectionError,
ErrorCode,
logger,
ReplicationAbortedError,
Expand All @@ -9,20 +10,13 @@ import {
} from '@powersync/lib-services-framework';
import { Metrics, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
import { MongoLSN } from '../common/MongoLSN.js';
import { PostImagesOption } from '../types/types.js';
import { escapeRegExp } from '../utils.js';
import { MongoManager } from './MongoManager.js';
import {
constructAfterRecord,
createCheckpoint,
getMongoLsn,
getMongoRelation,
mongoLsnToTimestamp
} from './MongoRelation.js';
import { constructAfterRecord, createCheckpoint, getMongoRelation } from './MongoRelation.js';
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';

export const ZERO_LSN = '0000000000000000';

export interface ChangeStreamOptions {
connections: MongoManager;
storage: storage.SyncRulesBucketStorage;
Expand All @@ -41,9 +35,9 @@ interface InitResult {
* * Some change stream documents do not have postImages.
* * startAfter/resumeToken is not valid anymore.
*/
export class ChangeStreamInvalidatedError extends Error {
constructor(message: string) {
super(message);
export class ChangeStreamInvalidatedError extends DatabaseConnectionError {
constructor(message: string, cause: any) {
super(ErrorCode.PSYNC_S1344, message, cause);
}
}

Expand Down Expand Up @@ -207,7 +201,7 @@ export class ChangeStream {
const session = await this.client.startSession();
try {
await this.storage.startBatch(
{ zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
{ zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
async (batch) => {
// Start by resolving all tables.
// This checks postImage configuration, and that should fail as
Expand All @@ -220,12 +214,12 @@ export class ChangeStream {

for (let table of allSourceTables) {
await this.snapshotTable(batch, table, session);
await batch.markSnapshotDone([table], ZERO_LSN);
await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);

await touch();
}

const lsn = getMongoLsn(snapshotTime);
const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
await batch.commit(lsn);
}
Expand Down Expand Up @@ -516,7 +510,7 @@ export class ChangeStream {
e.codeName == 'NoMatchingDocument' &&
e.errmsg?.includes('post-image was not found')
) {
throw new ChangeStreamInvalidatedError(e.errmsg);
throw new ChangeStreamInvalidatedError(e.errmsg, e);
}
throw e;
}
Expand All @@ -527,10 +521,13 @@ export class ChangeStream {
await this.storage.autoActivate();

await this.storage.startBatch(
{ zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
{ zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
async (batch) => {
const lastLsn = batch.lastCheckpointLsn;
const startAfter = mongoLsnToTimestamp(lastLsn) ?? undefined;
const { lastCheckpointLsn } = batch;
const lastLsn = lastCheckpointLsn ? MongoLSN.fromSerialized(lastCheckpointLsn) : null;
const startAfter = lastLsn?.timestamp;
const resumeAfter = lastLsn?.resumeToken;

logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`);

const filters = this.getSourceNamespaceFilters();
Expand All @@ -554,12 +551,21 @@ export class ChangeStream {
}

const streamOptions: mongo.ChangeStreamOptions = {
startAtOperationTime: startAfter,
showExpandedEvents: true,
useBigInt64: true,
maxAwaitTimeMS: 200,
fullDocument: fullDocument
};

/**
* Only one of these options can be supplied at a time.
*/
if (resumeAfter) {
streamOptions.resumeAfter = resumeAfter;
} else {
streamOptions.startAtOperationTime = startAfter;
}

let stream: mongo.ChangeStream<mongo.Document>;
if (filters.multipleDatabases) {
// Requires readAnyDatabase@admin on Atlas
Expand All @@ -579,7 +585,7 @@ export class ChangeStream {
});

// Always start with a checkpoint.
// This helps us to clear erorrs when restarting, even if there is
// This helps us to clear errors when restarting, even if there is
// no data to replicate.
let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);

Expand All @@ -592,6 +598,11 @@ export class ChangeStream {

const originalChangeDocument = await stream.tryNext();

// The stream was closed, we will only ever receive `null` from it
if (!originalChangeDocument && stream.closed) {
break;
}

if (originalChangeDocument == null || this.abort_signal.aborted) {
continue;
}
Expand Down Expand Up @@ -626,15 +637,38 @@ export class ChangeStream {
throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`);
}

// console.log('event', changeDocument);

if (
(changeDocument.operationType == 'insert' ||
changeDocument.operationType == 'update' ||
changeDocument.operationType == 'replace') &&
changeDocument.operationType == 'replace' ||
changeDocument.operationType == 'drop') &&
changeDocument.ns.coll == CHECKPOINTS_COLLECTION
) {
const lsn = getMongoLsn(changeDocument.clusterTime!);
/**
* Dropping the database does not provide an `invalidate` event.
* We typically would receive `drop` events for the collection which we
* would process below.
*
* However we don't commit the LSN after collections are dropped.
* This prevents the `startAfter` or `resumeToken` from advancing past the drop events.
* The stream also closes after the drop events.
* This causes an infinite loop of processing the collection drop events.
*
* This check here invalidates the change stream if our `_checkpoints` collection
* is dropped. This allows for detecting when the DB is dropped.
*/
if (changeDocument.operationType == 'drop') {
throw new ChangeStreamInvalidatedError(
'Internal collections have been dropped',
new Error('_checkpoints collection was dropped')
);
}

const { comparable: lsn } = new MongoLSN({
timestamp: changeDocument.clusterTime!,
resume_token: changeDocument._id
});

if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) {
waitForCheckpointLsn = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { isMongoServerError } from '@powersync/lib-service-mongodb';
import { container } from '@powersync/lib-services-framework';
import { replication } from '@powersync/service-core';

Expand Down Expand Up @@ -85,8 +85,8 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
}
if (e instanceof ChangeStreamInvalidatedError) {
throw e;
} else if (e instanceof mongo.MongoError && e.hasErrorLabel('NonResumableChangeStreamError')) {
throw new ChangeStreamInvalidatedError(e.message);
} else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) {
throw new ChangeStreamInvalidatedError(e.message, e);
} else {
// Report the error if relevant, before retrying
container.reporter.captureException(e, {
Expand Down
20 changes: 3 additions & 17 deletions modules/module-mongodb/src/replication/MongoRelation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import { storage } from '@powersync/service-core';
import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
import { SqliteRow, SqliteValue } from '@powersync/service-sync-rules';

import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';
import { MongoLSN } from '../common/MongoLSN.js';
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';

export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.SourceEntityDescriptor {
return {
Expand All @@ -15,21 +16,6 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S
} satisfies storage.SourceEntityDescriptor;
}

export function getMongoLsn(timestamp: mongo.Timestamp) {
const a = timestamp.high.toString(16).padStart(8, '0');
const b = timestamp.low.toString(16).padStart(8, '0');
return a + b;
}

export function mongoLsnToTimestamp(lsn: string | null) {
if (lsn == null) {
return null;
}
const a = parseInt(lsn.substring(0, 8), 16);
const b = parseInt(lsn.substring(8, 16), 16);
return mongo.Timestamp.fromBits(b, a);
}

export function constructAfterRecord(document: mongo.Document): SqliteRow {
let record: SqliteRow = {};
for (let key of Object.keys(document)) {
Expand Down Expand Up @@ -174,7 +160,7 @@ export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db):
);
const time = session.operationTime!;
// TODO: Use the above when we support custom write checkpoints
return getMongoLsn(time);
return new MongoLSN({ timestamp: time }).comparable;
} finally {
await session.endSession();
}
Expand Down
2 changes: 1 addition & 1 deletion modules/module-mongodb/test/src/change_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ export class ChangeStreamTestContext {
}

startStreaming() {
this.streamPromise = this.walStream.streamChanges();
return (this.streamPromise = this.walStream.streamChanges());
}

async getCheckpoint(options?: { timeout?: number }) {
Expand Down
Loading