Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: wait for flush to finish #55

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 35 additions & 32 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ function nextFlush (stream) {
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)

if (stream[kImpl].buf.length === 0) {
nextFlush(stream)
return
}

// Find a toWrite length that fits the buffer
// it must exist as the buffer is at least 4 bytes long
// and the max utf-8 length for a char is 4 bytes.
Expand Down Expand Up @@ -212,10 +217,9 @@ class ThreadStream extends EventEmitter {
throw new Error('the worker is ending')
}

if (this[kImpl].flushing && this[kImpl].buf.length + data.length >= MAX_STRING) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking for flushing here won't help us to avoid MAX_STRING length on buf.

if (this[kImpl].buf.length + data.length >= MAX_STRING) {
try {
writeSync(this)
this[kImpl].flushing = true
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't make sense?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It did. There is a flush scheduled in nextTick where this was correctly reset.

} catch (err) {
destroy(this, err)
return false
Expand Down Expand Up @@ -257,22 +261,8 @@ class ThreadStream extends EventEmitter {
throw new Error('the worker has exited')
}

// TODO write all .buf
const writeIndex = Atomics.load(this[kImpl].state, WRITE_INDEX)
// process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`)
wait(this[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => {
if (err) {
destroy(this, err)
process.nextTick(cb, err)
return
}
if (res === 'not-equal') {
// TODO handle deadlock
this.flush(cb)
return
}
process.nextTick(cb)
})
writeSync(this) // TODO (fix): Make this async somehow.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#56

waitAsync(this, cb)
}

flushSync () {
Expand All @@ -281,7 +271,7 @@ class ThreadStream extends EventEmitter {
}

writeSync(this)
flushSync(this)
waitSync(this)
}

unref () {
Expand Down Expand Up @@ -416,13 +406,12 @@ function writeSync (stream) {
process.nextTick(drain, stream)
}
}
stream[kImpl].flushing = false

while (stream[kImpl].buf.length !== 0) {
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
let leftover = stream[kImpl].data.length - writeIndex
if (leftover === 0) {
flushSync(stream)
waitSync(stream)
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
continue
Expand All @@ -439,7 +428,7 @@ function writeSync (stream) {
write(stream, toWrite, cb)
} else {
// multi-byte utf-8
flushSync(stream)
waitSync(stream)
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)

Expand All @@ -457,12 +446,8 @@ function writeSync (stream) {
}
}

function flushSync (stream) {
if (stream[kImpl].flushing) {
throw new Error('unable to flush while flushing')
}

// process._rawDebug('flushSync started')
function waitSync (stream) {
// process._rawDebug('waitSync started')

const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)

Expand All @@ -473,10 +458,10 @@ function flushSync (stream) {
const readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)

if (readIndex === -2) {
throw new Error('_flushSync failed')
throw new Error('waitSync failed')
}

// process._rawDebug(`(flushSync) readIndex (${readIndex}) writeIndex (${writeIndex})`)
// process._rawDebug(`(waitSync) readIndex (${readIndex}) writeIndex (${writeIndex})`)
if (readIndex !== writeIndex) {
// TODO stream timeouts for some reason.
Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000)
Expand All @@ -485,10 +470,28 @@ function flushSync (stream) {
}

if (++spins === 10) {
throw new Error('_flushSync took too long (10s)')
throw new Error('waitSync took too long (10s)')
}
}
// process._rawDebug('flushSync finished')
// process._rawDebug('waitSync finished')
}

function waitAsync (stream, cb) {
  // Asynchronously wait until the worker's READ_INDEX catches up with the
  // current WRITE_INDEX, then signal completion through cb.
  const targetIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
  wait(stream[kImpl].state, READ_INDEX, targetIndex, Infinity, (err, outcome) => {
    if (err) {
      // The wait itself failed: tear the stream down and report the error.
      destroy(stream, err)
      process.nextTick(cb, err)
    } else if (outcome === 'not-equal') {
      // The write index moved before we started waiting; re-arm the wait
      // against the fresh target. TODO handle deadlock
      waitAsync(stream, cb)
    } else {
      // The reader has consumed everything up to targetIndex.
      process.nextTick(cb)
    }
  })
}

module.exports = ThreadStream
31 changes: 31 additions & 0 deletions test/string-limit-2.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
'use strict'

const t = require('tap')
const { join } = require('path')
const { file } = require('./helper')
const { createReadStream } = require('fs')
const ThreadStream = require('..')
const { constants } = require('buffer')

// Largest string V8 can represent; writes near this size stress the
// stream's internal buffering.
const MAX_STRING_LENGTH = constants.MAX_STRING_LENGTH

t.plan(1)

const dest = file()
const threadStream = new ThreadStream({
  filename: join(__dirname, 'to-file.js'),
  workerData: { dest },
  sync: false
})

// After the worker flushes and closes, read the destination back and check
// that the short trailing write survived the near-MAX_STRING one.
threadStream.on('close', async () => {
  let lastChunk
  for await (const chunk of createReadStream(dest)) {
    lastChunk = chunk
  }
  t.equal('asd', lastChunk.toString().slice(-3))
})

threadStream.write('a'.repeat(MAX_STRING_LENGTH - 2))
threadStream.write('asd')
threadStream.end()