Skip to content

Commit

Permalink
PipeTo initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
youennf committed Apr 17, 2023
1 parent e5c3a24 commit 2dfabed
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 4 deletions.
11 changes: 11 additions & 0 deletions reference-implementation/lib/abstract-ops/miscellaneous.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,14 @@ exports.CloneAsUint8Array = O => {
exports.StructuredTransferOrClone = (value, transferList) => {
return globalThis.structuredClone(value, { transfer: transferList });
};

exports.RunCloseSteps = (value) => {
if (typeof value.close === 'function') {
return;
}
try {
value.close();
} catch (closeException) {
// Nothing to do.
}
}
4 changes: 2 additions & 2 deletions reference-implementation/lib/abstract-ops/queue-with-sizes.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';
const assert = require('assert');
const { IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js');
const { IsNonNegativeNumber, RunCloseSteps, StructuredTransferOrClone } = require('./miscellaneous.js');

exports.DequeueValue = container => {
assert('_queue' in container && '_queueTotalSize' in container);
Expand All @@ -24,7 +24,7 @@ exports.EnqueueValueWithSize = (container, value, size, transferList) => {
if (size === Infinity) {
throw new RangeError('Size must be a finite, non-NaN, non-negative number.');
}
if (container._isOwning) {
if (container._isOwning && !container._isPipeToOptimizedTransfer) {
value = StructuredTransferOrClone(value, transferList);
}
container._queue.push({ value, size });
Expand Down
10 changes: 8 additions & 2 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, re
require('../helpers/webidl.js');
const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } =
require('./ecmascript.js');
const { CloneAsUint8Array, IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js');
const { CloneAsUint8Array, IsNonNegativeNumber, RunCloseSteps, StructuredTransferOrClone } = require('./miscellaneous.js');
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js');
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort,
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease,
Expand Down Expand Up @@ -136,6 +136,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC

const reader = AcquireReadableStreamDefaultReader(source);
const writer = AcquireWritableStreamDefaultWriter(dest);
writer._stream._controller._isPipeToOptimizedTransfer = source._controller._isOwning && dest._controller._isOwning;

source._disturbed = true;

Expand Down Expand Up @@ -206,7 +207,11 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
{
chunkSteps: chunk => {
currentWrite = transformPromiseWith(
WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {}
WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {
if (reader._stream._controller._isOwning) {
RunCloseSteps(chunk);
}
}
);
resolveRead(false);
},
Expand Down Expand Up @@ -319,6 +324,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}

function finalize(isError, error) {
writer._stream._controller._isPipeToOptimizedTransfer = undefined;
WritableStreamDefaultWriterRelease(writer);
ReadableStreamDefaultReaderRelease(reader);

Expand Down

0 comments on commit 2dfabed

Please sign in to comment.