From d99215265ffb42f21fe1f1b0fab1dd7bf74a98b5 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Dec 2021 15:24:50 +0100 Subject: [PATCH 1/7] fix: wait for flush to finish --- index.js | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/index.js b/index.js index 8cd370c..944eb4d 100644 --- a/index.js +++ b/index.js @@ -212,10 +212,9 @@ class ThreadStream extends EventEmitter { throw new Error('the worker is ending') } - if (this[kImpl].flushing && this[kImpl].buf.length + data.length >= MAX_STRING) { + if (this[kImpl].buf.length + data.length >= MAX_STRING) { try { writeSync(this) - this[kImpl].flushing = true } catch (err) { destroy(this, err) return false @@ -416,7 +415,11 @@ function writeSync (stream) { process.nextTick(drain, stream) } } - stream[kImpl].flushing = false + + if (stream[kImpl].flushing) { + // TODO (fix): What if flushing? Wait for flushing to finish? + stream[kImpl].flushing = false + } while (stream[kImpl].buf.length !== 0) { const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) From fbef0549e12b2b7d3b3fe70fb1ba46b3bc603df2 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Dec 2021 16:23:56 +0100 Subject: [PATCH 2/7] fixup --- index.js | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/index.js b/index.js index 944eb4d..22c4659 100644 --- a/index.js +++ b/index.js @@ -106,6 +106,11 @@ function nextFlush (stream) { Atomics.store(stream[kImpl].state, READ_INDEX, 0) Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) + if (stream[kImpl].buf.length === 0) { + nextFlush(stream) + return + } + // Find a toWrite length that fits the buffer // it must exists as the buffer is at least 4 bytes length // and the max utf-8 length for a char is 4 bytes. @@ -416,11 +421,6 @@ function writeSync (stream) { } } - if (stream[kImpl].flushing) { - // TODO (fix): What if flushing? Wait for flushing to finish? - stream[kImpl].flushing = false - } - while (stream[kImpl].buf.length !== 0) { const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) let leftover = stream[kImpl].data.length - writeIndex @@ -461,10 +461,6 @@ function writeSync (stream) { } function flushSync (stream) { - if (stream[kImpl].flushing) { - throw new Error('unable to flush while flushing') - } - // process._rawDebug('flushSync started') const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) From 8214cd115c99ac1b5c6f3b57595075638a5c4fd7 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Dec 2021 16:32:38 +0100 Subject: [PATCH 3/7] fixup --- index.js | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/index.js b/index.js index 22c4659..b737a2b 100644 --- a/index.js +++ b/index.js @@ -261,22 +261,7 @@ class ThreadStream extends EventEmitter { throw new Error('the worker has exited') } - // TODO write all .buf - const writeIndex = Atomics.load(this[kImpl].state, WRITE_INDEX) - // process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`) - wait(this[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => { - if (err) { - destroy(this, err) - process.nextTick(cb, err) - return - } - if (res === 'not-equal') { - // TODO handle deadlock - this.flush(cb) - return - } - process.nextTick(cb) - }) + flushAsync(this, cb) } flushSync () { @@ -490,4 +475,23 @@ function flushSync (stream) { // process._rawDebug('flushSync finished') } +function flushAsync (stream, cb) { + // TODO write all .buf + const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) + // process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`) + wait(stream[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => { + if (err) { + destroy(stream, err) + process.nextTick(cb, err) + return + } + if (res === 'not-equal') { + // TODO handle deadlock + flushAsync(stream, cb) + return + } + process.nextTick(cb) + }) +} + module.exports = ThreadStream From 79d5708f7454f6a5f02b9f17490013c21b44f802 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Dec 2021 16:33:48 +0100 Subject: [PATCH 4/7] fixup --- index.js | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/index.js b/index.js index b737a2b..ec9d1e6 100644 --- a/index.js +++ b/index.js @@ -270,7 +270,7 @@ class ThreadStream extends EventEmitter { } writeSync(this) - flushSync(this) + waitSync(this) } unref () { @@ -410,7 +410,7 @@ function writeSync (stream) { const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) let leftover = stream[kImpl].data.length - writeIndex if (leftover === 0) { - flushSync(stream) + waitSync(stream) Atomics.store(stream[kImpl].state, READ_INDEX, 0) Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) continue @@ -427,7 +427,7 @@ function writeSync (stream) { write(stream, toWrite, cb) } else { // multi-byte utf-8 - flushSync(stream) + waitSync(stream) Atomics.store(stream[kImpl].state, READ_INDEX, 0) Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) @@ -445,8 +445,8 @@ function writeSync (stream) { } } -function flushSync (stream) { - // process._rawDebug('flushSync started') +function waitSync (stream) { + // process._rawDebug('waitSync started') const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) @@ -457,10 +457,10 @@ function flushSync (stream) { const readIndex = Atomics.load(stream[kImpl].state, READ_INDEX) if (readIndex === -2) { - throw new Error('_flushSync failed') + throw new Error('waitSync failed') } - // process._rawDebug(`(flushSync) readIndex (${readIndex}) writeIndex (${writeIndex})`) + // process._rawDebug(`(waitSync) readIndex (${readIndex}) writeIndex (${writeIndex})`) if (readIndex !== writeIndex) { // TODO stream timeouts for some reason. Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000) @@ -469,10 +469,10 @@ function flushSync (stream) { } if (++spins === 10) { - throw new Error('_flushSync took too long (10s)') + throw new Error('waitSync took too long (10s)') } } - // process._rawDebug('flushSync finished') + // process._rawDebug('waitSync finished') } function flushAsync (stream, cb) { From 626d5a96cad471aa3179ac7a66b9d330994b8801 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Dec 2021 16:37:44 +0100 Subject: [PATCH 5/7] fixup --- index.js | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/index.js b/index.js index ec9d1e6..732362e 100644 --- a/index.js +++ b/index.js @@ -261,7 +261,8 @@ class ThreadStream extends EventEmitter { throw new Error('the worker has exited') } - flushAsync(this, cb) + writeSync(this) // TODO (fix): Make this async somehow. + waitAsync(this, cb) } flushSync () { @@ -475,7 +476,7 @@ function waitSync (stream) { // process._rawDebug('waitSync finished') } -function flushAsync (stream, cb) { +function waitAsync (stream, cb) { // TODO write all .buf const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) // process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`) @@ -487,7 +488,7 @@ function flushAsync (stream, cb) { } if (res === 'not-equal') { // TODO handle deadlock - flushAsync(stream, cb) + waitAsync(stream, cb) return } process.nextTick(cb) From ccd7c2279eaec750d72da2fd2597ca81af6f6e81 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Dec 2021 16:38:59 +0100 Subject: [PATCH 6/7] fixup --- index.js | 1 - 1 file changed, 1 deletion(-) diff --git a/index.js b/index.js index 732362e..b29829c 100644 --- a/index.js +++ b/index.js @@ -477,7 +477,6 @@ function waitSync (stream) { } function waitAsync (stream, cb) { - // TODO write all .buf const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) // process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`) wait(stream[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => { From a39b23e95e20bf279690aa9718f8b1e3a82c76ab Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 10 Dec 2021 19:36:35 +0100 Subject: [PATCH 7/7] fixup --- test/string-limit-2.test.js | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 test/string-limit-2.test.js diff --git a/test/string-limit-2.test.js b/test/string-limit-2.test.js new file mode 100644 index 0000000..dc2e959 --- /dev/null +++ b/test/string-limit-2.test.js @@ -0,0 +1,31 @@ +'use strict' + +const t = require('tap') +const { join } = require('path') +const { file } = require('./helper') +const { createReadStream } = require('fs') +const ThreadStream = require('..') +const buffer = require('buffer') + +const MAX_STRING = buffer.constants.MAX_STRING_LENGTH + +t.plan(1) + +const dest = file() +const stream = new ThreadStream({ + filename: join(__dirname, 'to-file.js'), + workerData: { dest }, + sync: false +}) + +stream.on('close', async () => { + let buf + for await (const chunk of createReadStream(dest)) { + buf = chunk + } + t.equal('asd', buf.toString().slice(-3)) +}) + +stream.write('a'.repeat(MAX_STRING - 2)) +stream.write('asd') +stream.end()