Skip to content

Commit

Permalink
feat(NODE-6329): client bulk write happy path
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Sep 18, 2024
1 parent 643a875 commit 43fa4b6
Show file tree
Hide file tree
Showing 28 changed files with 2,690 additions and 46 deletions.
16 changes: 14 additions & 2 deletions src/cmap/command_monitoring_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import {
LEGACY_HELLO_COMMAND_CAMEL_CASE
} from '../constants';
import { calculateDurationInMs, deepCopy } from '../utils';
import { OpMsgRequest, type OpQueryRequest, type WriteProtocolMessageType } from './commands';
import {
DocumentSequence,
OpMsgRequest,
type OpQueryRequest,
type WriteProtocolMessageType
} from './commands';
import type { Connection } from './connection';

/**
Expand Down Expand Up @@ -249,7 +254,14 @@ const OP_QUERY_KEYS = [
/** Extract the actual command from the query, possibly up-converting if it's a legacy format */
function extractCommand(command: WriteProtocolMessageType): Document {
if (command instanceof OpMsgRequest) {
return deepCopy(command.command);
const cmd = deepCopy(command.command);
if (cmd.ops instanceof DocumentSequence) {
cmd.ops = cmd.ops.documents;
}
if (cmd.nsInfo instanceof DocumentSequence) {
cmd.nsInfo = cmd.nsInfo.documents;
}
return cmd;
}

if (command.query?.$query) {
Expand Down
8 changes: 4 additions & 4 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,10 @@ export class OpMsgRequest {
for (const [key, value] of Object.entries(document)) {
if (value instanceof DocumentSequence) {
// Document sequences starts with type 1 at the first byte.
const buffer = Buffer.allocUnsafe(1 + 4 + key.length);
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
buffer[0] = 1;
// Third part is the field name at offset 5.
encodeUTF8Into(buffer, key, 5);
// Third part is the field name at offset 5 with trailing null byte.
encodeUTF8Into(buffer, `${key}\0`, 5);
chunks.push(buffer);
// Fourth part are the documents' bytes.
let docsLength = 0;
Expand All @@ -557,7 +557,7 @@ export class OpMsgRequest {
chunks.push(docBson);
}
// Second part of the sequence is the length at offset 1;
buffer.writeInt32LE(key.length + docsLength, 1);
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
// Why are we removing the field from the command? This is because it needs to be
// removed in the OP_MSG request first section, and DocumentSequence is not a
// BSON type and is specific to the MongoDB wire protocol so there's nothing
Expand Down
26 changes: 26 additions & 0 deletions src/cmap/wire_protocol/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,3 +329,29 @@ export class ExplainedCursorResponse extends CursorResponse {
return this.toObject(options);
}
}

/**
* Client bulk writes have some extra metadata at the top level that needs to be
* included in the result returned to the user.
*/
export class ClientBulkWriteCursorResponse extends CursorResponse {
get insertedCount() {
return this.get('nInserted', BSONType.int, true);
}

get upsertedCount() {
return this.get('nUpserted', BSONType.int, true);
}

get matchedCount() {
return this.get('nMatched', BSONType.int, true);
}

get modifiedCount() {
return this.get('nModified', BSONType.int, true);
}

get deletedCount() {
return this.get('nDeleted', BSONType.int, true);
}
}
64 changes: 64 additions & 0 deletions src/cursor/client_bulk_write_cursor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import type { Document } from '../bson';
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
import type { MongoClient } from '../mongo_client';
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
import { executeOperation } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
import { mergeOptions, MongoDBNamespace } from '../utils';
import {
AbstractCursor,
type AbstractCursorOptions,
type InitialCursorResponse
} from './abstract_cursor';

/** @public */
export interface ClientBulkWriteCursorOptions
extends AbstractCursorOptions,
ClientBulkWriteOptions {}

/**
* @public
*/
export class ClientBulkWriteCursor extends AbstractCursor {
public readonly command: Document;
/** @internal */
private cursorResponse?: ClientBulkWriteCursorResponse;
/** @internal */
private clientBulkWriteOptions: ClientBulkWriteOptions;

/** @internal */
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
super(client, new MongoDBNamespace('admin'), options);

this.command = command;
this.clientBulkWriteOptions = options;
}

get response(): ClientBulkWriteCursorResponse {
if (this.cursorResponse) return this.cursorResponse;
throw new Error('no cursor response');
}

clone(): ClientBulkWriteCursor {
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
delete clonedOptions.session;
return new ClientBulkWriteCursor(this.client, this.command, {
...clonedOptions
});
}

/** @internal */
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
...this.clientBulkWriteOptions,
...this.cursorOptions,
session
});

const response = await executeOperation(this.client, clientBulkWriteOperation);
this.cursorResponse = response;

return { server: clientBulkWriteOperation.server, session, response };
}
}
15 changes: 15 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,21 @@ export type {
AggregateOptions,
DB_AGGREGATE_COLLECTION
} from './operations/aggregate';
export type {
AnyClientBulkWriteModel,
ClientBulkWriteOptions,
ClientBulkWriteResult,
ClientDeleteManyModel,
ClientDeleteOneModel,
ClientDeleteResult,
ClientInsertOneModel,
ClientInsertOneResult,
ClientReplaceOneModel,
ClientUpdateManyModel,
ClientUpdateOneModel,
ClientUpdateResult,
ClientWriteModel
} from './operations/client_bulk_write/common';
export type {
CollationOptions,
CommandOperation,
Expand Down
18 changes: 18 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import {
SeverityLevel
} from './mongo_logger';
import { TypedEventEmitter } from './mongo_types';
import {
type AnyClientBulkWriteModel,
type ClientBulkWriteOptions,
type ClientBulkWriteResult
} from './operations/client_bulk_write/common';
import { ClientBulkWriteExecutor } from './operations/client_bulk_write/executor';
import { executeOperation } from './operations/execute_operation';
import { RunAdminCommandOperation } from './operations/run_command';
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
Expand Down Expand Up @@ -477,6 +483,18 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
return this.s.bsonOptions;
}

