From d70f6c74876bf0812c2f34880406add4dc784618 Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:52:46 +0000 Subject: [PATCH 1/3] bench: add websockets --- benchmarks/_util/index.js | 18 +- benchmarks/_util/runner.js | 139 +++++++++++++++ benchmarks/package.json | 3 +- benchmarks/websocket-benchmark.mjs | 206 ++++++++++++++++++++++ benchmarks/websocket-echo-server.mjs | 46 +++++ benchmarks/websocket/websocket-buffer.mjs | 61 +++++++ benchmarks/websocket/websocket-string.mjs | 61 +++++++ 7 files changed, 532 insertions(+), 2 deletions(-) create mode 100644 benchmarks/_util/runner.js create mode 100644 benchmarks/websocket-benchmark.mjs create mode 100644 benchmarks/websocket-echo-server.mjs create mode 100644 benchmarks/websocket/websocket-buffer.mjs create mode 100644 benchmarks/websocket/websocket-string.mjs diff --git a/benchmarks/_util/index.js b/benchmarks/_util/index.js index 75f903530ca..4b14d843c35 100644 --- a/benchmarks/_util/index.js +++ b/benchmarks/_util/index.js @@ -50,4 +50,20 @@ function printResults (results) { return console.table(rows) } -module.exports = { makeParallelRequests, printResults } +/** + * @param {number} num + * @returns {string} + */ +function formatBytes (num) { + if (!Number.isFinite(num)) { + throw new Error('invalid number') + } + + const prefixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB'] + + const idx = Math.min(Math.floor(Math.log(num) / Math.log(1024)), prefixes.length - 1) + + return `${(num / Math.pow(1024, idx)).toFixed(2)}${prefixes[idx]}` +} + +module.exports = { makeParallelRequests, printResults, formatBytes } diff --git a/benchmarks/_util/runner.js b/benchmarks/_util/runner.js new file mode 100644 index 00000000000..551ffd3ba83 --- /dev/null +++ b/benchmarks/_util/runner.js @@ -0,0 +1,139 @@ +// @ts-check + +'use strict' + +class Info { + /** @type {string} */ + #name + /** @type {bigint} */ + #current + /** @type {bigint} */ + #finish + /** @type {(...args: any[]) => any} */ + #callback + /** @type {boolean} */ + #finalized = false + + /** + * @param {string} name + * @param {(...args: any[]) => any} callback + */ + constructor (name, callback) { + this.#name = name + this.#callback = callback + } + + get name () { + return this.#name + } + + start () { + if (this.#finalized) { + throw new TypeError('called after finished.') + } + this.#current = process.hrtime.bigint() + } + + end () { + if (this.#finalized) { + throw new TypeError('called after finished.') + } + this.#finish = process.hrtime.bigint() + this.#finalized = true + this.#callback() + } + + diff () { + return Number(this.#finish - this.#current) + } +} + +/** + * @typedef BenchMarkHandler + * @type {(ev: { name: string; start(): void; end(): void; }) => any} + */ + +/** + * @param {Record} experiments + * @param {{}} [options] + * @returns {Promise<{ name: string; average: number; samples: number; fn: BenchMarkHandler; min: number; max: number }[]>} + */ +async function bench (experiments, options = {}) { + const names = Object.keys(experiments) + + /** @type {{ name: string; average: number; samples: number; fn: BenchMarkHandler; min: number; max: number }[]} */ + const results = [] + + async function waitMaybePromiseLike (p) { + if ( + (typeof p === 'object' || typeof p === 'function') && + p !== null && + typeof p.then === 'function' + ) { + await p + } + } + + for (let i = 0; i < names.length; ++i) { + const name = names[i] + const fn = experiments[name] + const samples = [] + + for (let i = 0; i < 8; ++i) { + // warmup + await new Promise((resolve, reject) => { + const info = new Info(name, resolve) + + try { + const p = fn(info) + + waitMaybePromiseLike(p).catch((err) => reject(err)) + } catch (err) { + reject(err) + } + }) + } + + let timing = 0 + + for (let j = 0; j < 128 || timing < 800_000_000; ++j) { + let resolve = (value) => {} + let reject = (reason) => {} + const promise = new Promise( + (_resolve, _reject) => { resolve = _resolve; reject = _reject } + ) + + const info = new Info(name, resolve) + + try { + const p = fn(info) + + await waitMaybePromiseLike(p) + } catch (err) { + reject(err) + } + + await promise + + samples.push({ time: info.diff() }) + + timing += info.diff() + } + + const average = + samples.map((v) => v.time).reduce((a, b) => a + b, 0) / samples.length + + results.push({ + name: names[i], + average, + samples: samples.length, + fn, + min: samples.reduce((a, acc) => Math.min(a, acc.time), samples[0].time), + max: samples.reduce((a, acc) => Math.max(a, acc.time), samples[0].time) + }) + } + + return results +} + +module.exports = { bench } diff --git a/benchmarks/package.json b/benchmarks/package.json index c864c56b3ea..2e65101df6f 100644 --- a/benchmarks/package.json +++ b/benchmarks/package.json @@ -21,6 +21,7 @@ "node-fetch": "^3.3.2", "request": "^2.88.2", "superagent": "^10.0.0", - "wait-on": "^8.0.0" + "wait-on": "^8.0.0", + "uWebSockets.js": "uNetworking/uWebSockets.js#v20.49.0" } } diff --git a/benchmarks/websocket-benchmark.mjs b/benchmarks/websocket-benchmark.mjs new file mode 100644 index 00000000000..ea3526d65e7 --- /dev/null +++ b/benchmarks/websocket-benchmark.mjs @@ -0,0 +1,206 @@ +// @ts-check + +import { bench } from './_util/runner.js' +import { formatBytes } from './_util/index.js' +import { WebSocket, WebSocketStream } from '../index.js' +import { WebSocket as WsWebSocket } from 'ws' + +/** + * @type {Record import('./_util/runner.js').BenchMarkHandler; connect: (url: string) => Promise; binaries: (string | Uint8Array)[] }>} + */ +const experiments = {} +/** + * @type {Record} + */ +const experimentsInfo = {} + +/** + * @type {any[]} + */ +const connections = [] + +const binary = Buffer.alloc(1024 * 256, '_') +const binaries = [binary, binary.toString('utf-8')] + +experiments['undici'] = { + fn: (ws, binary) => { + if (!(ws instanceof WebSocket)) { + throw new Error("'undici' websocket are expected.") + } + + return (ev) => { + ws.addEventListener( + 'message', + () => { + ev.end() + }, + { once: true } + ) + + ev.start() + ws.send(binary) + } + }, + + connect: async (url) => { + const ws = new WebSocket(url) + + await /** @type {Promise} */ ( + new Promise((resolve, reject) => { + function onOpen () { + resolve() + ws.removeEventListener('open', onOpen) + ws.removeEventListener('error', onError) + } + function onError (err) { + reject(err) + ws.removeEventListener('open', onOpen) + ws.removeEventListener('error', onError) + } + ws.addEventListener('open', onOpen) + ws.addEventListener('error', onError) + }) + ) + + // avoid create blob + ws.binaryType = 'arraybuffer' + + return ws + }, + + binaries +} + +experiments['undici - stream'] = { + fn: (ws, binary) => { + /** @type {ReadableStreamDefaultReader} */ + const reader = ws.reader + /** @type {WritableStreamDefaultWriter} */ + const writer = ws.writer + + return async (ev) => { + ev.start() + await writer.write(binary) + await reader.read() + ev.end() + } + }, + + connect: async (url) => { + const ws = new WebSocketStream(url) + + const { readable, writable } = await ws.opened + const reader = readable.getReader() + const writer = writable.getWriter() + + // @ts-ignore + return { reader, writer, close: () => ws.close() } + }, + + binaries +} + +experiments['ws'] = { + fn: (ws, binary) => { + if (!(ws instanceof WsWebSocket)) { + throw new Error("'ws' websocket are expected.") + } + + return (ev) => { + ws.once('message', () => { + ev.end() + }) + ev.start() + ws.send(binary) + } + }, + + connect: async (url) => { + const ws = new WsWebSocket(url) + + await /** @type {Promise} */ ( + new Promise((resolve, reject) => { + function onOpen () { + resolve() + ws.off('open', onOpen) + ws.off('error', onError) + } + function onError (err) { + reject(err) + ws.off('open', onOpen) + ws.off('error', onError) + } + ws.on('open', onOpen) + ws.on('error', onError) + }) + ) + + ws.binaryType = 'arraybuffer' + + return ws + }, + + binaries +} + +async function init () { + /** @type {Record} */ + const round = {} + + const keys = Object.keys(experiments) + + for (let i = 0; i < keys.length; ++i) { + const name = keys[i] + + const { fn, connect, binaries } = experiments[name] + + const ws = await connect('ws://localhost:5001') + + const needShowBytes = binaries.length !== 2 || typeof binaries[0] === typeof binaries[1] + for (let i = 0; i < binaries.length; ++i) { + const binary = binaries[i] + const bytes = Buffer.byteLength(binary) + + const binaryType = typeof binary === 'string' ? 'string' : 'binary' + const roundName = needShowBytes + ? `${name} [${formatBytes(bytes)} (${binaryType})]` + : `${name} [${binaryType}]` + + round[roundName] = fn(ws, binary) + experimentsInfo[roundName] = { bytes, binaryType } + } + + connections.push(ws) + } + + return round +} + +init() + .then((round) => bench(round, {})) + .then((results) => { + print(results) + + for (const ws of connections) { + ws.close() + } + }, (err) => { + process.nextTick((err) => { + throw err + }, err) + }) + +/** + * @param {{ name: string; average: number; }[]} results + */ +function print (results) { + for (const { name, average } of results) { + const { bytes } = experimentsInfo[name] + + console.log( + `${name}: transferred ${formatBytes((bytes / average) * 1e9)}/s` + ) + } +} + +export {} diff --git a/benchmarks/websocket-echo-server.mjs b/benchmarks/websocket-echo-server.mjs new file mode 100644 index 00000000000..c4267ce05af --- /dev/null +++ b/benchmarks/websocket-echo-server.mjs @@ -0,0 +1,46 @@ +import { Worker, isMainThread, parentPort, threadId } from 'node:worker_threads' +import { cpus } from 'node:os' +import url from 'node:url' +import uws from 'uWebSockets.js' + +const __filename = url.fileURLToPath(import.meta.url) + +const app = uws.App() + +if (isMainThread) { + for (let i = cpus().length - 1; i >= 0; --i) { + new Worker(__filename).on('message', (workerAppDescriptor) => { + app.addChildAppDescriptor(workerAppDescriptor) + }) + } +} else { + app + .ws('/*', { + compression: uws.DISABLED, + maxPayloadLength: 512 * 1024 * 1024, + maxBackpressure: 128 * 1024, + idleTimeout: 60, + message: (ws, message, isBinary) => { + /* Here we echo the message back, using compression if available */ + const ok = ws.send(message, isBinary) // eslint-disable-line + } + }) + .get('/*', (res, req) => { + /* It does Http as well */ + res + .writeStatus('200 OK') + .end('Hello there!') + }) + + parentPort.postMessage(app.getDescriptor()) +} + +app.listen(5001, (listenSocket) => { + if (listenSocket) { + if (threadId === 0) { + console.log('Listening to port 5001') + } else { + console.log(`Listening to port 5001 from thread ${threadId}`) + } + } +}) diff --git a/benchmarks/websocket/websocket-buffer.mjs b/benchmarks/websocket/websocket-buffer.mjs new file mode 100644 index 00000000000..08ca5944078 --- /dev/null +++ b/benchmarks/websocket/websocket-buffer.mjs @@ -0,0 +1,61 @@ +import { WebSocket as WsWebSocket } from 'ws' +import { WebSocket as UndiciWebSocket } from '../../index.js' +import { bench, run, group } from 'mitata' + +const __BINARY_SIZE__ = 1024 * 256 + +const binary = Buffer.alloc(__BINARY_SIZE__, '_') + +const url = 'http://localhost:5001' + +const connections = [] + +group('send', () => { + { + const ws = new WsWebSocket(url) + let _resolve + ws.on('message', () => { + _resolve() + }) + bench('ws', () => { + return new Promise((resolve, reject) => { + ws.send(binary) + _resolve = resolve + }) + }) + connections.push(ws) + } + { + const ws = new UndiciWebSocket(url) + let _resolve + ws.addEventListener('message', () => { + _resolve() + }) + bench('undici', () => { + return new Promise((resolve, reject) => { + ws.send(binary) + _resolve = resolve + }) + }) + connections.push(ws) + } +}) + +for (const ws of connections) { + // for fairness + ws.binaryType = 'arraybuffer' + await new Promise((resolve, reject) => { + ws.addEventListener('open', () => { + resolve() + }) + ws.addEventListener('error', (err) => { + reject(err) + }) + }) +} + +await run() + +for (const ws of connections) { + ws.close() +} diff --git a/benchmarks/websocket/websocket-string.mjs b/benchmarks/websocket/websocket-string.mjs new file mode 100644 index 00000000000..a4bae41e3d3 --- /dev/null +++ b/benchmarks/websocket/websocket-string.mjs @@ -0,0 +1,61 @@ +import { WebSocket as WsWebSocket } from 'ws' +import { WebSocket as UndiciWebSocket } from '../../index.js' +import { bench, run, group } from 'mitata' + +const __BINARY_SIZE__ = 1024 * 256 + +const binary = Buffer.alloc(__BINARY_SIZE__, '_').toString('utf-8') + +const url = 'http://localhost:5001' + +const connections = [] + +group('send', () => { + { + const ws = new WsWebSocket(url) + let _resolve + ws.on('message', () => { + _resolve() + }) + bench('ws', () => { + return new Promise((resolve, reject) => { + ws.send(binary) + _resolve = resolve + }) + }) + connections.push(ws) + } + { + const ws = new UndiciWebSocket(url) + let _resolve + ws.addEventListener('message', () => { + _resolve() + }) + bench('undici', () => { + return new Promise((resolve, reject) => { + ws.send(binary) + _resolve = resolve + }) + }) + connections.push(ws) + } +}) + +for (const ws of connections) { + // for fairness + ws.binaryType = 'arraybuffer' + await new Promise((resolve, reject) => { + ws.addEventListener('open', () => { + resolve() + }) + ws.addEventListener('error', (err) => { + reject(err) + }) + }) +} + +await run() + +for (const ws of connections) { + ws.close() +} From 118ca6f05906c07b7d8c79daa9b216e1061ade63 Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Wed, 2 Oct 2024 10:45:10 +0000 Subject: [PATCH 2/3] increase min samples --- benchmarks/_util/runner.js | 5 +++-- benchmarks/websocket-benchmark.mjs | 6 ++++-- benchmarks/websocket/generate-mask.mjs | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/benchmarks/_util/runner.js b/benchmarks/_util/runner.js index 551ffd3ba83..ac35dbd3baf 100644 --- a/benchmarks/_util/runner.js +++ b/benchmarks/_util/runner.js @@ -55,7 +55,7 @@ class Info { /** * @param {Record} experiments - * @param {{}} [options] + * @param {{ minSamples?: number }} [options] * @returns {Promise<{ name: string; average: number; samples: number; fn: BenchMarkHandler; min: number; max: number }[]>} */ async function bench (experiments, options = {}) { @@ -95,8 +95,9 @@ async function bench (experiments, options = {}) { } let timing = 0 + const minSamples = options.minSamples ?? 128 - for (let j = 0; j < 128 || timing < 800_000_000; ++j) { + for (let j = 0; j < minSamples || timing < 800_000_000; ++j) { let resolve = (value) => {} let reject = (reason) => {} const promise = new Promise( diff --git a/benchmarks/websocket-benchmark.mjs b/benchmarks/websocket-benchmark.mjs index ea3526d65e7..6ffff3cc003 100644 --- a/benchmarks/websocket-benchmark.mjs +++ b/benchmarks/websocket-benchmark.mjs @@ -177,7 +177,9 @@ async function init () { } init() - .then((round) => bench(round, {})) + .then((round) => bench(round, { + minSamples: 512 + })) .then((results) => { print(results) @@ -198,7 +200,7 @@ function print (results) { const { bytes } = experimentsInfo[name] console.log( - `${name}: transferred ${formatBytes((bytes / average) * 1e9)}/s` + `${name}: transferred ${formatBytes((bytes / average) * 1e9)} Bytes/s` ) } } diff --git a/benchmarks/websocket/generate-mask.mjs b/benchmarks/websocket/generate-mask.mjs index 032f05d8b99..8562b828b28 100644 --- a/benchmarks/websocket/generate-mask.mjs +++ b/benchmarks/websocket/generate-mask.mjs @@ -1,7 +1,7 @@ import { randomFillSync, randomBytes } from 'node:crypto' import { bench, group, run } from 'mitata' -const BUFFER_SIZE = 16384 +const BUFFER_SIZE = 8 * 1024 const buf = Buffer.allocUnsafe(BUFFER_SIZE) let bufIdx = BUFFER_SIZE From 3bb9ada4b72199893065b245265e555823b7be6b Mon Sep 17 00:00:00 2001 From: tsctx <91457664+tsctx@users.noreply.github.com> Date: Sat, 5 Oct 2024 12:41:49 +0000 Subject: [PATCH 3/3] fix mitata benchmarks --- benchmarks/websocket/websocket-buffer.mjs | 40 +++++++++++++++-------- benchmarks/websocket/websocket-string.mjs | 40 +++++++++++++++-------- 2 files changed, 54 insertions(+), 26 deletions(-) diff --git a/benchmarks/websocket/websocket-buffer.mjs b/benchmarks/websocket/websocket-buffer.mjs index 08ca5944078..c012a7a9e54 100644 --- a/benchmarks/websocket/websocket-buffer.mjs +++ b/benchmarks/websocket/websocket-buffer.mjs @@ -1,8 +1,8 @@ import { WebSocket as WsWebSocket } from 'ws' import { WebSocket as UndiciWebSocket } from '../../index.js' -import { bench, run, group } from 'mitata' +import { bench, run, lineplot } from 'mitata' -const __BINARY_SIZE__ = 1024 * 256 +const __BINARY_SIZE__ = 1024 * 512 const binary = Buffer.alloc(__BINARY_SIZE__, '_') @@ -10,19 +10,26 @@ const url = 'http://localhost:5001' const connections = [] -group('send', () => { +lineplot(() => { { const ws = new WsWebSocket(url) let _resolve ws.on('message', () => { _resolve() }) - bench('ws', () => { - return new Promise((resolve, reject) => { - ws.send(binary) - _resolve = resolve + bench('ws ($messages)', function * (state) { + const messages = state.get('messages') + const chunk = binary.subarray(0, __BINARY_SIZE__ / messages) + yield () => new Promise((resolve, reject) => { + for (let i = 0; i < messages; ++i) ws.send(chunk) + let id = 0 + _resolve = () => { + if (++id === messages) { + resolve() + } + } }) - }) + }).range('messages', 1, 256) connections.push(ws) } { @@ -31,12 +38,19 @@ group('send', () => { ws.addEventListener('message', () => { _resolve() }) - bench('undici', () => { - return new Promise((resolve, reject) => { - ws.send(binary) - _resolve = resolve + bench('undici ($messages)', function * (state) { + const messages = state.get('messages') + const chunk = binary.subarray(0, __BINARY_SIZE__ / messages) + yield () => new Promise((resolve, reject) => { + for (let i = 0; i < messages; ++i) ws.send(chunk) + let id = 0 + _resolve = () => { + if (++id === messages) { + resolve() + } + } }) - }) + }).range('messages', 1, 256) connections.push(ws) } }) diff --git a/benchmarks/websocket/websocket-string.mjs b/benchmarks/websocket/websocket-string.mjs index a4bae41e3d3..8dc251637da 100644 --- a/benchmarks/websocket/websocket-string.mjs +++ b/benchmarks/websocket/websocket-string.mjs @@ -1,8 +1,8 @@ import { WebSocket as WsWebSocket } from 'ws' import { WebSocket as UndiciWebSocket } from '../../index.js' -import { bench, run, group } from 'mitata' +import { bench, run, lineplot } from 'mitata' -const __BINARY_SIZE__ = 1024 * 256 +const __BINARY_SIZE__ = 1024 * 512 const binary = Buffer.alloc(__BINARY_SIZE__, '_').toString('utf-8') @@ -10,19 +10,26 @@ const url = 'http://localhost:5001' const connections = [] -group('send', () => { +lineplot(() => { { const ws = new WsWebSocket(url) let _resolve ws.on('message', () => { _resolve() }) - bench('ws', () => { - return new Promise((resolve, reject) => { - ws.send(binary) - _resolve = resolve + bench('ws ($messages)', function * (state) { + const messages = state.get('messages') + const chunk = binary.subarray(0, __BINARY_SIZE__ / messages) + yield () => new Promise((resolve, reject) => { + for (let i = 0; i < messages; ++i) ws.send(chunk) + let id = 0 + _resolve = () => { + if (++id === messages) { + resolve() + } + } }) - }) + }).range('messages', 1, 256) connections.push(ws) } { @@ -31,12 +38,19 @@ group('send', () => { ws.addEventListener('message', () => { _resolve() }) - bench('undici', () => { - return new Promise((resolve, reject) => { - ws.send(binary) - _resolve = resolve + bench('undici ($messages)', function * (state) { + const messages = state.get('messages') + const chunk = binary.subarray(0, __BINARY_SIZE__ / messages) + yield () => new Promise((resolve, reject) => { + for (let i = 0; i < messages; ++i) ws.send(chunk) + let id = 0 + _resolve = () => { + if (++id === messages) { + resolve() + } + } }) - }) + }).range('messages', 1, 256) connections.push(ws) } })