
Cancelling and exception handling async iteration of ReadableStreams #1255

hamishwillee opened this issue Feb 7, 2023 · 12 comments

@hamishwillee

I'm looking at this for the MDN documentation (tracked in mdn/content#23678)

Reading a stream looks pretty straightforward; you get a stream and read it in chunks using for await (const chunk of mystream) {}:

let bytes = 0;
logChunks(mystream);
async function logChunks(readableXX) {
  for await (const chunk of readableXX) {
    bytes+=chunk.length;
    logConsumer( `Chunk: ${chunk}. Read ${bytes} characters.`);
  }
}
  1. What is the "right way" to cancel a stream read during async iteration from a button press?

    If you're writing your own underlying source then you can add a listener that closes the stream on button click using controller.close().

    return new ReadableStream({
      start(controller) {
        button.addEventListener('click', () => {
          controller.close();
        });
        readRepeatedly().catch((e) => controller.error(e));
        ...

    But if you're using fetch() you have no control over the underlying source - you get a ReadableStream back from the response.

    • You can't call cancel() on the stream because it is locked to the default reader, for which you don't have a handle (since you're working directly with the ReadableStream).
    • You could set a boolean in the event handler, then in the for loop you could use it to return, cancelling the operation. But this would have to await at least one more chunk, which could in theory take some time to arrive.
    • My guess is that for fetch you'd use an AbortController and abort the source of the stream. This would then propagate errors or whatever through the returned ReadableStream?

    What is the "general recommendation"? Is it that you abort the underlying source (if possible) and if not, perhaps you wrap your stream in a another custom stream?

  2. How are you supposed to handle errors from the source - e.g. a TypeError or a network error or something? I tried putting try/catch around the logChunks() above and various other places but I don't seem to be able to catch them.

  3. What is the recommended way to handle the case of a browser that does not support this feature? Is there a polyfill on the Internet that we should point users to?

  4. Tracking bugs seem to indicate this is not yet in Safari or Chrome. Do you happen to know if Deno/NodeJS support this, and if so, whether it is compatible with the Streams spec?

@MattiasBuelens
Collaborator

  1. What is the "right way" to cancel a stream read during async iteration from a button press?

The easiest way would be to break out of the loop. This will call return() on the async iterator, which will cancel a ReadableStream as per spec:

By default, calling the async iterator’s return() method will also cancel the stream. To prevent this, use the stream’s values() method, passing true for the preventCancel option.
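
For example (a rough sketch; shouldStop() is a placeholder):

// Breaking out of this loop calls return(), which cancels mystream:
for await (const chunk of mystream) {
  if (shouldStop(chunk)) break;
}

// With preventCancel, breaking out only releases the lock and mystream stays readable:
for await (const chunk of mystream.values({ preventCancel: true })) {
  if (shouldStop(chunk)) break;
}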

You can use an AbortController for this: check if the signal is aborted in each iteration, and abort() when the button is clicked.

const controller = new AbortController();
button.addEventListener('click', () => controller.abort());
logChunks(mystream, { signal: controller.signal });

async function logChunks(readableXX, { signal }) {
  for await (const chunk of readableXX) {
    if (signal.aborted) throw signal.reason;
    bytes += chunk.length;
    logConsumer( `Chunk: ${chunk}. Read ${bytes} characters.`);
  }
}
  2. How are you supposed to handle errors from the source - e.g. a TypeError or a network error or something? I tried putting try/catch around the logChunks() above and various other places but I don't seem to be able to catch them.

Hmm, that's odd. An error should cause next() to reject, which in turn should make the for await..of loop throw. 😕
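
For example (sketch), a source error should be catchable like this:

try {
  for await (const chunk of mystream) {
    // ...
  }
} catch (e) {
  // an errored source should end up here
  console.error('Stream errored:', e);
}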

Could you provide a minimal reproduction case for this?

  3. What is the recommended way to handle the case of a browser that does not support this feature? Is there a polyfill on the Internet that we should point users to?

Yes, there's web-streams-polyfill and sd-streams.

  4. Tracking bugs seem to indicate this is not yet in Safari or Chrome. Do you happen to know if Deno/NodeJS support this, and if so, whether it is compatible with the Streams spec?

Indeed, there are no browsers that ship an implementation yet. But both NodeJS and Deno already have full support and are fully compliant. 🙂
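
For example, something like this already works in Node (a rough sketch):

import { ReadableStream } from 'node:stream/web';

const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello');
    controller.close();
  },
});

for await (const chunk of rs) {
  console.log(chunk);
}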

@hamishwillee
Author

@MattiasBuelens Thank you!

For 1., the problem with using AbortController is that it is the same as the suggestion I made in the bullet:

  • You could set a boolean in the event handler, then in the for loop you could use it to return, cancelling the operation. But this would have to await at least one more chunk, which could in theory take some time to arrive.

So yes, you can throw in the loop on abort or some other signal, or you can just call return to silently exit. The problem is that you only get to do this after a new chunk of data has arrived - right?
So is this a "good" solution?

I was thinking it would be better to call your fetch with the AbortController - that would abort the fetch that is supplying your stream, propagating the abort reason from the underlying source.

So as a general recommendation I was thinking "abort or cancel the underlying source if mechanisms exist, otherwise you will have to wait for the next loop iteration and call break/return (as you indicate)".
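
Something like this is what I had in mind (a rough sketch; the URL is a placeholder):

const controller = new AbortController();
button.addEventListener('click', () => controller.abort());

async function logResponseChunks(url) {
  const response = await fetch(url, { signal: controller.signal });
  try {
    for await (const chunk of response.body) {
      logConsumer(`Read a chunk of ${chunk.length} bytes`);
    }
  } catch (e) {
    // my assumption: the abort reason propagates out of the loop here
    if (e.name !== 'AbortError') throw e;
  }
}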

For 2, I will get back to you on Friday (on another job today).

@MattiasBuelens
Collaborator

So yes, you can throw in the loop on abort or some other signal, or you can just call return to silently exit. The problem is that you only get to do this after a new chunk of data has arrived - right?
So is this a "good" solution?

That's a good point. As currently specified, all calls to next() and return() are "serialized": if you call return() while a previous next() promise is still pending, then that return() promise is "chained" after the pending next() promise.

So unfortunately, it's not possible to cancel a pending read when async-iterating a ReadableStream, even though that would be possible if you're using a ReadableStreamDefaultReader directly.
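
With a reader (reusing the signal from the earlier snippet), a rough sketch would be:

const reader = mystream.getReader();
signal.addEventListener('abort', () => reader.cancel(signal.reason));

while (true) {
  // a pending read() settles as { done: true } as soon as cancel() is called
  const { done, value } = await reader.read();
  if (done) break;
  logConsumer(`Chunk: ${value}`);
}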

I was thinking it would be better to call your fetch with the AbortController - that would abort the fetch that is supplying your stream, propagating the abort reason from the underlying source.

So as a general recommendation I was thinking "abort or cancel the underlying source if mechanisms exist, otherwise you will have to wait for the next loop iteration and call break/return (as you indicate)".

Indeed, if you can pass an AbortSignal when you construct the ReadableStream (like with fetch()), then that will be the fastest way to get it to cancel the stream.

However, that does make it harder to compose streams. If you have a pipe chain, ideally you want to consume and cancel it from the end of the chain, and then have it propagate up to the start. But now you have to also keep track of an AbortController so you can abort it from the front... 😕
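
That is (a rough sketch; url and someTransform are placeholders):

const controller = new AbortController();
const response = await fetch(url, { signal: controller.signal });
const readable = response.body.pipeThrough(someTransform);

// A consumer holding only `readable` can cancel from the end of the chain,
// and that propagates back up to response.body...
await readable.cancel('user clicked stop');

// ...but for the fastest cancellation it also needs `controller`
// threaded through from the front, to abort the fetch directly:
controller.abort('user clicked stop');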


I'm wondering if we should change the specification for this. Should return() be allowed to reject all pending next() promises immediately, instead of waiting for them to settle? This would align closer to how a "regular" reader works. Even with readable.values({ preventCancel: true }), it would work just as reader.releaseLock() thanks to #1168. Or does JavaScript have different expectations for how the async iterator protocol is supposed to work? 🤔

Or maybe we could add a signal option to ReadableStreamIteratorOptions? That would allow the consumer to opt into "faster" cancellation, and it would be usable from for await..of loops:

async function logChunks(readableXX, { signal }) {
  for await (const chunk of readableXX.values({ signal })) { // <<<
    bytes += chunk.length;
    logConsumer( `Chunk: ${chunk}. Read ${bytes} characters.`);
  }
}

@MattiasBuelens
Collaborator

@domenic What are your thoughts on this? Should it be possible to cancel the stream immediately through its async iterator, and have those pending reads become rejected? (Or I guess they should actually become resolved...)

Was there a reason why we needed all next() and return() calls to be queued for WebIDL's async iterator machinery? Is this something ECMAScript expects from the async iterator protocol?

@hamishwillee
Author

Thanks @MattiasBuelens - it is really helpful that you have confirmed the current behaviour. I'm interested to see the further result of this discussion on whether the API needs to change.

W.r.t. the other part of my post about catching errors, you are right that putting the try/catch around the for await ... does work! Putting it around the function logChunks() that does the iteration does not work - any thoughts why? (Hope this is not some blindingly obvious JavaScript thing.)

@domenic
Member

domenic commented Feb 15, 2023

I am pretty sure we designed Web IDL's async iterator machinery this way, to make it follow JavaScript async generators. That is, if you do

// (assume sleep() returns a promise that resolves after the given delay)
async function* g() {
  await sleep(100);
  yield 1;
  await sleep(100);
  yield 2;
  await sleep(100);
  yield 3;
}

const gen = g();
console.log(await gen.next()); // { value: 1, done: false }

const p = gen.next(); // intentionally no `await`
gen.return();
console.log(await p); // { value: 2, done: false }, i.e. the ongoing processing to produce 2 is allowed to continue.

console.log(await gen.next()); // { value: undefined, done: true }

The async iterator protocol itself would allow us to do something different here, but I'm unsure if we should depart from async generator behavior...

@MattiasBuelens
Collaborator

Putting it around the function logChunks() that does the iteration does not work - any thoughts why? (Hope this is not some blindingly obvious JavaScript thing.)

This might be a silly question, but are you awaiting the result of logChunks()? That is:

try {
  await logChunks();
} catch (error) {
  console.error("Oh no!");
}

Because if you don't await it, then the returned promise won't be used and you'll (at best) get an unhandled promise rejection in the console.

I am pretty sure we designed Web IDL's async iterator machinery this way, to make it follow JavaScript async generators.

Right, of course, I should have compared with async generators. 😅 I suppose we can't depart from those semantics.

How do we feel about adding an AbortSignal in the mix? 🤔 We could add an abort algorithm that is pretty much the same as our asynchronous iterator return steps, except that we don't assert that [[readRequests]] is empty (which is no longer necessary as of #1168).

However, I'm a bit worried about how that would compose. Would the proposed ReadableStream.from() method need to accept an optional AbortController argument, so it can early-abort the given async iterator on cancel()?

const readable1 = new ReadableStream({ /* ... */ });
const controller = new AbortController();
const iterator = readable1.values({ signal: controller.signal });
const readable2 = ReadableStream.from(iterator, { controller });
// readable2 behaves like readable1, and cancelling readable2 immediately cancels readable1

@hamishwillee
Author

hamishwillee commented Feb 15, 2023

@MattiasBuelens I'm an idiot - as you say, not awaiting the logChunks().

The rest of the question I presume is to @domenic.
This looks like the kind of thing AbortController was made for. However I'm too ignorant to add any value - even though you explained it is an issue, I still don't really understand why aborting the underlying source (e.g. the fetch()) and having that propagate through the chain of streams is such a problem.

@domenic
Member

domenic commented Feb 16, 2023

I suppose we can't depart from those semantics.

I mean, we definitely can. The async iterator protocol is very bare-bones, with lots of details left up to the specific iterator. The async generator case is one specific instantiation of that protocol, and perhaps is the one that has behavior people expect / the language designers intended? So I think it's a good starting place, and we should be hesitant to depart from it. But, we could depart, if we think there's a good reason and it wouldn't surprise people too much...

Regarding AbortSignal integration: well, first, to @hamishwillee's point, I think the best case is if your stream-creator takes an AbortSignal, and uses that appropriately throughout the chain. fetch() does that. So, I think the questions are:

  • Do we want to make that easier, e.g. by adding an AbortSignal to the ReadableStream constructor? (I think this might be reasonable. But, how is it related to what we've done for WritableStreamController's signal property, sketched below? Should all controllers have a signal property, and all constructors accept a signal? Not sure...)

  • If stream creators don't do that, should we provide affordances to let stream consumers abort earlier anyway? I'm quite unsure here.
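
For reference, the existing writable-side signal mentioned in the first bullet looks roughly like this (a sketch; slowWrite() is a placeholder):

const writable = new WritableStream({
  async write(chunk, controller) {
    // controller.signal is aborted when the stream is aborted,
    // so a long-running write can bail out early
    await slowWrite(chunk, { signal: controller.signal });
  },
});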

@tilgovi

tilgovi commented Apr 10, 2024

I think the best case is if your stream-creator takes an AbortSignal, and uses that appropriately throughout the chain. fetch() does that.

That's true, but fetch() only rejects during the request phase. Once we've got a response, the readable body stream gets cancelled by abort. Any pending read just resolves with { done: true, value: undefined }. It's more as though fetch() accepts a signal to use for aborting the writable end of a transform stream into which it dumps the response. The readable stream doesn't really need to know anything about abort.

I've just had a chance to refactor some code that was using async iterables and generators to use streams and I don't think it makes sense to add an abort signal to stream constructors. Calling abort() or cancel() is the way to cancel the stream. Whether that's driven by an AbortSignal is the user's decision. Whether the stream uses an AbortSignal internally for cleanup is the stream implementor's decision.

However, wanting a cancelled readable stream to abort an in-flight pull might be common enough that providing a signal on the controller would be a convenience and would improve symmetry with writable streams. There are other asymmetries between readable and writable streams, the nuance of which I might not fully appreciate yet, and I don't know if the signal is part of that.

Calling abort() on a writable stream first waits until any in-flight write settles, then it finishes by awaiting the abort steps defined by the sink. Meanwhile, calling cancel() on a stream just awaits the cancellation steps defined by the source. Pending read requests settle immediately with { done: true, value: undefined }.
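
In other words (a sketch; the release helpers are placeholders):

const readable = new ReadableStream({
  cancel(reason) {
    // awaited by readable.cancel(); pending reads have already settled as { done: true }
    return releaseSourceResources(reason);
  },
});

const writable = new WritableStream({
  async write(chunk) { /* ... */ },
  abort(reason) {
    // only runs once any in-flight write() has settled
    return releaseSinkResources(reason);
  },
});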

If we did want to add a signal to the readable controller, does it then become important that the signal be a way to abort an in-flight pull, and should cancellation therefore wait for that? I think the spec would maybe want to break cancellation into two phases like it does with writable streams. In the first phase, an in-flight pull would get a chance to resolve, then the second phase would be as cancellation is now.

That seems like a more invasive change that deliberately breaks the asymmetry, and then I'm left wondering why cancel() vs abort() / close() at all?

@jedwards1211

jedwards1211 commented Oct 29, 2024

@domenic it's basically unsafe to use async iteration on anything that acquires resources, unless you do something like wrap with a signal handler:

export function abortableAsyncIterable<T, TReturn, TNext>(
  iter: AsyncIterable<T, TReturn, TNext>,
  signal: AbortSignal
): AsyncIterable<T, TReturn, TNext> {
  const abortedPromise = new Promise<IteratorResult<T, TReturn>>(
    (resolve, reject) => {
      if (signal.aborted) {
        reject(new DOMException('aborted', 'AbortError'))
      }
      signal.addEventListener('abort', () =>
        reject(new DOMException('aborted', 'AbortError'))
      )
    }
  )
  abortedPromise.catch(() => {})
  return {
    [Symbol.asyncIterator]: () => {
      const inner = iter[Symbol.asyncIterator]()
      const { return: _return, throw: _throw } = inner
      return {
        next: (...args) => Promise.race([inner.next(...args), abortedPromise]),
        return: _return ? (...args) => _return.apply(inner, args) : undefined,
        throw: _throw ? (...args) => _throw.apply(inner, args) : undefined,
      }
    },
  }
}
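
Usage would be something like (sketch; someStream and handleChunk are placeholders):

const controller = new AbortController()
for await (const chunk of abortableAsyncIterable(someStream, controller.signal)) {
  // a later controller.abort() rejects the pending next() via the Promise.race above
  handleChunk(chunk)
}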

And even with this, it's still up to the async iterable to handle return() immediately to prevent resource leaks.

That's a lot of responsibility on both the consumer and producer sides...

I'm pretty sure that, among the places where the webapps I work on do async iteration, the majority of them need something like this.

I think this should be a builtin function or even special syntax like

for await (const elem of asyncIterable until signal) {
  ...
}

Because it's easy to forget that we'll leak resources without doing this, and I'm sure a lot of JS developers don't even realize it.

@jedwards1211

jedwards1211 commented Oct 29, 2024

I almost think it would be better for everyone if there were no Symbol.asyncIterator() method on ReadableStream, and instead you have to call a method that requires an AbortSignal argument to get an async iterator.

But maybe I'm in the minority for wrapping event streams that can wait indefinitely without timing out in ReadableStreams. If that's a common case though, then async iteration is a widespread pitfall.
