diff --git a/.changeset/polite-goats-eat.md b/.changeset/polite-goats-eat.md
new file mode 100644
index 00000000..9c3b85ca
--- /dev/null
+++ b/.changeset/polite-goats-eat.md
@@ -0,0 +1,7 @@
+---
+'@powersync/service-errors': minor
+'@powersync/service-module-mongodb': minor
+'@powersync/service-image': minor
+---
+
+Added support for MongoDB resume tokens. This should help detect Change Stream error edge cases, such as the replication connection details being changed after replication has begun.
diff --git a/modules/module-mongodb/src/common/MongoLSN.ts b/modules/module-mongodb/src/common/MongoLSN.ts
new file mode 100644
index 00000000..a91fc3f1
--- /dev/null
+++ b/modules/module-mongodb/src/common/MongoLSN.ts
@@ -0,0 +1,74 @@
+import { mongo } from '@powersync/lib-service-mongodb';
+import { storage } from '@powersync/service-core';
+
+export type MongoLSNSpecification = {
+  timestamp: mongo.Timestamp;
+  /**
+   * The ResumeToken type here is an alias for `unknown`.
+   * The docs mention the contents should be of the form:
+   * ```typescript
+   * { "_data": <BinData|hex string> }
+   * ```
+   * We use BSON serialization to store the resume token.
+   */
+  resume_token?: mongo.ResumeToken;
+};
+
+export const ZERO_LSN = '0000000000000000';
+
+const DELIMINATOR = '|';
+
+/**
+ * Represents a Logical Sequence Number (LSN) for MongoDB replication sources.
+ * This stores a combination of the cluster timestamp and an optional Change Stream resume token.
+ */
+export class MongoLSN {
+  static fromSerialized(comparable: string): MongoLSN {
+    return new MongoLSN(MongoLSN.deserialize(comparable));
+  }
+
+  private static deserialize(comparable: string): MongoLSNSpecification {
+    const [timestampString, resumeString] = comparable.split(DELIMINATOR);
+
+    const a = parseInt(timestampString.substring(0, 8), 16);
+    const b = parseInt(timestampString.substring(8, 16), 16);
+
+    return {
+      timestamp: mongo.Timestamp.fromBits(b, a),
+      resume_token: resumeString ? storage.deserializeBson(Buffer.from(resumeString, 'base64')).resumeToken : null
+    };
+  }
+
+  static ZERO = MongoLSN.fromSerialized(ZERO_LSN);
+
+  constructor(protected options: MongoLSNSpecification) {}
+
+  get timestamp() {
+    return this.options.timestamp;
+  }
+
+  get resumeToken() {
+    return this.options.resume_token;
+  }
+
+  get comparable() {
+    const { timestamp, resumeToken } = this;
+
+    const a = timestamp.high.toString(16).padStart(8, '0');
+    const b = timestamp.low.toString(16).padStart(8, '0');
+
+    const segments = [`${a}${b}`];
+
+    if (resumeToken) {
+      segments.push(storage.serializeBson({ resumeToken }).toString('base64'));
+    }
+
+    return segments.join(DELIMINATOR);
+  }
+
+  toString() {
+    return this.comparable;
+  }
+}
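A quick illustration of the encoding above (not part of the diff): because both timestamp words are zero-padded to 8 hex digits, plain string comparison of `comparable` values matches numeric cluster-time order, and the `|`-delimited token segment only participates when timestamps tie. A minimal sketch, assuming only the `MongoLSN` class above; the token value is made up:

```typescript
import { mongo } from '@powersync/lib-service-mongodb';
import { MongoLSN } from './MongoLSN.js';

// Zero-padded hex keeps lexicographic order equal to numeric cluster-time order.
const older = new MongoLSN({ timestamp: mongo.Timestamp.fromNumber(9) });
const newer = new MongoLSN({
  timestamp: mongo.Timestamp.fromNumber(10),
  resume_token: { _data: 'made-up-token' } // hypothetical token contents
});
console.log(older.comparable < newer.comparable); // true

// Round trip: the base64 BSON segment after '|' restores the token.
const restored = MongoLSN.fromSerialized(newer.comparable);
console.log(restored.timestamp.equals(newer.timestamp)); // true
console.log(restored.resumeToken); // { _data: 'made-up-token' }
```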
diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index 555348a7..c0e9e56e 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -1,6 +1,7 @@
 import { mongo } from '@powersync/lib-service-mongodb';
 import {
   container,
+  DatabaseConnectionError,
   ErrorCode,
   logger,
   ReplicationAbortedError,
@@ -9,20 +10,13 @@ import {
 } from '@powersync/lib-services-framework';
 import { Metrics, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
 import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
+import { MongoLSN } from '../common/MongoLSN.js';
 import { PostImagesOption } from '../types/types.js';
 import { escapeRegExp } from '../utils.js';
 import { MongoManager } from './MongoManager.js';
-import {
-  constructAfterRecord,
-  createCheckpoint,
-  getMongoLsn,
-  getMongoRelation,
-  mongoLsnToTimestamp
-} from './MongoRelation.js';
+import { constructAfterRecord, createCheckpoint, getMongoRelation } from './MongoRelation.js';
 import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
 
-export const ZERO_LSN = '0000000000000000';
-
 export interface ChangeStreamOptions {
   connections: MongoManager;
   storage: storage.SyncRulesBucketStorage;
@@ -41,9 +35,9 @@ interface InitResult {
  *   * Some change stream documents do not have postImages.
  *   * startAfter/resumeToken is not valid anymore.
  */
-export class ChangeStreamInvalidatedError extends Error {
-  constructor(message: string) {
-    super(message);
+export class ChangeStreamInvalidatedError extends DatabaseConnectionError {
+  constructor(message: string, cause: any) {
+    super(ErrorCode.PSYNC_S1344, message, cause);
   }
 }
 
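For context on the class change above: extending `DatabaseConnectionError` attaches a stable error code (PSYNC_S1344, added at the end of this diff) and a cause to what was previously a bare `Error`. The class stays distinct because invalidation must not be retried blindly; a hypothetical supervisor-loop sketch (`replicateOnce` is a stand-in, not an API in this diff):

```typescript
import { ChangeStreamInvalidatedError } from './ChangeStream.js';

// Sketch only: ordinary errors are retried, but an invalidated stream
// means the stored resume data is unusable, so resuming would fail forever.
async function runWithRetries(replicateOnce: () => Promise<void>): Promise<void> {
  while (true) {
    try {
      await replicateOnce();
      return;
    } catch (e) {
      if (e instanceof ChangeStreamInvalidatedError) {
        // PSYNC_S1344: propagate so a fresh Change Stream (and snapshot)
        // is started instead of resuming from the stored token.
        throw e;
      }
      // Other errors: fall through and retry (backoff elided for brevity).
    }
  }
}
```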
@@ -207,7 +201,7 @@ export class ChangeStream {
     const session = await this.client.startSession();
     try {
       await this.storage.startBatch(
-        { zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
+        { zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
         async (batch) => {
           // Start by resolving all tables.
           // This checks postImage configuration, and that should fail as
@@ -220,12 +214,12 @@ export class ChangeStream {
 
           for (let table of allSourceTables) {
             await this.snapshotTable(batch, table, session);
-            await batch.markSnapshotDone([table], ZERO_LSN);
+            await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable);
 
             await touch();
           }
 
-          const lsn = getMongoLsn(snapshotTime);
+          const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
           logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
           await batch.commit(lsn);
         }
@@ -516,7 +510,7 @@
         e.codeName == 'NoMatchingDocument' &&
         e.errmsg?.includes('post-image was not found')
       ) {
-        throw new ChangeStreamInvalidatedError(e.errmsg);
+        throw new ChangeStreamInvalidatedError(e.errmsg, e);
       }
       throw e;
     }
@@ -527,10 +521,13 @@
     await this.storage.autoActivate();
 
     await this.storage.startBatch(
-      { zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
+      { zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
       async (batch) => {
-        const lastLsn = batch.lastCheckpointLsn;
-        const startAfter = mongoLsnToTimestamp(lastLsn) ?? undefined;
+        const { lastCheckpointLsn } = batch;
+        const lastLsn = lastCheckpointLsn ? MongoLSN.fromSerialized(lastCheckpointLsn) : null;
+        const startAfter = lastLsn?.timestamp;
+        const resumeAfter = lastLsn?.resumeToken;
+
         logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`);
 
         const filters = this.getSourceNamespaceFilters();
@@ -554,12 +551,21 @@
         }
 
         const streamOptions: mongo.ChangeStreamOptions = {
-          startAtOperationTime: startAfter,
           showExpandedEvents: true,
           useBigInt64: true,
           maxAwaitTimeMS: 200,
           fullDocument: fullDocument
         };
+
+        /**
+         * Only one of these options can be supplied at a time.
+         */
+        if (resumeAfter) {
+          streamOptions.resumeAfter = resumeAfter;
+        } else {
+          streamOptions.startAtOperationTime = startAfter;
+        }
+
         let stream: mongo.ChangeStream;
         if (filters.multipleDatabases) {
           // Requires readAnyDatabase@admin on Atlas
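The `if`/`else` added above reflects a driver constraint: `resumeAfter` and `startAtOperationTime` are alternative resume mechanisms and cannot be combined on one change stream. A self-contained sketch against the plain driver API (the function name and `fullDocument` choice are illustrative):

```typescript
import { mongo } from '@powersync/lib-service-mongodb';

// Resume from a stored token when one exists; otherwise fall back to the
// cluster timestamp. Supplying both options at once is a driver error.
function openStream(
  db: mongo.Db,
  resumeAfter?: mongo.ResumeToken,
  startAtOperationTime?: mongo.Timestamp
): mongo.ChangeStream {
  const options: mongo.ChangeStreamOptions = { fullDocument: 'updateLookup' };
  if (resumeAfter) {
    // The token pins an exact oplog position, so it wins when available.
    options.resumeAfter = resumeAfter;
  } else if (startAtOperationTime) {
    options.startAtOperationTime = startAtOperationTime;
  }
  return db.watch([], options);
}
```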
@@ -579,7 +585,7 @@
         });
 
         // Always start with a checkpoint.
-        // This helps us to clear erorrs when restarting, even if there is
+        // This helps us to clear errors when restarting, even if there is
         // no data to replicate.
         let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);
 
@@ -592,6 +598,11 @@
 
           const originalChangeDocument = await stream.tryNext();
 
+          // The stream was closed; we will only ever receive `null` from it.
+          if (!originalChangeDocument && stream.closed) {
+            break;
+          }
+
           if (originalChangeDocument == null || this.abort_signal.aborted) {
             continue;
           }
@@ -626,15 +637,38 @@
             throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`);
           }
 
-          // console.log('event', changeDocument);
-
           if (
             (changeDocument.operationType == 'insert' ||
               changeDocument.operationType == 'update' ||
-              changeDocument.operationType == 'replace') &&
+              changeDocument.operationType == 'replace' ||
+              changeDocument.operationType == 'drop') &&
             changeDocument.ns.coll == CHECKPOINTS_COLLECTION
           ) {
-            const lsn = getMongoLsn(changeDocument.clusterTime!);
+            /**
+             * Dropping the database does not provide an `invalidate` event.
+             * We would typically receive `drop` events for the collections, which we
+             * would process below.
+             *
+             * However, we don't commit the LSN after collections are dropped.
+             * This prevents the `startAfter` or `resumeToken` from advancing past the drop events.
+             * The stream also closes after the drop events,
+             * causing an infinite loop of re-processing the collection drop events.
+             *
+             * This check invalidates the change stream if our `_checkpoints` collection
+             * is dropped, which allows us to detect when the database is dropped.
+             */
+            if (changeDocument.operationType == 'drop') {
+              throw new ChangeStreamInvalidatedError(
+                'Internal collections have been dropped',
+                new Error('_checkpoints collection was dropped')
+              );
+            }
+
+            const { comparable: lsn } = new MongoLSN({
+              timestamp: changeDocument.clusterTime!,
+              resume_token: changeDocument._id
+            });
+
             if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) {
               waitForCheckpointLsn = null;
             }
diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts
index 3245f0f2..e2ba6a24 100644
--- a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts
+++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts
@@ -1,4 +1,4 @@
-import { mongo } from '@powersync/lib-service-mongodb';
+import { isMongoServerError } from '@powersync/lib-service-mongodb';
 import { container } from '@powersync/lib-services-framework';
 import { replication } from '@powersync/service-core';
 
@@ -85,8 +85,8 @@ export class ChangeStreamReplicationJob extends replication.AbstractReplicationJ
       }
       if (e instanceof ChangeStreamInvalidatedError) {
        throw e;
-      } else if (e instanceof mongo.MongoError && e.hasErrorLabel('NonResumableChangeStreamError')) {
-        throw new ChangeStreamInvalidatedError(e.message);
+      } else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) {
+        throw new ChangeStreamInvalidatedError(e.message, e);
       } else {
         // Report the error if relevant, before retrying
         container.reporter.captureException(e, {
diff --git a/modules/module-mongodb/src/replication/MongoRelation.ts b/modules/module-mongodb/src/replication/MongoRelation.ts
index ee248951..cc4774a2 100644
--- a/modules/module-mongodb/src/replication/MongoRelation.ts
+++ b/modules/module-mongodb/src/replication/MongoRelation.ts
@@ -3,8 +3,9 @@ import { storage } from '@powersync/service-core';
 import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
 import { SqliteRow, SqliteValue } from '@powersync/service-sync-rules';
 
-import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
 import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';
+import { MongoLSN } from '../common/MongoLSN.js';
+import { CHECKPOINTS_COLLECTION } from './replication-utils.js';
 
 export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.SourceEntityDescriptor {
   return {
@@ -15,21 +16,6 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S
   } satisfies storage.SourceEntityDescriptor;
 }
 
-export function getMongoLsn(timestamp: mongo.Timestamp) {
-  const a = timestamp.high.toString(16).padStart(8, '0');
-  const b = timestamp.low.toString(16).padStart(8, '0');
-  return a + b;
-}
-
-export function mongoLsnToTimestamp(lsn: string | null) {
-  if (lsn == null) {
-    return null;
-  }
-  const a = parseInt(lsn.substring(0, 8), 16);
-  const b = parseInt(lsn.substring(8, 16), 16);
-  return mongo.Timestamp.fromBits(b, a);
-}
-
 export function constructAfterRecord(document: mongo.Document): SqliteRow {
   let record: SqliteRow = {};
   for (let key of Object.keys(document)) {
@@ -174,7 +160,7 @@ export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db): Promise<string> {
     );
     const time = session.operationTime!;
     // TODO: Use the above when we support custom write checkpoints
-    return getMongoLsn(time);
+    return new MongoLSN({ timestamp: time }).comparable;
   } finally {
     await session.endSession();
   }
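One property worth calling out (an observation about the code, not an addition in the diff): the removed `getMongoLsn` output is exactly the timestamp segment of the new format, so LSNs persisted before this change still deserialize. A small sketch, assuming the `MongoLSN` class from earlier in this diff:

```typescript
import { MongoLSN } from '../common/MongoLSN.js';

// A legacy LSN has no '|' delimiter, so split() yields one segment and
// the resume token comes back empty.
const legacy = MongoLSN.fromSerialized('0000000100000002');
console.log(legacy.timestamp.high); // 1 — first 8 hex digits
console.log(legacy.timestamp.low);  // 2 — last 8 hex digits
console.log(legacy.resumeToken);    // null — no token segment present
```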
diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts
index bf7abbcd..90763fd0 100644
--- a/modules/module-mongodb/test/src/change_stream_utils.ts
+++ b/modules/module-mongodb/test/src/change_stream_utils.ts
@@ -85,7 +85,7 @@ export class ChangeStreamTestContext {
   }
 
   startStreaming() {
-    this.streamPromise = this.walStream.streamChanges();
+    return (this.streamPromise = this.walStream.streamChanges());
   }
 
   async getCheckpoint(options?: { timeout?: number }) {
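A note on the one-line test-utility change above: returning the assignment lets callers await the stream's outcome directly while `streamPromise` stays tracked on the context for cleanup. The resume test below relies on this, along the lines of:

```typescript
// Sketch of the pattern the new test uses: capture the rejection value
// instead of letting the stream failure go unobserved.
const error = await context.startStreaming().catch((ex) => ex);
```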
diff --git a/modules/module-mongodb/test/src/resume.test.ts b/modules/module-mongodb/test/src/resume.test.ts
new file mode 100644
index 00000000..f5e3944a
--- /dev/null
+++ b/modules/module-mongodb/test/src/resume.test.ts
@@ -0,0 +1,152 @@
+import { MongoLSN, ZERO_LSN } from '@module/common/MongoLSN.js';
+
+import { MongoManager } from '@module/replication/MongoManager.js';
+import { normalizeConnectionConfig } from '@module/types/types.js';
+import { isMongoServerError, mongo } from '@powersync/lib-service-mongodb';
+import { BucketStorageFactory, TestStorageOptions } from '@powersync/service-core';
+import { describe, expect, test, vi } from 'vitest';
+import { ChangeStreamTestContext } from './change_stream_utils.js';
+import { env } from './env.js';
+import { INITIALIZED_MONGO_STORAGE_FACTORY, INITIALIZED_POSTGRES_STORAGE_FACTORY } from './util.js';
+
+describe('mongo lsn', () => {
+  test('LSN with resume tokens should be comparable', () => {
+    // Values without a resume token should be comparable
+    expect(
+      new MongoLSN({
+        timestamp: mongo.Timestamp.fromNumber(1)
+      }).comparable <
+        new MongoLSN({
+          timestamp: mongo.Timestamp.fromNumber(10)
+        }).comparable
+    ).true;
+
+    // Values with resume tokens should correctly compare
+    expect(
+      new MongoLSN({
+        timestamp: mongo.Timestamp.fromNumber(1),
+        resume_token: { _data: 'resume1' }
+      }).comparable <
+        new MongoLSN({
+          timestamp: mongo.Timestamp.fromNumber(10),
+          resume_token: { _data: 'resume2' }
+        }).comparable
+    ).true;
+
+    // The resume token should not affect comparison
+    expect(
+      new MongoLSN({
+        timestamp: mongo.Timestamp.fromNumber(1),
+        resume_token: { _data: '2' }
+      }).comparable <
+        new MongoLSN({
+          timestamp: mongo.Timestamp.fromNumber(10),
+          resume_token: { _data: '1' }
+        }).comparable
+    ).true;
+
+    // Resume token should not be required for comparison
+    expect(
+      new MongoLSN({
+        timestamp: mongo.Timestamp.fromNumber(10),
+        resume_token: { _data: '2' }
+      }).comparable > // Switching the order to test this case
+        new MongoLSN({
+          timestamp: mongo.Timestamp.fromNumber(9)
+        }).comparable
+    ).true;
+
+    // Comparison should be backwards compatible with old LSNs
+    expect(
+      new MongoLSN({
+        timestamp: mongo.Timestamp.fromNumber(10),
+        resume_token: { _data: '2' }
+      }).comparable > ZERO_LSN
+    ).true;
+    expect(
+      new MongoLSN({
+        timestamp: mongo.Timestamp.fromNumber(10),
+        resume_token: { _data: '2' }
+      }).comparable >
+        new MongoLSN({
+          timestamp: mongo.Timestamp.fromNumber(1)
+        }).comparable.split('|')[0] // Simulate an old LSN
+    ).true;
+    expect(
+      new MongoLSN({
+        timestamp: mongo.Timestamp.fromNumber(1),
+        resume_token: { _data: '2' }
+      }).comparable <
+        new MongoLSN({
+          timestamp: mongo.Timestamp.fromNumber(10)
+        }).comparable.split('|')[0] // Simulate an old LSN
+    ).true;
+  });
+});
+
+describe.skipIf(!env.TEST_MONGO_STORAGE)('MongoDB resume - mongo storage', () => {
+  defineResumeTest(INITIALIZED_MONGO_STORAGE_FACTORY);
+});
+
+describe.skipIf(!env.TEST_POSTGRES_STORAGE)('MongoDB resume - postgres storage', () => {
+  defineResumeTest(INITIALIZED_POSTGRES_STORAGE_FACTORY);
+});
+
+function defineResumeTest(factoryGenerator: (options?: TestStorageOptions) => Promise<BucketStorageFactory>) {
+  test('resuming with a different source database', async () => {
+    await using context = await ChangeStreamTestContext.open(factoryGenerator);
+    const { db } = context;
+
+    await context.updateSyncRules(/* yaml */
+      ` bucket_definitions:
+          global:
+            data:
+              - SELECT _id as id, description, num FROM "test_data"`);
+
+    await context.replicateSnapshot();
+
+    context.startStreaming();
+
+    const collection = db.collection('test_data');
+    await collection.insertOne({ description: 'test1', num: 1152921504606846976n });
+
+    // Wait for the item above to be replicated. The commit should store a resume token.
+    await vi.waitFor(
+      async () => {
+        const checkpoint = await context.storage?.getCheckpoint();
+        expect(MongoLSN.fromSerialized(checkpoint!.lsn!).resumeToken).exist;
+      },
+      { timeout: 5000 }
+    );
+
+    // Done with this context for now
+    await context.dispose();
+
+    // Use the provided MongoDB URL to connect to a different source database
+    const originalUrl = env.MONGO_TEST_URL;
+    // Change this to a different database
+    const url = new URL(originalUrl);
+    const parts = url.pathname.split('/');
+    parts[1] = 'differentDB'; // Replace the database name
+    url.pathname = parts.join('/');
+
+    // Point to a new source DB
+    const connectionManager = new MongoManager(
+      normalizeConnectionConfig({
+        type: 'mongodb',
+        uri: url.toString()
+      })
+    );
+    const factory = await factoryGenerator({ doNotClear: true });
+
+    // Create a new context without updating the sync rules
+    await using context2 = new ChangeStreamTestContext(factory, connectionManager);
+    const activeContent = await factory.getActiveSyncRulesContent();
+    context2.storage = factory.getInstance(activeContent!);
+
+    const error = await context2.startStreaming().catch((ex) => ex);
+    expect(error).exist;
+    // The ChangeStreamReplicationJob will detect this and throw a ChangeStreamInvalidatedError
+    expect(isMongoServerError(error) && error.hasErrorLabel('NonResumableChangeStreamError')).true;
+  });
+}
diff --git a/packages/service-errors/src/codes.ts b/packages/service-errors/src/codes.ts
index 9458be92..5e607f3d 100644
--- a/packages/service-errors/src/codes.ts
+++ b/packages/service-errors/src/codes.ts
@@ -240,6 +240,19 @@ export enum ErrorCode {
    */
   PSYNC_S1343 = 'PSYNC_S1343',
 
+  /**
+   * The MongoDB Change Stream has been invalidated.
+   *
+   * Possible causes:
+   * - Some change stream documents do not have postImages.
+   * - startAfter/resumeToken is not valid anymore.
+   * - The replication connection has changed.
+   * - The database has been dropped.
+   *
+   * Replication will be stopped for this Change Stream and will restart with a new Change Stream.
+   */
+  PSYNC_S1344 = 'PSYNC_S1344',
+
   // ## PSYNC_S14xx: MongoDB storage replication issues
 
   /**
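Finally, a hedged sketch of matching the new code without importing the MongoDB module's error class. How the code is exposed on the thrown error (`errorData.code` vs `code`) is an assumption about the framework, not something this diff shows:

```typescript
import { ErrorCode } from '@powersync/service-errors';

// Assumption: DatabaseConnectionError surfaces the ErrorCode it was
// constructed with (see ChangeStreamInvalidatedError above) on a
// readable field; the exact field name may differ.
function isChangeStreamInvalidation(e: any): boolean {
  const code = e?.errorData?.code ?? e?.code;
  return code === ErrorCode.PSYNC_S1344;
}
```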