Skip to content

Commit 6a61748

Browse files
committed
chore(NODE-6705): migrate parallel benchmarks
1 parent da1c05c commit 6a61748

12 files changed

+258
-15
lines changed

test/benchmarks/driver_bench/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"version": "0.0.0",
44
"private": true,
55
"scripts": {
6-
"prestart": "tsc",
6+
"prestart": "rm -rf lib && tsc",
77
"start": "node lib/main.mjs"
88
}
99
}

test/benchmarks/driver_bench/src/driver.mts

+13-2
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,18 @@ export function snakeToCamel(name: string) {
105105
import type mongodb from '../../../../mongodb.js';
106106
export type { mongodb };
107107

108-
const { MongoClient, GridFSBucket } = require(path.join(MONGODB_DRIVER_PATH));
108+
const { MongoClient, GridFSBucket, BSON } = require(path.join(MONGODB_DRIVER_PATH));
109+
110+
export { BSON };
111+
export const EJSON = BSON.EJSON;
109112

110113
const DB_NAME = 'perftest';
111114
const COLLECTION_NAME = 'corpus';
112115

113-
const SPEC_DIRECTORY = path.resolve(__dirname, '..', '..', 'driverBench', 'spec');
116+
// TODO(NODE-6729): move spec into this folder: export const SPEC_DIRECTORY = path.resolve(__dirname, '..', 'spec');
117+
export const SPEC_DIRECTORY = path.resolve(__dirname, '..', '..', 'driverBench', 'spec');
118+
export const PARALLEL_DIRECTORY = path.resolve(SPEC_DIRECTORY, 'parallel');
119+
export const TEMP_DIRECTORY = path.resolve(SPEC_DIRECTORY, 'tmp');
114120

115121
export type Metric = {
116122
name: 'megabytes_per_second';
@@ -186,6 +192,11 @@ export class DriverTester {
186192
throw new Error('unknown type: ' + type);
187193
}
188194

195+
async resetTmpDir() {
196+
await fs.rm(TEMP_DIRECTORY, { recursive: true, force: true });
197+
await fs.mkdir(TEMP_DIRECTORY);
198+
}
199+
189200
async insertManyOf(document: Record<string, any>, length: number, addId = false) {
190201
const utilClient = new MongoClient(MONGODB_URI, MONGODB_CLIENT_OPTIONS);
191202
const db = utilClient.db(DB_NAME);

test/benchmarks/driver_bench/src/main.mts

+12-12
Original file line numberDiff line numberDiff line change
@@ -120,27 +120,27 @@ function calculateCompositeBenchmarks(results: MetricInfo[]) {
120120
'largeDocBulkInsert',
121121
'smallDocBulkInsert'
122122
],
123-
// parallelBench: [
124-
// 'ldjsonMultiFileUpload',
125-
// 'ldjsonMultiFileExport',
126-
// 'gridfsMultiFileUpload',
127-
// 'gridfsMultiFileDownload'
128-
// ],
123+
parallelBench: [
124+
'ldjsonMultiFileUpload',
125+
'ldjsonMultiFileExport',
126+
'gridfsMultiFileUpload',
127+
'gridfsMultiFileDownload'
128+
],
129129
readBench: [
130130
'findOne',
131131
'findManyAndEmptyCursor',
132-
'gridFsDownload'
133-
// 'gridfsMultiFileDownload',
134-
// 'ldjsonMultiFileExport'
132+
'gridFsDownload',
133+
'gridfsMultiFileDownload',
134+
'ldjsonMultiFileExport'
135135
],
136136
writeBench: [
137137
'smallDocInsertOne',
138138
'largeDocInsertOne',
139139
'smallDocBulkInsert',
140140
'largeDocBulkInsert',
141-
'gridFsUpload'
142-
// 'ldjsonMultiFileUpload',
143-
// 'gridfsMultiFileUpload'
141+
'gridFsUpload',
142+
'ldjsonMultiFileUpload',
143+
'gridfsMultiFileUpload'
144144
]
145145
};
146146

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { createReadStream, createWriteStream, promises as fs } from 'node:fs';
2+
import path from 'node:path';
3+
import { pipeline } from 'node:stream/promises';
4+
5+
import { driver, type mongodb, PARALLEL_DIRECTORY, TEMP_DIRECTORY } from '../../driver.mjs';
6+
7+
export const taskSize = 262.144;
8+
9+
let bucket: mongodb.GridFSBucket;
10+
11+
export async function before() {
12+
await driver.drop();
13+
await driver.create();
14+
await driver.resetTmpDir();
15+
16+
bucket = driver.bucket(driver.client.db(driver.DB_NAME));
17+
await bucket.drop().catch(() => null);
18+
19+
const gridfs_multi = path.resolve(PARALLEL_DIRECTORY, 'gridfs_multi');
20+
21+
const files = (await fs.readdir(gridfs_multi)).map(filename =>
22+
path.resolve(gridfs_multi, filename)
23+
);
24+
25+
const uploadPromises = files.map(async filename => {
26+
const fileStream = createReadStream(filename);
27+
const uploadStream = bucket.openUploadStream(filename);
28+
return await pipeline(fileStream, uploadStream);
29+
});
30+
31+
await Promise.all(uploadPromises);
32+
}
33+
34+
export async function beforeEach() {
35+
await driver.resetTmpDir();
36+
}
37+
38+
export async function run() {
39+
const files = await bucket
40+
.find()
41+
.map(({ _id }) => ({
42+
path: path.resolve(TEMP_DIRECTORY, `${_id}.txt`),
43+
_id
44+
}))
45+
.toArray();
46+
47+
const downloads = files.map(async ({ _id, path }) => {
48+
const fileStream = createWriteStream(path);
49+
const downloadStream = bucket.openDownloadStream(_id);
50+
return await pipeline(downloadStream, fileStream);
51+
});
52+
53+
await Promise.all(downloads);
54+
}
55+
56+
export async function afterEach() {
57+
await driver.resetTmpDir();
58+
}
59+
60+
export async function after() {
61+
await driver.drop();
62+
await driver.close();
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { createReadStream, promises as fs } from 'node:fs';
2+
import path from 'node:path';
3+
import { Readable } from 'node:stream';
4+
import { pipeline } from 'node:stream/promises';
5+
6+
import { driver, type mongodb, PARALLEL_DIRECTORY } from '../../driver.mjs';
7+
8+
export const taskSize = 262.144;
9+
10+
let bucket: mongodb.GridFSBucket;
11+
12+
const directory = path.resolve(PARALLEL_DIRECTORY, 'gridfs_multi');
13+
14+
export async function before() {
15+
await driver.drop();
16+
await driver.create();
17+
18+
bucket = driver.bucket(driver.client.db(driver.DB_NAME));
19+
20+
await bucket.drop().catch(() => null);
21+
}
22+
23+
export async function beforeEach() {
24+
// Create the bucket.
25+
const stream = bucket.openUploadStream('setup-file.txt');
26+
const oneByteFile = Readable.from('a');
27+
await pipeline(oneByteFile, stream);
28+
}
29+
30+
export async function run() {
31+
const files = await fs.readdir(directory);
32+
33+
const uploadPromises = files.map(async filename => {
34+
const file = path.resolve(directory, filename);
35+
const fileStream = createReadStream(file);
36+
const uploadStream = bucket.openUploadStream(file);
37+
return await pipeline(fileStream, uploadStream);
38+
});
39+
40+
await Promise.all(uploadPromises);
41+
}
42+
43+
export async function after() {
44+
await driver.drop();
45+
await driver.close();
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import { createReadStream, createWriteStream, promises as fs } from 'node:fs';
2+
import path from 'node:path';
3+
import readline from 'node:readline/promises';
4+
import stream from 'node:stream/promises';
5+
6+
import { driver, EJSON, type mongodb, PARALLEL_DIRECTORY, TEMP_DIRECTORY } from '../../driver.mjs';
7+
8+
export const taskSize = 565;
9+
10+
let collection: mongodb.Collection;
11+
12+
export async function before() {
13+
await driver.drop();
14+
await driver.create();
15+
await driver.resetTmpDir();
16+
17+
collection = driver.client.db(driver.DB_NAME).collection(driver.COLLECTION_NAME);
18+
19+
const ldjson_multi = path.resolve(PARALLEL_DIRECTORY, 'ldjson_multi');
20+
21+
const files = (await fs.readdir(ldjson_multi)).map(fileName =>
22+
path.resolve(ldjson_multi, fileName)
23+
);
24+
25+
const uploads = files.map(async fileName => {
26+
const fileStream = createReadStream(fileName);
27+
const lineReader = readline.createInterface({
28+
input: fileStream
29+
});
30+
31+
const operations = [];
32+
33+
for await (const line of lineReader) {
34+
operations.push({
35+
insertOne: {
36+
document: JSON.parse(line)
37+
}
38+
});
39+
}
40+
41+
fileStream.close();
42+
lineReader.close();
43+
44+
return await collection.bulkWrite(operations);
45+
});
46+
47+
await Promise.all(uploads);
48+
}
49+
50+
export async function beforeEach() {
51+
await driver.resetTmpDir();
52+
}
53+
54+
export async function run() {
55+
const skips = Array.from({ length: 100 }, (_, index) => index * 5000);
56+
57+
const promises = skips.map(async skip => {
58+
const documentCursor = collection.find({}, { skip, limit: 5000 });
59+
documentCursor.map(doc => EJSON.stringify(doc));
60+
const outputStream = createWriteStream(path.resolve(TEMP_DIRECTORY, `tmp-${skip}.txt`));
61+
return await stream.pipeline(documentCursor.stream(), outputStream);
62+
});
63+
64+
await Promise.all(promises);
65+
}
66+
67+
export async function afterEach() {
68+
await driver.resetTmpDir();
69+
}
70+
71+
export async function after() {
72+
await driver.drop();
73+
await driver.close();
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import { createReadStream, promises as fs } from 'node:fs';
2+
import path from 'node:path';
3+
import readline from 'node:readline/promises';
4+
5+
import { driver, type mongodb, PARALLEL_DIRECTORY } from '../../driver.mjs';
6+
7+
export const taskSize = 565;
8+
9+
const directory = path.resolve(PARALLEL_DIRECTORY, 'ldjson_multi');
10+
let collection: mongodb.Collection;
11+
12+
export async function beforeEach() {
13+
await driver.drop();
14+
await driver.create();
15+
16+
collection = driver.client.db(driver.DB_NAME).collection(driver.COLLECTION_NAME);
17+
}
18+
19+
export async function run() {
20+
const files = await fs.readdir(directory);
21+
const uploads = files.map(async file => {
22+
const fileStream = createReadStream(path.resolve(directory, file));
23+
const lineReader = readline.createInterface({
24+
input: fileStream
25+
});
26+
27+
const operations = [];
28+
29+
for await (const line of lineReader) {
30+
operations.push({
31+
insertOne: {
32+
document: JSON.parse(line)
33+
}
34+
});
35+
}
36+
37+
fileStream.close();
38+
lineReader.close();
39+
40+
return await collection.bulkWrite(operations);
41+
});
42+
43+
await Promise.all(uploads);
44+
}
45+
46+
export async function after() {
47+
await driver.drop();
48+
await driver.close();
49+
}

0 commit comments

Comments (0)