From 273e42508ecfe09ae763d1921d853c2fec88a577 Mon Sep 17 00:00:00 2001 From: Youenn Fablet Date: Thu, 13 Apr 2023 10:53:48 +0200 Subject: [PATCH] update implementation --- .../lib/ReadableStream-impl.js | 2 +- .../ReadableStreamDefaultController-impl.js | 6 ++-- .../ReadableStreamDefaultController.webidl | 7 +++- .../lib/UnderlyingSource.webidl | 2 +- .../lib/abstract-ops/miscellaneous.js | 8 +++++ .../lib/abstract-ops/queue-with-sizes.js | 20 ++++++++++-- .../lib/abstract-ops/readable-streams.js | 32 ++++++++++++------- .../lib/abstract-ops/transform-streams.js | 2 +- .../lib/abstract-ops/writable-streams.js | 4 +-- 9 files changed, 61 insertions(+), 22 deletions(-) diff --git a/reference-implementation/lib/ReadableStream-impl.js b/reference-implementation/lib/ReadableStream-impl.js index 1eda283e1..4373fd9d0 100644 --- a/reference-implementation/lib/ReadableStream-impl.js +++ b/reference-implementation/lib/ReadableStream-impl.js @@ -29,7 +29,7 @@ exports.implementation = class ReadableStreamImpl { this, underlyingSource, underlyingSourceDict, highWaterMark ); } else { - assert(!('type' in underlyingSourceDict)); + assert(!('type' in underlyingSourceDict) || underlyingSourceDict.type === 'transfer'); const sizeAlgorithm = ExtractSizeAlgorithm(strategy); const highWaterMark = ExtractHighWaterMark(strategy, 1); aos.SetUpReadableStreamDefaultControllerFromUnderlyingSource( diff --git a/reference-implementation/lib/ReadableStreamDefaultController-impl.js b/reference-implementation/lib/ReadableStreamDefaultController-impl.js index 5c7ec7033..1ad935f9b 100644 --- a/reference-implementation/lib/ReadableStreamDefaultController-impl.js +++ b/reference-implementation/lib/ReadableStreamDefaultController-impl.js @@ -17,12 +17,14 @@ exports.implementation = class ReadableStreamDefaultControllerImpl { aos.ReadableStreamDefaultControllerClose(this); } - enqueue(chunk) { + enqueue(chunk, optionsOrTransfer) { + const hasTransfer = optionsOrTransfer && !Array.isArray(optionsOrTransfer); + const transfer = hasTransfer ? optionsOrTransfer.transfer : optionsOrTransfer; if (aos.ReadableStreamDefaultControllerCanCloseOrEnqueue(this) === false) { throw new TypeError('The stream is not in a state that permits enqueue'); } - return aos.ReadableStreamDefaultControllerEnqueue(this, chunk); + return aos.ReadableStreamDefaultControllerEnqueue(this, chunk, transfer); } error(e) { diff --git a/reference-implementation/lib/ReadableStreamDefaultController.webidl b/reference-implementation/lib/ReadableStreamDefaultController.webidl index aeea7249f..7dadebf00 100644 --- a/reference-implementation/lib/ReadableStreamDefaultController.webidl +++ b/reference-implementation/lib/ReadableStreamDefaultController.webidl @@ -1,8 +1,13 @@ +dictionary StructuredSerializeOptions { + sequence transfer = []; +}; + [Exposed=(Window,Worker,Worklet)] interface ReadableStreamDefaultController { readonly attribute unrestricted double? desiredSize; undefined close(); - undefined enqueue(optional any chunk); + undefined enqueue(any chunk, sequence transfer); + undefined enqueue(optional any chunk, optional StructuredSerializeOptions options = { }); undefined error(optional any e); }; diff --git a/reference-implementation/lib/UnderlyingSource.webidl b/reference-implementation/lib/UnderlyingSource.webidl index 7a0047638..50a61f5a8 100644 --- a/reference-implementation/lib/UnderlyingSource.webidl +++ b/reference-implementation/lib/UnderlyingSource.webidl @@ -12,4 +12,4 @@ callback UnderlyingSourceStartCallback = any (ReadableStreamController controlle callback UnderlyingSourcePullCallback = Promise (ReadableStreamController controller); callback UnderlyingSourceCancelCallback = Promise (optional any reason); -enum ReadableStreamType { "bytes" }; +enum ReadableStreamType { "bytes", "transfer" }; diff --git a/reference-implementation/lib/abstract-ops/miscellaneous.js b/reference-implementation/lib/abstract-ops/miscellaneous.js index 08589a740..406e43360 100644 --- a/reference-implementation/lib/abstract-ops/miscellaneous.js +++ b/reference-implementation/lib/abstract-ops/miscellaneous.js @@ -20,3 +20,11 @@ exports.CloneAsUint8Array = O => { const buffer = O.buffer.slice(O.byteOffset, O.byteOffset + O.byteLength); return new Uint8Array(buffer); }; + +exports.StructuredTransferOrClone = (value, transfer) => { + // FIXME: We should check whether value is an array buffer or is transferable and update transfer accordingly. + if (self.structuredClone) + return structuredClone(value, transfer); + + return JSON.parse(JSON.stringify(value)); +}; diff --git a/reference-implementation/lib/abstract-ops/queue-with-sizes.js b/reference-implementation/lib/abstract-ops/queue-with-sizes.js index 22086caa5..9e8d74d30 100644 --- a/reference-implementation/lib/abstract-ops/queue-with-sizes.js +++ b/reference-implementation/lib/abstract-ops/queue-with-sizes.js @@ -1,6 +1,6 @@ 'use strict'; const assert = require('assert'); -const { IsNonNegativeNumber } = require('./miscellaneous.js'); +const { IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js'); exports.DequeueValue = container => { assert('_queue' in container && '_queueTotalSize' in container); @@ -15,7 +15,7 @@ exports.DequeueValue = container => { return pair.value; }; -exports.EnqueueValueWithSize = (container, value, size) => { +exports.EnqueueValueWithSize = (container, value, size, transfer) => { assert('_queue' in container && '_queueTotalSize' in container); if (!IsNonNegativeNumber(size)) { @@ -24,7 +24,9 @@ exports.EnqueueValueWithSize = (container, value, size) => { if (size === Infinity) { throw new RangeError('Size must be a finite, non-NaN, non-negative number.'); } - + if (container.isTransferring) { + value = StructuredTransferOrClone(value, transfer); + } container._queue.push({ value, size }); container._queueTotalSize += size; }; @@ -40,6 +42,18 @@ exports.PeekQueueValue = container => { exports.ResetQueue = container => { assert('_queue' in container && '_queueTotalSize' in container); + if (container.isTransferring) { + while (container._queue.length > 0) { + const value = exports.DequeueValue(container); + if (typeof value.close === 'function') { + try { + value.close(); + } catch (closeException) { + // Nothing to do. + } + } + } + } container._queue = []; container._queueTotalSize = 0; }; diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index db1da4c73..350c69a18 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -6,7 +6,7 @@ const { promiseResolvedWith, promiseRejectedWith, newPromise, resolvePromise, re require('../helpers/webidl.js'); const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetachedBuffer, TransferArrayBuffer } = require('./ecmascript.js'); -const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js'); +const { CloneAsUint8Array, IsNonNegativeNumber, StructuredTransferOrClone } = require('./miscellaneous.js'); const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js'); const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort, WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, @@ -89,7 +89,7 @@ function CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, hi const controller = ReadableStreamDefaultController.new(globalThis); SetUpReadableStreamDefaultController( - stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm + stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, false ); return stream; @@ -340,7 +340,7 @@ function ReadableStreamTee(stream, cloneForBranch2) { if (ReadableByteStreamController.isImpl(stream._controller)) { return ReadableByteStreamTee(stream); } - return ReadableStreamDefaultTee(stream, cloneForBranch2); + return ReadableStreamDefaultTee(stream, stream._controller.isTransferring ? true : cloneForBranch2); } function ReadableStreamDefaultTee(stream, cloneForBranch2) { @@ -392,10 +392,10 @@ function ReadableStreamDefaultTee(stream, cloneForBranch2) { // } if (canceled1 === false) { - ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1); + ReadableStreamDefaultControllerEnqueue(branch1._controller, chunk1, undefined); } if (canceled2 === false) { - ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2); + ReadableStreamDefaultControllerEnqueue(branch2._controller, chunk2, undefined); } reading = false; @@ -1074,7 +1074,7 @@ function ReadableStreamDefaultControllerClose(controller) { } } -function ReadableStreamDefaultControllerEnqueue(controller, chunk) { +function ReadableStreamDefaultControllerEnqueue(controller, chunk, transfer) { if (ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) === false) { return; } @@ -1082,6 +1082,14 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) { const stream = controller._stream; if (IsReadableStreamLocked(stream) === true && ReadableStreamGetNumReadRequests(stream) > 0) { + if (controller.isTransferring) { + try { + chunk = StructuredTransferOrClone(chunk, transfer); + } catch (chunkCloneError) { + ReadableStreamDefaultControllerError(controller, chunkCloneError); + throw chunkCloneError; + } + } ReadableStreamFulfillReadRequest(stream, chunk, false); } else { let chunkSize; @@ -1093,7 +1101,7 @@ function ReadableStreamDefaultControllerEnqueue(controller, chunk) { } try { - EnqueueValueWithSize(controller, chunk, chunkSize); + EnqueueValueWithSize(controller, chunk, chunkSize, transfer); } catch (enqueueE) { ReadableStreamDefaultControllerError(controller, enqueueE); throw enqueueE; @@ -1148,7 +1156,7 @@ function ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) { } function SetUpReadableStreamDefaultController( - stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm) { + stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, isTransferring) { assert(stream._controller === undefined); controller._stream = stream; @@ -1169,6 +1177,8 @@ function SetUpReadableStreamDefaultController( controller._pullAlgorithm = pullAlgorithm; controller._cancelAlgorithm = cancelAlgorithm; + controller.isTransferring = isTransferring; + stream._controller = controller; const startResult = startAlgorithm(); @@ -1195,7 +1205,7 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource( let startAlgorithm = () => undefined; let pullAlgorithm = () => promiseResolvedWith(undefined); let cancelAlgorithm = () => promiseResolvedWith(undefined); - + const isTransferring = underlyingSourceDict.type === 'transfer'; if ('start' in underlyingSourceDict) { startAlgorithm = () => underlyingSourceDict.start.call(underlyingSource, controller); } @@ -1207,8 +1217,8 @@ function SetUpReadableStreamDefaultControllerFromUnderlyingSource( } SetUpReadableStreamDefaultController( - stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm - ); + stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm, + isTransferring); } // Byte stream controllers diff --git a/reference-implementation/lib/abstract-ops/transform-streams.js b/reference-implementation/lib/abstract-ops/transform-streams.js index 8e3f5fcc3..1152d19de 100644 --- a/reference-implementation/lib/abstract-ops/transform-streams.js +++ b/reference-implementation/lib/abstract-ops/transform-streams.js @@ -155,7 +155,7 @@ function TransformStreamDefaultControllerEnqueue(controller, chunk) { // accept TransformStreamDefaultControllerEnqueue() calls. try { - ReadableStreamDefaultControllerEnqueue(readableController, chunk); + ReadableStreamDefaultControllerEnqueue(readableController, chunk, undefined); } catch (e) { // This happens when readableStrategy.size() throws. TransformStreamErrorWritableAndUnblockWrite(stream, e); diff --git a/reference-implementation/lib/abstract-ops/writable-streams.js b/reference-implementation/lib/abstract-ops/writable-streams.js index cf303bfe7..88eef0888 100644 --- a/reference-implementation/lib/abstract-ops/writable-streams.js +++ b/reference-implementation/lib/abstract-ops/writable-streams.js @@ -637,7 +637,7 @@ function WritableStreamDefaultControllerClearAlgorithms(controller) { } function WritableStreamDefaultControllerClose(controller) { - EnqueueValueWithSize(controller, closeSentinel, 0); + EnqueueValueWithSize(controller, closeSentinel, 0, undefined); WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller); } @@ -729,7 +729,7 @@ function WritableStreamDefaultControllerProcessWrite(controller, chunk) { function WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) { try { - EnqueueValueWithSize(controller, chunk, chunkSize); + EnqueueValueWithSize(controller, chunk, chunkSize, undefined); } catch (enqueueE) { WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueE); return;