diff --git a/README.md b/README.md index fb3e797..e03d04f 100644 --- a/README.md +++ b/README.md @@ -90,9 +90,6 @@ It aims to pass all tests, although it allows some exceptions for practical reas * The tests [with patched globals][wpt-rs-patched-global] and [with `Object.prototype.then`][wpt-then-interception]. These tests are meant for browsers to ensure user-land modifications cannot affect the internal logic of `pipeTo()` and `tee()`. However, it's not reasonable or desirable for a user-land polyfill to try and isolate itself completely from using the global `Object`. - * Certain `pipeTo()` tests that require synchronous inspection of the stream's state ([1][wpt-pipe-sync-state-1], [2][wpt-pipe-sync-state-2]). - Because the polyfill uses the public `getReader()` and `getWriter()` API to implement `pipeTo()`, it can only *asynchronously* observe if and when a stream becomes closed or errored. - Therefore, when the readable and the writable end become errored *at the exact same time*, it's difficult for the polyfill to observe these state changes in exactly the same order. * The ES5 variant passes the same tests as the ES2015 variant, except for various tests about specific characteristics of the constructors, properties and methods. These test failures do not affect the run-time behavior of the polyfill. For example: @@ -130,6 +127,4 @@ Thanks to these people for their work on [the original polyfill][creatorrr-polyf [stub-async-iterator-prototype]: https://github.com/MattiasBuelens/web-streams-polyfill/blob/v4.0.0-beta.3/src/lib/readable-stream/async-iterator.ts#L126-L134 [wpt-rs-patched-global]: https://github.com/web-platform-tests/wpt/blob/887350c2f46def5b01c4dd1f8d2eee35dfb9c5bb/streams/readable-streams/patched-global.any.js [wpt-then-interception]: https://github.com/web-platform-tests/wpt/blob/cf33f00596af295ee0f207c88e23b5f8b0791307/streams/piping/then-interception.any.js -[wpt-pipe-sync-state-1]: https://github.com/web-platform-tests/wpt/blob/e1e713c842e54ea0a9410ddc988b63d0e1d31973/streams/piping/multiple-propagation.any.js#L30-L53 -[wpt-pipe-sync-state-2]: https://github.com/web-platform-tests/wpt/blob/e1e713c842e54ea0a9410ddc988b63d0e1d31973/streams/piping/multiple-propagation.any.js#L114-L138 [creatorrr-polyfill]: https://github.com/creatorrr/web-streams-polyfill diff --git a/src/lib/readable-stream.ts b/src/lib/readable-stream.ts index 110dfbe..c790cad 100644 --- a/src/lib/readable-stream.ts +++ b/src/lib/readable-stream.ts @@ -69,7 +69,7 @@ export type ReadableByteStream = ReadableStream & { _readableStreamController: ReadableByteStreamController }; -export type ReadableStreamState = 'readable' | 'closed' | 'errored'; +type ReadableStreamState = 'readable' | 'closed' | 'errored'; /** * A readable stream represents a source of data, from which you can read. diff --git a/src/lib/readable-stream/pipe.ts b/src/lib/readable-stream/pipe.ts index f8b4408..8c909bc 100644 --- a/src/lib/readable-stream/pipe.ts +++ b/src/lib/readable-stream/pipe.ts @@ -1,18 +1,29 @@ -import type { ReadableStream, ReadableStreamState } from '../readable-stream'; -import { IsReadableStream } from '../readable-stream'; -import type { WritableStream, WritableStreamState } from '../writable-stream'; -import { IsWritableStream } from '../writable-stream'; +import type { ReadableStream } from '../readable-stream'; +import { IsReadableStream, IsReadableStreamLocked, ReadableStreamCancel } from '../readable-stream'; +import { AcquireReadableStreamDefaultReader, ReadableStreamDefaultReaderRead } from './default-reader'; +import { ReadableStreamReaderGenericRelease } from './generic-reader'; +import type { WritableStream } from '../writable-stream'; +import { + AcquireWritableStreamDefaultWriter, + IsWritableStream, + IsWritableStreamLocked, + WritableStreamAbort, + WritableStreamCloseQueuedOrInFlight, + WritableStreamDefaultWriterCloseWithErrorPropagation, + WritableStreamDefaultWriterRelease, + WritableStreamDefaultWriterWrite +} from '../writable-stream'; import assert from '../../stub/assert'; import { newPromise, PerformPromiseThen, - promiseRejectedWith, promiseResolvedWith, setPromiseIsHandledToTrue, - transformPromiseWith, uponFulfillment, - uponPromise + uponPromise, + uponRejection } from '../helpers/webidl'; +import { noop } from '../../utils'; import type { AbortSignal } from '../abort-signal'; import { isAbortSignal } from '../abort-signal'; import { DOMException } from '../../stub/dom-exception'; @@ -29,31 +40,18 @@ export function ReadableStreamPipeTo(source: ReadableStream, assert(typeof preventAbort === 'boolean'); assert(typeof preventCancel === 'boolean'); assert(signal === undefined || isAbortSignal(signal)); - assert(!source.locked); - assert(!dest.locked); + assert(!IsReadableStreamLocked(source)); + assert(!IsWritableStreamLocked(dest)); - const reader = source.getReader(); - const writer = dest.getWriter(); + const reader = AcquireReadableStreamDefaultReader(source); + const writer = AcquireWritableStreamDefaultWriter(dest); source._disturbed = true; let shuttingDown = false; - let released = false; - let sourceState: ReadableStreamState = 'readable'; - let destState: WritableStreamState = 'writable'; - let destStoredError: any; - let destCloseRequested = false; - - // This is used to track when we initially start the pipe loop, and have initialized sourceState and destState. - let started = false; - let resolveStart: () => void; - const startPromise: Promise = newPromise(resolve => { - resolveStart = resolve; - }); - // This is used to keep track of the spec's requirement that we wait for ongoing reads and writes during shutdown. - let currentRead = promiseResolvedWith(undefined); - let currentWrite = promiseResolvedWith(undefined); + // This is used to keep track of the spec's requirement that we wait for ongoing writes during shutdown. + let currentWrite = promiseResolvedWith(undefined); return newPromise((resolve, reject) => { let abortAlgorithm: () => void; @@ -63,16 +61,16 @@ export function ReadableStreamPipeTo(source: ReadableStream, const actions: Array<() => Promise> = []; if (!preventAbort) { actions.push(() => { - if (destState === 'writable') { - return writer.abort(error); + if (dest._state === 'writable') { + return WritableStreamAbort(dest, error); } return promiseResolvedWith(undefined); }); } if (!preventCancel) { actions.push(() => { - if (sourceState === 'readable') { - return reader.cancel(error); + if (source._state === 'readable') { + return ReadableStreamCancel(source, error); } return promiseResolvedWith(undefined); }); @@ -82,20 +80,17 @@ export function ReadableStreamPipeTo(source: ReadableStream, if (signal.aborted) { abortAlgorithm(); - } else { - signal.addEventListener('abort', abortAlgorithm); + return; } + + signal.addEventListener('abort', abortAlgorithm); } // Using reader and writer, read all chunks from this and write them to dest // - Backpressure must be enforced // - Shutdown must stop all activity function pipeLoop() { - if (shuttingDown) { - return; - } - - const loop = newPromise((resolveLoop, rejectLoop) => { + return newPromise((resolveLoop, rejectLoop) => { function next(done: boolean) { if (done) { resolveLoop(); @@ -108,8 +103,6 @@ export function ReadableStreamPipeTo(source: ReadableStream, next(false); }); - - setPromiseIsHandledToTrue(loop); } function pipeStep(): Promise { @@ -117,129 +110,91 @@ export function ReadableStreamPipeTo(source: ReadableStream, return promiseResolvedWith(true); } - return PerformPromiseThen(writer.ready, () => { - const read = PerformPromiseThen(reader.read(), result => { - if (result.done) { - return true; - } - currentWrite = writer.write(result.value); - setPromiseIsHandledToTrue(currentWrite); - return false; + return PerformPromiseThen(writer._readyPromise, () => { + return newPromise((resolveRead, rejectRead) => { + ReadableStreamDefaultReaderRead( + reader, + { + _chunkSteps: chunk => { + currentWrite = PerformPromiseThen(WritableStreamDefaultWriterWrite(writer, chunk), undefined, noop); + resolveRead(false); + }, + _closeSteps: () => resolveRead(true), + _errorSteps: rejectRead + } + ); }); - currentRead = read; - return read; }); } - uponPromise(reader.closed, () => { - // Closing must be propagated forward - assert(!released); - assert(source._state === 'closed'); - sourceState = 'closed'; - if (!preventClose) { - shutdownWithAction(() => { - if (destCloseRequested || destState === 'closed') { - return promiseResolvedWith(undefined); - } - if (destState === 'errored') { - return promiseRejectedWith(destStoredError); - } - assert(destState === 'writable' || destState === 'erroring'); - destCloseRequested = true; - return writer.close(); - }); - } else { - shutdown(); - } - return null; - }, storedError => { - if (released) { - return null; - } - // Errors must be propagated forward - assert(source._state === 'errored'); - sourceState = 'errored'; + // Errors must be propagated forward + isOrBecomesErrored(source, reader._closedPromise, storedError => { if (!preventAbort) { - shutdownWithAction(() => writer.abort(storedError), true, storedError); + shutdownWithAction(() => WritableStreamAbort(dest, storedError), true, storedError); } else { shutdown(true, storedError); } return null; }); - uponPromise(writer.closed, () => { - assert(!released); - assert(dest._state === 'closed'); - destState = 'closed'; - return null; - }, storedError => { - if (released) { - return null; - } - // Errors must be propagated backward - assert(dest._state === 'errored'); - destState = 'errored'; - destStoredError = storedError; + // Errors must be propagated backward + isOrBecomesErrored(dest, writer._closedPromise, storedError => { if (!preventCancel) { - shutdownWithAction(() => reader.cancel(storedError), true, storedError); + shutdownWithAction(() => ReadableStreamCancel(source, storedError), true, storedError); } else { shutdown(true, storedError); } return null; }); - // The reference implementation uses `queueMicrotask()` here, but that is not sufficient to detect - // closes or errors through `reader.closed` or `writer.closed`. - setTimeout(() => { - started = true; - resolveStart(); + // Closing must be propagated forward + isOrBecomesClosed(source, reader._closedPromise, () => { + if (!preventClose) { + shutdownWithAction(() => WritableStreamDefaultWriterCloseWithErrorPropagation(writer)); + } else { + shutdown(); + } + return null; + }); - // Closing must be propagated backward - if (destCloseRequested || destState === 'closed') { - const destClosed = new TypeError('the destination writable stream closed before all data could be piped to it'); + // Closing must be propagated backward + if (WritableStreamCloseQueuedOrInFlight(dest) || dest._state === 'closed') { + const destClosed = new TypeError('the destination writable stream closed before all data could be piped to it'); - if (!preventCancel) { - shutdownWithAction(() => reader.cancel(destClosed), true, destClosed); - } else { - shutdown(true, destClosed); - } + if (!preventCancel) { + shutdownWithAction(() => ReadableStreamCancel(source, destClosed), true, destClosed); + } else { + shutdown(true, destClosed); } + } - pipeLoop(); - }, 0); + setPromiseIsHandledToTrue(pipeLoop()); function waitForWritesToFinish(): Promise { - let oldCurrentWrite: Promise | undefined; - return promiseResolvedWith(check()); + // Another write may have started while we were waiting on this currentWrite, so we have to be sure to wait + // for that too. + const oldCurrentWrite = currentWrite; + return PerformPromiseThen( + currentWrite, + () => oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined + ); + } - function check(): undefined | Promise { - // Another write may have started while we were waiting on this currentWrite, - // so we have to be sure to wait for that too. - if (oldCurrentWrite !== currentWrite) { - oldCurrentWrite = currentWrite; - return transformPromiseWith(currentWrite, check, check); - } - return undefined; + function isOrBecomesErrored(stream: ReadableStream | WritableStream, + promise: Promise, + action: (reason: any) => null) { + if (stream._state === 'errored') { + action(stream._storedError); + } else { + uponRejection(promise, action); } } - function waitForReadsAndWritesToFinish(): Promise { - let oldCurrentRead: Promise | undefined; - let oldCurrentWrite: Promise | undefined; - return promiseResolvedWith(check()); - - function check(): undefined | Promise { - // Another read or write may have started while we were waiting on this currentRead or currentWrite, - // so we have to be sure to wait for that too. - if (oldCurrentRead !== currentRead) { - oldCurrentRead = currentRead; - return transformPromiseWith(currentRead, check, check); - } - if (oldCurrentWrite !== currentWrite) { - oldCurrentWrite = currentWrite; - return transformPromiseWith(currentWrite, check, check); - } - return undefined; + function isOrBecomesClosed(stream: ReadableStream | WritableStream, promise: Promise, action: () => null) { + if (stream._state === 'closed') { + action(); + } else { + uponFulfillment(promise, action); } } @@ -249,22 +204,17 @@ export function ReadableStreamPipeTo(source: ReadableStream, } shuttingDown = true; - if (!started) { - uponFulfillment(startPromise, onStart); - } else { - onStart(); - } - - function onStart(): null { + if (dest._state === 'writable' && !WritableStreamCloseQueuedOrInFlight(dest)) { uponFulfillment(waitForWritesToFinish(), doTheRest); - return null; + } else { + doTheRest(); } function doTheRest(): null { uponPromise( action(), - () => waitForReadsAndWritesThenFinalize(originalIsError, originalError), - newError => waitForReadsAndWritesThenFinalize(true, newError) + () => finalize(originalIsError, originalError), + newError => finalize(true, newError) ); return null; } @@ -276,27 +226,16 @@ export function ReadableStreamPipeTo(source: ReadableStream, } shuttingDown = true; - if (!started) { - uponFulfillment(startPromise, onStart); + if (dest._state === 'writable' && !WritableStreamCloseQueuedOrInFlight(dest)) { + uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error)); } else { - onStart(); - } - - function onStart(): null { - waitForReadsAndWritesThenFinalize(isError, error); - return null; + finalize(isError, error); } } - function waitForReadsAndWritesThenFinalize(isError?: boolean, error?: any): null { - uponFulfillment(waitForReadsAndWritesToFinish(), () => finalize(isError, error)); - return null; - } - function finalize(isError?: boolean, error?: any): null { - released = true; - writer.releaseLock(); - reader.releaseLock(); + WritableStreamDefaultWriterRelease(writer); + ReadableStreamReaderGenericRelease(reader); if (signal !== undefined) { signal.removeEventListener('abort', abortAlgorithm); diff --git a/src/lib/writable-stream.ts b/src/lib/writable-stream.ts index 0d56fbc..0a4f79e 100644 --- a/src/lib/writable-stream.ts +++ b/src/lib/writable-stream.ts @@ -30,7 +30,7 @@ import type { AbortController, AbortSignal } from './abort-signal'; import { createAbortController } from './abort-signal'; import { TransformStreamDefaultController } from './transform-stream'; -export type WritableStreamState = 'writable' | 'closed' | 'erroring' | 'errored'; +type WritableStreamState = 'writable' | 'closed' | 'erroring' | 'errored'; interface WriteOrCloseRequest { _resolve: (value?: undefined) => void; diff --git a/test/unit/piping/error-propagation-backward.spec.js b/test/unit/piping/error-propagation-backward.spec.js deleted file mode 100644 index a4abcb3..0000000 --- a/test/unit/piping/error-propagation-backward.spec.js +++ /dev/null @@ -1,29 +0,0 @@ -const { recordingReadableStream, recordingWritableStream } = require('../util/recording-streams'); - -describe('Piping: backwards error propagation', () => { - let error1; - beforeEach(() => { - error1 = new Error('error1!'); - error1.name = 'error1'; - }); - - // This test replaces the skipped WPT test with the same name. - // Original: https://github.com/web-platform-tests/wpt/blob/e1e713c842e54ea0a9410ddc988b63d0e1d31973/streams/piping/error-propagation-backward.any.js#L403-L419 - it('Errors must be propagated backward: becomes errored after piping; preventCancel = true', async () => { - const rs = recordingReadableStream(); - const ws = recordingWritableStream(); - const pipePromise = rs.pipeTo(ws, { preventCancel: true }); - pipePromise.catch(() => undefined); - - await new Promise(r => setTimeout(r, 10)); - ws.controller.error(error1); - - // Non-standard: enqueue a chunk, so reader.read() resolves and pipeTo() can release its lock. - await new Promise(r => setTimeout(r, 10)); - rs.controller.enqueue('a'); - - await expectAsync(pipePromise).withContext('pipeTo must reject with the same error').toBeRejectedWith(error1); - expect(rs.eventsWithoutPulls).toEqual([]); - expect(ws.events).toEqual([]); - }); -}); diff --git a/test/unit/util/recording-streams.js b/test/unit/util/recording-streams.js deleted file mode 100644 index 736ada5..0000000 --- a/test/unit/util/recording-streams.js +++ /dev/null @@ -1,132 +0,0 @@ -const { ReadableStream, WritableStream, TransformStream } = require('web-streams-polyfill'); - -// Original: https://github.com/web-platform-tests/wpt/blob/87a4c80598aee5178c385628174f1832f5a28ad6/streams/resources/recording-streams.js -exports.recordingReadableStream = (extras = {}, strategy) => { - let controllerToCopyOver; - const stream = new ReadableStream({ - type: extras.type, - start(controller) { - controllerToCopyOver = controller; - - if (extras.start) { - return extras.start(controller); - } - - return undefined; - }, - pull(controller) { - stream.events.push('pull'); - - if (extras.pull) { - return extras.pull(controller); - } - - return undefined; - }, - cancel(reason) { - stream.events.push('cancel', reason); - stream.eventsWithoutPulls.push('cancel', reason); - - if (extras.cancel) { - return extras.cancel(reason); - } - - return undefined; - } - }, strategy); - - stream.controller = controllerToCopyOver; - stream.events = []; - stream.eventsWithoutPulls = []; - - return stream; -}; - -exports.recordingWritableStream = (extras = {}, strategy) => { - let controllerToCopyOver; - const stream = new WritableStream({ - start(controller) { - controllerToCopyOver = controller; - - if (extras.start) { - return extras.start(controller); - } - - return undefined; - }, - write(chunk, controller) { - stream.events.push('write', chunk); - - if (extras.write) { - return extras.write(chunk, controller); - } - - return undefined; - }, - close() { - stream.events.push('close'); - - if (extras.close) { - return extras.close(); - } - - return undefined; - }, - abort(e) { - stream.events.push('abort', e); - - if (extras.abort) { - return extras.abort(e); - } - - return undefined; - } - }, strategy); - - stream.controller = controllerToCopyOver; - stream.events = []; - - return stream; -}; - -exports.recordingTransformStream = (extras = {}, writableStrategy, readableStrategy) => { - let controllerToCopyOver; - const stream = new TransformStream({ - start(controller) { - controllerToCopyOver = controller; - - if (extras.start) { - return extras.start(controller); - } - - return undefined; - }, - - transform(chunk, controller) { - stream.events.push('transform', chunk); - - if (extras.transform) { - return extras.transform(chunk, controller); - } - - controller.enqueue(chunk); - - return undefined; - }, - - flush(controller) { - stream.events.push('flush'); - - if (extras.flush) { - return extras.flush(controller); - } - - return undefined; - } - }, writableStrategy, readableStrategy); - - stream.controller = controllerToCopyOver; - stream.events = []; - - return stream; -};