Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(NODE-6722): migrate parallel benchmarks #4404

Merged
merged 1 commit into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion test/benchmarks/driver_bench/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"version": "0.0.0",
"private": true,
"scripts": {
"prestart": "tsc",
"prestart": "rm -rf lib && tsc",
"start": "node lib/main.mjs"
}
}
15 changes: 13 additions & 2 deletions test/benchmarks/driver_bench/src/driver.mts
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,18 @@ export function snakeToCamel(name: string) {
import type mongodb from '../../../../mongodb.js';
export type { mongodb };

const { MongoClient, GridFSBucket } = require(path.join(MONGODB_DRIVER_PATH));
const { MongoClient, GridFSBucket, BSON } = require(path.join(MONGODB_DRIVER_PATH));

export { BSON };
export const EJSON = BSON.EJSON;

const DB_NAME = 'perftest';
const COLLECTION_NAME = 'corpus';

const SPEC_DIRECTORY = path.resolve(__dirname, '..', '..', 'driverBench', 'spec');
// TODO(NODE-6729): move spec into this folder: export const SPEC_DIRECTORY = path.resolve(__dirname, '..', 'spec');
export const SPEC_DIRECTORY = path.resolve(__dirname, '..', '..', 'driverBench', 'spec');
export const PARALLEL_DIRECTORY = path.resolve(SPEC_DIRECTORY, 'parallel');
export const TEMP_DIRECTORY = path.resolve(SPEC_DIRECTORY, 'tmp');

export type Metric = {
name: 'megabytes_per_second';
Expand Down Expand Up @@ -186,6 +192,11 @@ export class DriverTester {
throw new Error('unknown type: ' + type);
}

async resetTmpDir() {
await fs.rm(TEMP_DIRECTORY, { recursive: true, force: true });
await fs.mkdir(TEMP_DIRECTORY);
}

async insertManyOf(document: Record<string, any>, length: number, addId = false) {
const utilClient = new MongoClient(MONGODB_URI, MONGODB_CLIENT_OPTIONS);
const db = utilClient.db(DB_NAME);
Expand Down
24 changes: 12 additions & 12 deletions test/benchmarks/driver_bench/src/main.mts
Original file line number Diff line number Diff line change
Expand Up @@ -120,27 +120,27 @@ function calculateCompositeBenchmarks(results: MetricInfo[]) {
'largeDocBulkInsert',
'smallDocBulkInsert'
],
// parallelBench: [
// 'ldjsonMultiFileUpload',
// 'ldjsonMultiFileExport',
// 'gridfsMultiFileUpload',
// 'gridfsMultiFileDownload'
// ],
parallelBench: [
'ldjsonMultiFileUpload',
'ldjsonMultiFileExport',
'gridfsMultiFileUpload',
'gridfsMultiFileDownload'
],
readBench: [
'findOne',
'findManyAndEmptyCursor',
'gridFsDownload'
// 'gridfsMultiFileDownload',
// 'ldjsonMultiFileExport'
'gridFsDownload',
'gridfsMultiFileDownload',
'ldjsonMultiFileExport'
],
writeBench: [
'smallDocInsertOne',
'largeDocInsertOne',
'smallDocBulkInsert',
'largeDocBulkInsert',
'gridFsUpload'
// 'ldjsonMultiFileUpload',
// 'gridfsMultiFileUpload'
'gridFsUpload',
'ldjsonMultiFileUpload',
'gridfsMultiFileUpload'
]
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { createReadStream, createWriteStream, promises as fs } from 'node:fs';
import path from 'node:path';
import { pipeline } from 'node:stream/promises';

import { driver, type mongodb, PARALLEL_DIRECTORY, TEMP_DIRECTORY } from '../../driver.mjs';

export const taskSize = 262.144;

let bucket: mongodb.GridFSBucket;

export async function before() {
await driver.drop();
await driver.create();
await driver.resetTmpDir();

bucket = driver.bucket(driver.client.db(driver.DB_NAME));
await bucket.drop().catch(() => null);

const gridfs_multi = path.resolve(PARALLEL_DIRECTORY, 'gridfs_multi');

const files = (await fs.readdir(gridfs_multi)).map(filename =>
path.resolve(gridfs_multi, filename)
);

const uploadPromises = files.map(async filename => {
const fileStream = createReadStream(filename);
const uploadStream = bucket.openUploadStream(filename);
return await pipeline(fileStream, uploadStream);
});

await Promise.all(uploadPromises);
}

export async function beforeEach() {
await driver.resetTmpDir();
}

export async function run() {
const files = await bucket
.find()
.map(({ _id }) => ({
path: path.resolve(TEMP_DIRECTORY, `${_id}.txt`),
_id
}))
.toArray();

const downloads = files.map(async ({ _id, path }) => {
const fileStream = createWriteStream(path);
const downloadStream = bucket.openDownloadStream(_id);
return await pipeline(downloadStream, fileStream);
});

await Promise.all(downloads);
}

export async function afterEach() {
await driver.resetTmpDir();
}

export async function after() {
await driver.drop();
await driver.close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { createReadStream, promises as fs } from 'node:fs';
import path from 'node:path';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

import { driver, type mongodb, PARALLEL_DIRECTORY } from '../../driver.mjs';

export const taskSize = 262.144;

let bucket: mongodb.GridFSBucket;

const directory = path.resolve(PARALLEL_DIRECTORY, 'gridfs_multi');

export async function before() {
await driver.drop();
await driver.create();

bucket = driver.bucket(driver.client.db(driver.DB_NAME));

await bucket.drop().catch(() => null);
}

export async function beforeEach() {
// Create the bucket.
const stream = bucket.openUploadStream('setup-file.txt');
const oneByteFile = Readable.from('a');
await pipeline(oneByteFile, stream);
}

export async function run() {
const files = await fs.readdir(directory);

const uploadPromises = files.map(async filename => {
const file = path.resolve(directory, filename);
const fileStream = createReadStream(file);
const uploadStream = bucket.openUploadStream(file);
return await pipeline(fileStream, uploadStream);
});

await Promise.all(uploadPromises);
}

export async function after() {
await driver.drop();
await driver.close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { createReadStream, createWriteStream, promises as fs } from 'node:fs';
import path from 'node:path';
import readline from 'node:readline/promises';
import stream from 'node:stream/promises';

import { driver, EJSON, type mongodb, PARALLEL_DIRECTORY, TEMP_DIRECTORY } from '../../driver.mjs';

export const taskSize = 565;

let collection: mongodb.Collection;

export async function before() {
await driver.drop();
await driver.create();
await driver.resetTmpDir();

collection = driver.client.db(driver.DB_NAME).collection(driver.COLLECTION_NAME);

const ldjson_multi = path.resolve(PARALLEL_DIRECTORY, 'ldjson_multi');

const files = (await fs.readdir(ldjson_multi)).map(fileName =>
path.resolve(ldjson_multi, fileName)
);

const uploads = files.map(async fileName => {
const fileStream = createReadStream(fileName);
const lineReader = readline.createInterface({
input: fileStream
});

const operations = [];

for await (const line of lineReader) {
operations.push({
insertOne: {
document: JSON.parse(line)
}
});
}

fileStream.close();
lineReader.close();

return await collection.bulkWrite(operations);
});

await Promise.all(uploads);
}

export async function beforeEach() {
await driver.resetTmpDir();
}

export async function run() {
const skips = Array.from({ length: 100 }, (_, index) => index * 5000);

const promises = skips.map(async skip => {
const documentCursor = collection.find({}, { skip, limit: 5000 });
documentCursor.map(doc => EJSON.stringify(doc));
const outputStream = createWriteStream(path.resolve(TEMP_DIRECTORY, `tmp-${skip}.txt`));
return await stream.pipeline(documentCursor.stream(), outputStream);
});

await Promise.all(promises);
}

export async function afterEach() {
await driver.resetTmpDir();
}

export async function after() {
await driver.drop();
await driver.close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { createReadStream, promises as fs } from 'node:fs';
import path from 'node:path';
import readline from 'node:readline/promises';

import { driver, type mongodb, PARALLEL_DIRECTORY } from '../../driver.mjs';

export const taskSize = 565;

const directory = path.resolve(PARALLEL_DIRECTORY, 'ldjson_multi');
let collection: mongodb.Collection;

export async function beforeEach() {
await driver.drop();
await driver.create();

collection = driver.client.db(driver.DB_NAME).collection(driver.COLLECTION_NAME);
}

export async function run() {
const files = await fs.readdir(directory);
const uploads = files.map(async file => {
const fileStream = createReadStream(path.resolve(directory, file));
const lineReader = readline.createInterface({
input: fileStream
});

const operations = [];

for await (const line of lineReader) {
operations.push({
insertOne: {
document: JSON.parse(line)
}
});
}

fileStream.close();
lineReader.close();

return await collection.bulkWrite(operations);
});

await Promise.all(uploads);
}

export async function after() {
await driver.drop();
await driver.close();
}