diff --git a/index.js b/index.js index 8cd370c..b29829c 100644 --- a/index.js +++ b/index.js @@ -106,6 +106,11 @@ function nextFlush (stream) { Atomics.store(stream[kImpl].state, READ_INDEX, 0) Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) + if (stream[kImpl].buf.length === 0) { + nextFlush(stream) + return + } + // Find a toWrite length that fits the buffer // it must exists as the buffer is at least 4 bytes length // and the max utf-8 length for a char is 4 bytes. @@ -212,10 +217,9 @@ class ThreadStream extends EventEmitter { throw new Error('the worker is ending') } - if (this[kImpl].flushing && this[kImpl].buf.length + data.length >= MAX_STRING) { + if (this[kImpl].buf.length + data.length >= MAX_STRING) { try { writeSync(this) - this[kImpl].flushing = true } catch (err) { destroy(this, err) return false @@ -257,22 +261,8 @@ class ThreadStream extends EventEmitter { throw new Error('the worker has exited') } - // TODO write all .buf - const writeIndex = Atomics.load(this[kImpl].state, WRITE_INDEX) - // process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`) - wait(this[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => { - if (err) { - destroy(this, err) - process.nextTick(cb, err) - return - } - if (res === 'not-equal') { - // TODO handle deadlock - this.flush(cb) - return - } - process.nextTick(cb) - }) + writeSync(this) // TODO (fix): Make this async somehow. + waitAsync(this, cb) } flushSync () { @@ -281,7 +271,7 @@ class ThreadStream extends EventEmitter { } writeSync(this) - flushSync(this) + waitSync(this) } unref () { @@ -416,13 +406,12 @@ function writeSync (stream) { process.nextTick(drain, stream) } } - stream[kImpl].flushing = false while (stream[kImpl].buf.length !== 0) { const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) let leftover = stream[kImpl].data.length - writeIndex if (leftover === 0) { - flushSync(stream) + waitSync(stream) Atomics.store(stream[kImpl].state, READ_INDEX, 0) Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) continue @@ -439,7 +428,7 @@ function writeSync (stream) { write(stream, toWrite, cb) } else { // multi-byte utf-8 - flushSync(stream) + waitSync(stream) Atomics.store(stream[kImpl].state, READ_INDEX, 0) Atomics.store(stream[kImpl].state, WRITE_INDEX, 0) @@ -457,12 +446,8 @@ function writeSync (stream) { } } -function flushSync (stream) { - if (stream[kImpl].flushing) { - throw new Error('unable to flush while flushing') - } - - // process._rawDebug('flushSync started') +function waitSync (stream) { + // process._rawDebug('waitSync started') const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) @@ -473,10 +458,10 @@ function flushSync (stream) { const readIndex = Atomics.load(stream[kImpl].state, READ_INDEX) if (readIndex === -2) { - throw new Error('_flushSync failed') + throw new Error('waitSync failed') } - // process._rawDebug(`(flushSync) readIndex (${readIndex}) writeIndex (${writeIndex})`) + // process._rawDebug(`(waitSync) readIndex (${readIndex}) writeIndex (${writeIndex})`) if (readIndex !== writeIndex) { // TODO stream timeouts for some reason. Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000) @@ -485,10 +470,28 @@ function flushSync (stream) { } if (++spins === 10) { - throw new Error('_flushSync took too long (10s)') + throw new Error('waitSync took too long (10s)') } } - // process._rawDebug('flushSync finished') + // process._rawDebug('waitSync finished') +} + +function waitAsync (stream, cb) { + const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX) + // process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`) + wait(stream[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => { + if (err) { + destroy(stream, err) + process.nextTick(cb, err) + return + } + if (res === 'not-equal') { + // TODO handle deadlock + waitAsync(stream, cb) + return + } + process.nextTick(cb) + }) } module.exports = ThreadStream diff --git a/test/string-limit-2.test.js b/test/string-limit-2.test.js new file mode 100644 index 0000000..dc2e959 --- /dev/null +++ b/test/string-limit-2.test.js @@ -0,0 +1,31 @@ +'use strict' + +const t = require('tap') +const { join } = require('path') +const { file } = require('./helper') +const { createReadStream } = require('fs') +const ThreadStream = require('..') +const buffer = require('buffer') + +const MAX_STRING = buffer.constants.MAX_STRING_LENGTH + +t.plan(1) + +const dest = file() +const stream = new ThreadStream({ + filename: join(__dirname, 'to-file.js'), + workerData: { dest }, + sync: false +}) + +stream.on('close', async () => { + let buf + for await (const chunk of createReadStream(dest)) { + buf = chunk + } + t.equal('asd', buf.toString().slice(-3)) +}) + +stream.write('a'.repeat(MAX_STRING - 2)) +stream.write('asd') +stream.end()