-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: wait for flush to finish #55
Conversation
@@ -212,10 +212,9 @@ class ThreadStream extends EventEmitter { | |||
throw new Error('the worker is ending') | |||
} | |||
|
|||
if (this[kImpl].flushing && this[kImpl].buf.length + data.length >= MAX_STRING) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking for flushing here won't help us to avoid MAX_STRING length on buf.
try { | ||
writeSync(this) | ||
this[kImpl].flushing = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It did. There is a flush scheduled in nextTick where this was correctly reset.
@mcollina PTAL |
Pull Request Test Coverage Report for Build 1564728806
💛 - Coveralls |
} | ||
process.nextTick(cb) | ||
}) | ||
writeSync(this) // TODO (fix): Make this async somehow. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mcollina I added a test that fails on main |
I would need to look into this with a couple of free hours. This logic is quite tricky. I'll get to it next week. |
I have tried running the attached test to this PR and it passes on main. |
I was able to repro with: 'use strict'
const t = require('tap')
const { join } = require('path')
const { file } = require('./helper')
const { createReadStream } = require('fs')
const ThreadStream = require('..')
const buffer = require('buffer')
const MAX_STRING = buffer.constants.MAX_STRING_LENGTH
t.plan(1)
const dest = file()
const stream = new ThreadStream({
filename: join(__dirname, 'to-file.js'),
workerData: { dest },
sync: false
})
stream.on('close', async () => {
t.comment('close emitted')
let buf
for await (const chunk of createReadStream(dest)) {
buf = chunk
}
t.equal('asd', buf.toString().slice(-3))
})
stream.on('open', () => {
t.comment('open emitted')
stream.write('a'.repeat(MAX_STRING - 2))
stream.write('asd')
stream.end()
}) |
My bad, I made a mistake in the previous example. We emit I cannot reproduce the problem using the attached test. |
You are right I had not updated my local main. |
Refs: pinojs/pino#1261