Skip to content

Commit

Permalink
feat(NODE-6337): implement client bulk write batching
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Sep 19, 2024
1 parent ddcae44 commit bb1a2bb
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 152 deletions.
6 changes: 5 additions & 1 deletion src/operations/client_bulk_write/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,14 @@ async function executeAcknowledged(
): Promise<ClientBulkWriteResult> {
const resultsMerger = new ClientBulkWriteResultsMerger(options);
// For each command will will create and exhaust a cursor for the results.
let currentBatchOffset = 0;
for (const command of commands) {
const cursor = new ClientBulkWriteCursor(client, command, options);
const docs = await cursor.toArray();
resultsMerger.merge(command.ops.documents, cursor.response, docs);
const operations = command.ops.documents;
resultsMerger.merge(currentBatchOffset, operations, cursor.response, docs);
// Set the new batch index so we can back back to the index in the original models.
currentBatchOffset += operations.length;
}
return resultsMerger.result;
}
Expand Down
12 changes: 9 additions & 3 deletions src/operations/client_bulk_write/results_merger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,13 @@ export class ClientBulkWriteResultsMerger {

/**
* Merge the results in the cursor to the existing result.
* @param currentBatchOffset - The offset index to the original models.
* @param response - The cursor response.
* @param documents - The documents in the cursor.
* @returns The current result.
*/
merge(
currentBatchOffset: number,
operations: Document[],
response: ClientBulkWriteCursorResponse,
documents: Document[]
Expand All @@ -69,7 +71,9 @@ export class ClientBulkWriteResultsMerger {
const operation = operations[document.idx];
// Handle insert results.
if ('insert' in operation) {
this.result.insertResults?.set(document.idx, { insertedId: operation.document._id });
this.result.insertResults?.set(document.idx + currentBatchOffset, {
insertedId: operation.document._id
});
}
// Handle update results.
if ('update' in operation) {
Expand All @@ -80,11 +84,13 @@ export class ClientBulkWriteResultsMerger {
if (document.upserted) {
result.upsertedId = document.upserted._id;
}
this.result.updateResults?.set(document.idx, result);
this.result.updateResults?.set(document.idx + currentBatchOffset, result);
}
// Handle delete results.
if ('delete' in operation) {
this.result.deleteResults?.set(document.idx, { deletedCount: document.n });
this.result.deleteResults?.set(document.idx + currentBatchOffset, {
deletedCount: document.n
});
}
}
}
Expand Down
Loading

0 comments on commit bb1a2bb

Please sign in to comment.