
feat: add support for iterators #732

Open
wants to merge 15 commits into base: current

Conversation

@metcoder95 (Member) commented Jan 19, 2025

This is a first iteration of support for (async) iterators in Piscina.

It is quite raw: I wanted to experiment first with how it could look and draft the API. I still have some doubts about its final shape and what should differentiate it, so I went with a simple approach of returning a stream every time an iterator is identified while handling the worker's workload.
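The shape described above — a worker task that yields values, and a caller that consumes them as a stream — can be sketched in plain Node.js terms. Note this is an illustration of the async-iteration pattern only; the Piscina wiring is omitted and the names here are not the PR's final API:

```javascript
// A worker task written as an async generator: each `yield` is one
// chunk that would travel back to the main thread.
async function* chunks(n) {
  for (let i = 0; i < n; i++) {
    yield i * 2;
  }
}

// On the main-thread side, whatever the pool hands back would be
// consumed like any async iterable.
async function consume(iterable) {
  const received = [];
  for await (const chunk of iterable) {
    received.push(chunk);
  }
  return received;
}

consume(chunks(3)).then((out) => console.log(out)); // [ 0, 2, 4 ]
```

`for await...of` accepts both sync and async iterables, so the consuming side does not need to know which kind the task returned.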

Further optimizations:

  • Replace port.postMessage with a ring buffer to reduce overhead (for follow-up PRs)
  • Properly implement WorkerStream as an abstraction over the ring buffer implementation (same as the previous point)
  • Shape API
  • Docs

@metcoder95 marked this pull request as draft January 19, 2025 21:44
@metcoder95 marked this pull request as ready for review February 5, 2025 10:27
@metcoder95 (Member, Author)

It should be ready for review. I'm quite curious to see whether the API is ergonomic enough. I already have a baseline for optimizations, especially around messaging between the worker and main thread, but I'd like to start gathering feedback on the API shape and leave further enhancements for upcoming PRs.

@metcoder95 (Member, Author)

I'll be adding docs later this week.

@jdmarshall

Reading the code, I’m unclear how or if you are managing back pressure on the worker. It looks like you’re not waiting for the parent process to call next(), which would be fairly poor performance, something you could emulate entirely with postMessage(). What keeps the receiver from getting backed up reading messages?

@metcoder95 (Member, Author)

> Reading the code, I’m unclear how or if you are managing back pressure on the worker. It looks like you’re not waiting for the parent process to call next(), which would be fairly poor performance, something you could emulate entirely with postMessage(). What keeps the receiver from getting backed up reading messages?

With the current setup, it wasn't clear how to implement backpressure.
For this PoC, the worker drives the flow of data: until the worker calls postMessage with done = true, the main thread sits waiting for the next chunk.

Though the other way around is more complicated (what if the main thread wants to pause the stream, or the stream's buffer is full and needs to drain?).

I've shifted my focus to #746, using ring buffers; I'll try to cover more of this in those efforts.
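The worker-driven flow described above can be modelled without actual threads: the sender posts `{ value, done }` messages and the receiver simply awaits each one until `done` is true. In this sketch, `makePort` is a hypothetical in-process stand-in for a real `MessagePort` pair, used only to show the message protocol:

```javascript
// Minimal stand-in for a MessagePort: messages land in a queue,
// and pending reads are resolved as messages arrive.
function makePort() {
  const queue = [];
  const waiters = [];
  return {
    postMessage(msg) {
      const waiter = waiters.shift();
      if (waiter) waiter(msg);
      else queue.push(msg);
    },
    nextMessage() {
      if (queue.length > 0) return Promise.resolve(queue.shift());
      return new Promise((resolve) => waiters.push(resolve));
    },
  };
}

// Worker side: drive the iterator and post every chunk, then signal done.
async function pump(iterable, port) {
  for await (const value of iterable) {
    port.postMessage({ value, done: false });
  }
  port.postMessage({ value: undefined, done: true });
}

// Main-thread side: sit waiting for each chunk until the worker says done.
async function drain(port) {
  const out = [];
  for (;;) {
    const msg = await port.nextMessage();
    if (msg.done) break;
    out.push(msg.value);
  }
  return out;
}

const port = makePort();
pump([1, 2, 3], port);
drain(port).then((out) => console.log(out)); // [ 1, 2, 3 ]
```

Note there is no backpressure here: `pump` never waits for `drain`, which is exactly the limitation being discussed — the queue inside the port can grow without bound.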

@jdmarshall

So the buffer length will end up dictating the max queuing before the sender stalls. Sounds fair.
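That stalling behaviour falls naturally out of a bounded ring buffer: once the write index catches up to the read index, the producer cannot advance. Here is a single-threaded sketch of that invariant (a real cross-thread version, as proposed for #746, would back the indices with a `SharedArrayBuffer` and park the writer with `Atomics.wait` instead of returning `false`):

```javascript
// Bounded ring buffer over a fixed-size array. One slot is kept
// empty so that "full" and "empty" are distinguishable states.
class RingBuffer {
  constructor(capacity) {
    this.buf = new Array(capacity + 1);
    this.read = 0;
    this.write = 0;
  }
  get slots() { return this.buf.length; }
  tryPush(item) {
    const next = (this.write + 1) % this.slots;
    if (next === this.read) return false; // full: sender must stall
    this.buf[this.write] = item;
    this.write = next;
    return true;
  }
  tryShift() {
    if (this.read === this.write) return undefined; // empty
    const item = this.buf[this.read];
    this.read = (this.read + 1) % this.slots;
    return item;
  }
}

const rb = new RingBuffer(2);
console.log(rb.tryPush('a')); // true
console.log(rb.tryPush('b')); // true
console.log(rb.tryPush('c')); // false — at capacity, sender backs off
console.log(rb.tryShift());   // 'a' — the reader frees a slot
console.log(rb.tryPush('c')); // true
```

So, as noted above, the chosen capacity is exactly the maximum amount of queued data before the sending side blocks.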
