source-mongodb: batch mode for capture bindings #1956
Conversation
As part of the batch-mode implementation, the connector needs to know whether the server supports change streams at all. If it doesn't, that knowledge is a fast path toward defaulting all of the capture bindings to batch mode, among other things.
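For context, change streams are only available on replica sets and sharded clusters, so one plausible way to detect support is to inspect the server's `hello` response. Here's a minimal sketch using the standard Go driver; this is an illustration of the idea, not the connector's actual detection logic:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// supportsChangeStreams reports whether the server looks like a replica set
// member (hello returns a setName) or a mongos router (msg is "isdbgrid"),
// the deployment types on which change streams are available.
func supportsChangeStreams(ctx context.Context, client *mongo.Client) (bool, error) {
	var hello struct {
		SetName string `bson:"setName"`
		Msg     string `bson:"msg"`
	}
	res := client.Database("admin").RunCommand(ctx, bson.D{{Key: "hello", Value: 1}})
	if err := res.Decode(&hello); err != nil {
		return false, err
	}
	return hello.SetName != "" || hello.Msg == "isdbgrid", nil
}

func main() {
	ctx := context.Background()
	client, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	defer client.Disconnect(ctx)

	ok, err := supportsChangeStreams(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("change streams supported:", ok)
}
```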
…Length
The latest MongoDB driver provides a RemainingBatchLength method for change streams, and this changes the change stream processing code to make use of it for some simplifications and slight optimizations. `tryStream` has been refactored into `pullStream`, which requests one batch of documents from the change stream cursor, emits them all, and then returns; higher-level termination conditions are handled by the caller.

One optimization implemented here is to only checkpoint when a document is actually being emitted or a post-batch resume token needs to be recorded. This was previously not possible because `TryNext` could return true forever under a high rate of changes on the server, and if those changes were exclusively for non-tracked collections we would never have emitted a checkpoint. An additional optimization is to quickly check whether a received event is for a tracked collection and discard it if it isn't. This may help in cases where a very busy database is being watched but only a few of its collections are captured.
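To illustrate the shape of this, here's a hedged sketch of a `pullStream`-style function against the driver's `mongo.ChangeStream` API (`TryNext`, `RemainingBatchLength`, `ResumeToken`); the `tracked`, `emit`, and `checkpoint` parameters are illustrative stand-ins for the connector's real plumbing:

```go
package sketch

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
)

// pullStream drains at most one batch from the change stream: the first
// TryNext may issue a getMore, and once RemainingBatchLength reports zero
// we return rather than requesting another batch. A checkpoint is emitted
// only if at least one tracked document was emitted.
func pullStream(ctx context.Context, cs *mongo.ChangeStream,
	tracked map[string]bool, emit func(bson.Raw), checkpoint func(bson.Raw)) error {
	emitted := false
	for cs.TryNext(ctx) {
		// Cheaply discard events for collections we aren't capturing.
		var evt struct {
			NS struct {
				DB   string `bson:"db"`
				Coll string `bson:"coll"`
			} `bson:"ns"`
		}
		if err := cs.Decode(&evt); err != nil {
			return err
		}
		if tracked[evt.NS.DB+"."+evt.NS.Coll] {
			emit(cs.Current)
			emitted = true
		}
		if cs.RemainingBatchLength() == 0 {
			break // batch exhausted; the caller decides whether to pull again
		}
	}
	if err := cs.Err(); err != nil {
		return err
	}
	if emitted {
		checkpoint(cs.ResumeToken())
	}
	return nil
}
```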
…ordinator
Further refactoring and streamlining of the change stream code path: there is now just one `streamChanges` function, which can be configured to either return once streams are caught up or keep running. Returning in this way is used for intermittent streaming while backfilling incremental change stream collections; the change streams run forever once those collections are done backfilling. This is partially enabled by the new batch/stream coordinator apparatus, which later commits use to support concurrent change streams & batch backfills of non-incremental change stream collections.
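As a rough sketch of that control flow, assuming the `pullStream` shape above (the `catchUpOnly` flag and the `pull` callback are illustrative names, not the connector's real signature):

```go
package sketch

import (
	"context"

	"go.mongodb.org/mongo-driver/mongo"
)

// streamChanges repeatedly pulls one batch at a time. When catchUpOnly is
// set it returns as soon as a pull drains zero events (the stream is caught
// up); otherwise it streams until the context is cancelled. The pull
// callback stands in for a pullStream-like call and reports how many events
// it drained.
func streamChanges(ctx context.Context, cs *mongo.ChangeStream, catchUpOnly bool,
	pull func(context.Context, *mongo.ChangeStream) (int, error)) error {
	for ctx.Err() == nil {
		n, err := pull(ctx, cs)
		if err != nil {
			return err
		}
		if catchUpOnly && n == 0 {
			return nil // caught up; the caller may resume backfilling
		}
	}
	return ctx.Err()
}
```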
LGTM % minor question.
// work. It's just easier to not allow it for now unless the
// binding is being reset (backfill counter incremented) as
// well.
if lastResource.getMode() != res.getMode() && binding.Backfill == lastBackfillCounters[idx] {
Should the backfill counter comparison here be `binding.Backfill <= lastBackfillCounters[idx]`? Or is the control plane making sure the backfill counter is never decremented, so we don't need to check that in the connector?
Neither the control plane nor the connector checks that backfill counters never decrease in general. Materializations do have some code for checking this, which I added as a curiosity, and it's perhaps debatable whether that condition should be enforced. As it is, if you decrement the backfill counter for a task for some reason, the outcome is undefined - it's probably not a good idea, but has actually come in handy sometimes. So I think connectors should be designed assuming that backfill counters always increase, even if that isn't strictly true; if one happens to decrease we really can't predict what will happen and shouldn't try.
Adds configuration options for capturing MongoDB collections in "batch" mode, and initial support for discovering those kinds of collections. Proper support for discovering time series collections (which will have a different id field) will come later, along with the implementation for incremental batch bindings that use a cursor.
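As a rough illustration of what such a per-binding configuration might look like, including the implicit default mode for previously discovered bindings (all field names here are assumptions made for the sketch, not the connector's actual schema):

```go
package sketch

// resource is an illustrative per-binding configuration; the connector's
// real schema and field names may differ.
type resource struct {
	Database     string `json:"database"`
	Collection   string `json:"collection"`
	Mode         string `json:"captureMode,omitempty"`  // one of the three capture modes
	CursorField  string `json:"cursorField,omitempty"`  // used by Batch Incremental
	PollSchedule string `json:"pollSchedule,omitempty"` // e.g. "24h" for Batch Snapshot
}

// getMode applies the implicit default: bindings discovered before this
// change have no mode set and behave as Change Stream Incremental.
func (r resource) getMode() string {
	if r.Mode == "" {
		return "Change Stream Incremental"
	}
	return r.Mode
}
```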
…ollections
Some initial restructuring of the backfill code path to make it more amenable to also handling "batch" collections, which will re-backfill over and over again, forever, on a schedule.
…as incremental change stream bindings
Enables capturing documents from batch snapshot bindings concurrently with on-going incremental change streaming. The connector will prioritize completing one-shot backfills for collections that use change streams for on-going change data capture. Once those backfills are done, both batch bindings and change streams run indefinitely; during this time the batchStreamCoordinator will occasionally signal the batch bindings to pause and allow the change streams to fully catch up.
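One plausible shape for that coordination, sketched with only the standard library (the type and method names are made up for illustration; the real batchStreamCoordinator is certainly more involved):

```go
package sketch

import (
	"context"
	"sync"
	"time"
)

// coordinator gates batch work behind a read lock; catching the change
// streams up takes the write lock, which pauses new batch work until done.
type coordinator struct {
	gate sync.RWMutex
}

// batchWork is called by batch bindings around each chunk of documents.
func (c *coordinator) batchWork(do func()) {
	c.gate.RLock()
	defer c.gate.RUnlock()
	do()
}

// run periodically pauses batch bindings and lets the change streams drain.
func (c *coordinator) run(ctx context.Context, drain func(), every time.Duration) {
	t := time.NewTicker(every)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			c.gate.Lock() // waits for in-flight batch chunks to finish
			drain()       // e.g. streamChanges with catchUpOnly set
			c.gate.Unlock()
		}
	}
}
```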
Adds the changes needed to actually support capturing incremental batch bindings, with or without a custom cursor value. The only kind of collection that will be discovered as an incremental batch binding is a time series collection, for which the cursor is automatically determined to be the collection's timestamp field.
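The underlying query for such a binding presumably looks something like the following hedged sketch (the function and parameter names are illustrative): fetch documents whose cursor field exceeds the last observed value, sorted ascending so the cursor can advance as documents are emitted.

```go
package sketch

import (
	"context"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// pollIncremental returns a cursor over documents newer than the last
// observed cursor value; on the first poll (last == nil) it scans everything.
func pollIncremental(ctx context.Context, coll *mongo.Collection,
	cursorField string, last interface{}) (*mongo.Cursor, error) {
	filter := bson.D{}
	if last != nil {
		filter = bson.D{{Key: cursorField, Value: bson.D{{Key: "$gt", Value: last}}}}
	}
	opts := options.Find().SetSort(bson.D{{Key: cursorField, Value: 1}})
	return coll.Find(ctx, filter, opts)
}
```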
The main change here is to make updateResourceStates aware that it can reset a database change stream token if all of the change stream bindings (and only those) have been reset. Batch bindings do not 'count' toward needing to retain the database change stream cursor.
As a bit of a convenience, prevent the user from changing the cursor field by returning an error during the Validate call if such a change is attempted. If the cursor field must be changed, re-backfilling the binding allows that to happen. Without a backfill, changing the cursor field would have unpredictable and almost certainly undesirable effects.
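The guard itself presumably reduces to a comparison like this (a sketch with made-up names, tying into the backfill counter discussion above):

```go
package sketch

import "fmt"

// validateCursorChange rejects a cursor field change unless the binding is
// also being re-backfilled (i.e. its backfill counter was incremented).
func validateCursorChange(lastCursor, proposedCursor string, backfillIncremented bool) error {
	if lastCursor != proposedCursor && !backfillIncremented {
		return fmt.Errorf(
			"cannot change cursor field from %q to %q without backfilling the binding",
			lastCursor, proposedCursor)
	}
	return nil
}
```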
Adds a rigorous test for change stream correctness that simulates capturing from several streams on a database under some load.
Fixes a handful of bugs I found when doing some final Q/A tests, including:
* Don't panic when getting server info if there are 0 configured bindings
* Discovery must discover bindings in batch mode if the server doesn't support change streams
* Correctly get server info in Validate, and compare empty resource mode with change stream requirements using the implicit default

Also re-ordered the resource configuration fields for cursor & poll schedule to hopefully make a little more intuitive sense.
Ah yep, I updated the materialization spec as well with some wording tweaks to match the capture, and forgot to update the test snapshots there.
Description:
Adds new "batch" capture modes for bindings. A binding's mode can be one of "Change Stream Incremental", "Batch Snapshot", or "Batch Incremental".
"Change Stream Incremental" works like the current connector, and captures collections with an initial backfill and on-going CDC from change streams. This is the default mode that is used, and all previously discovered bindings use this mode implicitly.
"Batch Snapshot" works like you'd expect a "batch" binding to work, refreshing the whole collection on a given frequency, which defaults to every 24 hours.
"Batch Incremental" uses a user-supplied cursor field to capture new documents (or perhaps updated documents) on a frequency, where the cursor field value is larger than a previously observed cursor. This can be used for collections that have a strictly-increasing value for a field.
By default, a server that does support change streams will not discover non-change-streamable bindings (views and time series collections); there is a configuration option that enables discovery of these "batch" bindings on such a server. A server that does not support change streams at all will discover all bindings in "batch" mode. This seems like a reasonable way to do things, since "Change Stream Incremental" is certainly preferred where possible and probably what people want most of the time. It also has the nice property that pre-existing captures won't unexpectedly discover a bunch of new bindings if there are views or time series collections that we didn't previously discover.
For testing, I've added a number of tests to cover the new behavior and enhanced the existing tests. I've also run a number of manual scenarios, most notably making sure that a capture that previously completed its backfill and is in "streaming" mode on the previous connector image continues to work normally after switching to this new connector image, AKA the migration scenario.
Closes #1914
Workflow steps:
(How does one use this feature, and how has it changed)
Documentation links affected:
The source-mongodb docs should be updated along with merging this pull request.
Notes for reviewers:
(anything that might help someone review this PR)