Skip to content

Commit

Permalink
streams: fixes for webstreams
Browse files Browse the repository at this point in the history
PR-URL: nodejs#51168
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
MattiasBuelens committed May 1, 2024
1 parent d36fbc3 commit 7e5d00c
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 281 deletions.
180 changes: 87 additions & 93 deletions lib/internal/webstreams/readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const {
FunctionPrototypeCall,
MathMin,
NumberIsInteger,
ObjectCreate,
ObjectDefineProperties,
ObjectSetPrototypeOf,
Promise,
Expand Down Expand Up @@ -96,9 +95,9 @@ const {
AsyncIterator,
cloneAsUint8Array,
copyArrayBuffer,
createPromiseCallback,
customInspect,
dequeueValue,
ensureIsPromise,
enqueueValueWithSize,
extractHighWaterMark,
extractSizeAlgorithm,
Expand Down Expand Up @@ -251,19 +250,7 @@ class ReadableStream {
constructor(source = {}, strategy = kEmptyObject) {
if (source === null)
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
this[kState] = {
disturbed: false,
reader: undefined,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};
this[kState] = createReadableStreamState();

this[kIsClosedPromise] = createDeferredPromise();
this[kControllerErrorFunction] = () => {};
Expand Down Expand Up @@ -655,17 +642,7 @@ function TransferredReadableStream() {
return makeTransferable(ReflectConstruct(
function() {
this[kType] = 'ReadableStream';
this[kState] = {
disturbed: false,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port: undefined,
promise: undefined,
},
};
this[kState] = createReadableStreamState();
this[kIsClosedPromise] = createDeferredPromise();
},
[], ReadableStream));
Expand Down Expand Up @@ -1223,43 +1200,58 @@ ObjectDefineProperties(ReadableByteStreamController.prototype, {
[SymbolToStringTag]: getNonWritablePropertyDescriptor(ReadableByteStreamController.name),
});

function TeeReadableStream(start, pull, cancel) {
function InternalReadableStream(start, pull, cancel, highWaterMark, size) {
this[kType] = 'ReadableStream';
this[kState] = {
disturbed: false,
state: 'readable',
storedError: undefined,
stream: undefined,
transfer: {
writable: undefined,
port: undefined,
promise: undefined,
},
};
this[kState] = createReadableStreamState();
this[kIsClosedPromise] = createDeferredPromise();
setupReadableStreamDefaultControllerFromSource(
const controller = new ReadableStreamDefaultController(kSkipThrow);
setupReadableStreamDefaultController(
this,
ObjectCreate(null, {
start: { __proto__: null, value: start },
pull: { __proto__: null, value: pull },
cancel: { __proto__: null, value: cancel },
}),
1,
() => 1);
controller,
start,
pull,
cancel,
highWaterMark,
size);
return makeTransferable(this);
}

ObjectSetPrototypeOf(InternalReadableStream.prototype, ReadableStream.prototype);
ObjectSetPrototypeOf(InternalReadableStream, ReadableStream);

return makeTransferable(this);
function createReadableStream(start, pull, cancel, highWaterMark = 1, size = () => 1) {
const stream = new InternalReadableStream(start, pull, cancel, highWaterMark, size);

// For spec compliance the InternalReadableStream must be a ReadableStream
stream.constructor = ReadableStream;
return stream;
}

function InternalReadableByteStream(start, pull, cancel) {
markTransferMode(this, false, true);
this[kType] = 'ReadableStream';
this[kState] = createReadableStreamState();
this[kIsClosedPromise] = createDeferredPromise();
const controller = new ReadableByteStreamController(kSkipThrow);
setupReadableByteStreamController(
this,
controller,
start,
pull,
cancel,
0,
undefined);
}

ObjectSetPrototypeOf(TeeReadableStream.prototype, ReadableStream.prototype);
ObjectSetPrototypeOf(TeeReadableStream, ReadableStream);
ObjectSetPrototypeOf(InternalReadableByteStream.prototype, ReadableStream.prototype);
ObjectSetPrototypeOf(InternalReadableByteStream, ReadableStream);

function createTeeReadableStream(start, pull, cancel) {
const tee = new TeeReadableStream(start, pull, cancel);
function createReadableByteStream(start, pull, cancel) {
const stream = new InternalReadableByteStream(start, pull, cancel);

// For spec compliance the Tee must be a ReadableStream
tee.constructor = ReadableStream;
return tee;
// For spec compliance the InternalReadableByteStream must be a ReadableStream
stream.constructor = ReadableStream;
return stream;
}

const isReadableStream =
Expand All @@ -1275,6 +1267,23 @@ const isReadableStreamBYOBReader =

// ---- ReadableStream Implementation

function createReadableStreamState() {
return {
__proto__: null,
disturbed: false,
reader: undefined,
state: 'readable',
storedError: undefined,
transfer: {
__proto__: null,
writable: undefined,
port1: undefined,
port2: undefined,
promise: undefined,
},
};
}

function readableStreamFromIterable(iterable) {
let stream;
const iteratorRecord = getIterator(iterable, 'async');
Expand Down Expand Up @@ -1314,16 +1323,12 @@ function readableStreamFromIterable(iterable) {
});
}

stream = new ReadableStream({
start: startAlgorithm,
pull: pullAlgorithm,
cancel: cancelAlgorithm,
}, {
size() {
return 1;
},
highWaterMark: 0,
});
stream = createReadableStream(
startAlgorithm,
pullAlgorithm,
cancelAlgorithm,
0,
);

return stream;
}
Expand Down Expand Up @@ -1649,9 +1654,9 @@ function readableStreamDefaultTee(stream, cloneForBranch2) {
}

branch1 =
createTeeReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm);
createReadableStream(nonOpStart, pullAlgorithm, cancel1Algorithm);
branch2 =
createTeeReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm);
createReadableStream(nonOpStart, pullAlgorithm, cancel2Algorithm);

PromisePrototypeThen(
reader[kState].close.promise,
Expand Down Expand Up @@ -1928,16 +1933,10 @@ function readableByteStreamTee(stream) {
return cancelDeferred.promise;
}

branch1 = new ReadableStream({
type: 'bytes',
pull: pull1Algorithm,
cancel: cancel1Algorithm,
});
branch2 = new ReadableStream({
type: 'bytes',
pull: pull2Algorithm,
cancel: cancel2Algorithm,
});
branch1 =
createReadableByteStream(nonOpStart, pull1Algorithm, cancel1Algorithm);
branch2 =
createReadableByteStream(nonOpStart, pull2Algorithm, cancel2Algorithm);

forwardReaderError(reader);

Expand Down Expand Up @@ -1988,10 +1987,7 @@ function readableStreamCancel(stream, reason) {
}

return PromisePrototypeThen(
ensureIsPromise(
stream[kState].controller[kCancel],
stream[kState].controller,
reason),
stream[kState].controller[kCancel](reason),
() => {});
}

Expand Down Expand Up @@ -2356,7 +2352,7 @@ function readableStreamDefaultControllerCallPullIfNeeded(controller) {
assert(!controller[kState].pullAgain);
controller[kState].pulling = true;
PromisePrototypeThen(
ensureIsPromise(controller[kState].pullAlgorithm, controller),
controller[kState].pullAlgorithm(controller),
() => {
controller[kState].pulling = false;
if (controller[kState].pullAgain) {
Expand Down Expand Up @@ -2386,12 +2382,9 @@ function readableStreamDefaultControllerError(controller, error) {

function readableStreamDefaultControllerCancelSteps(controller, reason) {
resetQueue(controller);
try {
const result = controller[kState].cancelAlgorithm(reason);
return result;
} finally {
readableStreamDefaultControllerClearAlgorithms(controller);
}
const result = controller[kState].cancelAlgorithm(reason);
readableStreamDefaultControllerClearAlgorithms(controller);
return result;
}

function readableStreamDefaultControllerPullSteps(controller, readRequest) {
Expand Down Expand Up @@ -2465,11 +2458,10 @@ function setupReadableStreamDefaultControllerFromSource(
FunctionPrototypeBind(start, source, controller) :
nonOpStart;
const pullAlgorithm = pull ?
FunctionPrototypeBind(pull, source, controller) :
createPromiseCallback('source.pull', pull, source) :
nonOpPull;

const cancelAlgorithm = cancel ?
FunctionPrototypeBind(cancel, source) :
createPromiseCallback('source.cancel', cancel, source) :
nonOpCancel;

setupReadableStreamDefaultController(
Expand Down Expand Up @@ -3097,7 +3089,7 @@ function readableByteStreamControllerCallPullIfNeeded(controller) {
assert(!controller[kState].pullAgain);
controller[kState].pulling = true;
PromisePrototypeThen(
ensureIsPromise(controller[kState].pullAlgorithm, controller),
controller[kState].pullAlgorithm(controller),
() => {
controller[kState].pulling = false;
if (controller[kState].pullAgain) {
Expand Down Expand Up @@ -3264,10 +3256,10 @@ function setupReadableByteStreamControllerFromSource(
FunctionPrototypeBind(start, source, controller) :
nonOpStart;
const pullAlgorithm = pull ?
FunctionPrototypeBind(pull, source, controller) :
createPromiseCallback('source.pull', pull, source, controller) :
nonOpPull;
const cancelAlgorithm = cancel ?
FunctionPrototypeBind(cancel, source) :
createPromiseCallback('source.cancel', cancel, source) :
nonOpCancel;

if (autoAllocateChunkSize === 0) {
Expand Down Expand Up @@ -3364,4 +3356,6 @@ module.exports = {
readableByteStreamControllerPullSteps,
setupReadableByteStreamController,
setupReadableByteStreamControllerFromSource,
createReadableStream,
createReadableByteStream,
};
Loading

0 comments on commit 7e5d00c

Please sign in to comment.