/**
* Executes a client bulk write operation, available on server 8.0+.
* @param models - The client bulk write models.
* @param options - The client bulk write options.
*/
async bulkWrite(
models: AnyClientBulkWriteModel[],
options?: ClientBulkWriteOptions
): Promise<ClientBulkWriteResult> {
return await new ClientBulkWriteExecutor(this, models, options).execute();
}

/**
* Connect to MongoDB using a url
*
Expand Down
43 changes: 43 additions & 0 deletions src/operations/client_bulk_write/client_bulk_write.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { type Document } from 'bson';

import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
import type { Server } from '../../sdam/server';
import type { ClientSession } from '../../sessions';
import { MongoDBNamespace } from '../../utils';
import { AbstractOperation, Aspect, defineAspects } from '../operation';
import { type ClientBulkWriteOptions } from './common';

export class ClientBulkWriteOperation extends AbstractOperation<ClientBulkWriteCursorResponse> {
command: Document;
override options: ClientBulkWriteOptions;

override get commandName() {
return 'bulkWrite' as const;
}

constructor(command: Document, options: ClientBulkWriteOptions) {
super(options);
this.command = command;
this.options = options;
this.ns = new MongoDBNamespace('admin');
}

override async execute(
server: Server,
session: ClientSession | undefined
): Promise<ClientBulkWriteCursorResponse> {
return await server.command(
this.ns,
this.command,
{
...this.options,
...this.bsonOptions,
documentsReturnedIn: 'firstBatch',
session
},
ClientBulkWriteCursorResponse
);
}
}

defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION]);
8 changes: 7 additions & 1 deletion src/operations/client_bulk_write/command_builder.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type Document } from '../../bson';
import { type Document, ObjectId } from '../../bson';
import { DocumentSequence } from '../../cmap/commands';
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
import { type CollationOptions } from '../command';
Expand All @@ -23,6 +23,7 @@ export interface ClientBulkWriteCommand {
nsInfo: DocumentSequence;
bypassDocumentValidation?: boolean;
let?: Document;
comment?: any;
}

/** @internal */
Expand Down Expand Up @@ -88,6 +89,10 @@ export class ClientBulkWriteCommandBuilder {
if (this.options.let) {
command.let = this.options.let;
}

if (this.options.comment != null) {
command.comment = this.options.comment;
}
return [command];
}
}
Expand All @@ -112,6 +117,7 @@ export const buildInsertOneOperation = (
insert: index,
document: model.document
};
document.document._id = model.document._id ?? new ObjectId();
return document;
};

Expand Down
74 changes: 74 additions & 0 deletions src/operations/client_bulk_write/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,77 @@ export type AnyClientBulkWriteModel =
| ClientUpdateManyModel
| ClientDeleteOneModel
| ClientDeleteManyModel;

/** @public */
export interface ClientBulkWriteResult {
/**
* The total number of documents inserted across all insert operations.
*/
insertedCount: number;
/**
* The total number of documents upserted across all update operations.
*/
upsertedCount: number;
/**
* The total number of documents matched across all update operations.
*/
matchedCount: number;
/**
* The total number of documents modified across all update operations.
*/
modifiedCount: number;
/**
* The total number of documents deleted across all delete operations.
*/
deletedCount: number;
/**
* The results of each individual insert operation that was successfully performed.
*/
insertResults?: Map<number, ClientInsertOneResult>;
/**
* The results of each individual update operation that was successfully performed.
*/
updateResults?: Map<number, ClientUpdateResult>;
/**
* The results of each individual delete operation that was successfully performed.
*/
deleteResults?: Map<number, ClientDeleteResult>;
}

/** @public */
export interface ClientInsertOneResult {
/**
* The _id of the inserted document.
*/
insertedId: any;
}

/** @public */
export interface ClientUpdateResult {
/**
* The number of documents that matched the filter.
*/
matchedCount: number;

/**
* The number of documents that were modified.
*/
modifiedCount: number;

/**
* The _id field of the upserted document if an upsert occurred.
*
* It MUST be possible to discern between a BSON Null upserted ID value and this field being
* unset. If necessary, drivers MAY add a didUpsert boolean field to differentiate between
* these two cases.
*/
upsertedId?: any;
}

/** @public */
export interface ClientDeleteResult {
/**
* The number of documents that were deleted.
*/
deletedCount: number;
}
Loading

0 comments on commit 43fa4b6

Please sign in to comment.