Skip to content

Commit

Permalink
Merge pull request #119 from MattiasBuelens/fix-abort-pipe-with-preve…
Browse files Browse the repository at this point in the history
…nt-cancel

Fix aborting a pipe with preventCancel = true
  • Loading branch information
MattiasBuelens authored May 24, 2022
2 parents f6ed190 + b407627 commit 18529d0
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 57 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
## Unreleased

* 👓 Align with [spec version `e9355ce`](https://github.com/whatwg/streams/tree/e9355ce79925947e8eb496563d599c329769d315/) ([#115](https://github.com/MattiasBuelens/web-streams-polyfill/issues/115), [#117](https://github.com/MattiasBuelens/web-streams-polyfill/pull/117))
* 👓 Align with [spec version `e9355ce`](https://github.com/whatwg/streams/tree/e9355ce79925947e8eb496563d599c329769d315/). ([#115](https://github.com/MattiasBuelens/web-streams-polyfill/issues/115), [#117](https://github.com/MattiasBuelens/web-streams-polyfill/pull/117))
* 🐛 Fix `pipeTo()` never rejecting when aborting its `signal` and `preventCancel` is set to `true`. ([#118](https://github.com/MattiasBuelens/web-streams-polyfill/issues/118), [#119](https://github.com/MattiasBuelens/web-streams-polyfill/pull/119))

## v4.0.0-beta.2 (2022-04-12)

Expand Down
67 changes: 17 additions & 50 deletions src/lib/readable-stream/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,8 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
resolveStart = resolve;
});

// This is used to keep track of the spec's requirement that we wait for ongoing reads and writes during shutdown.
let currentRead: Promise<unknown> | undefined;
let currentWrite: Promise<unknown> | undefined;
// This is used to keep track of the spec's requirement that we wait for ongoing writes during shutdown.
let currentWrite: Promise<unknown> = Promise.resolve(undefined);

return newPromise((resolve, reject) => {
let abortAlgorithm: () => void;
Expand Down Expand Up @@ -124,16 +123,14 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
}

return PerformPromiseThen(writer.ready, () => {
const read = PerformPromiseThen(reader.read(), result => {
return PerformPromiseThen(reader.read(), result => {
if (result.done) {
return true;
}
currentWrite = writer.write(result.value);
setPromiseIsHandledToTrue(currentWrite);
return false;
});
currentRead = read;
return read;
});
}

Expand All @@ -151,21 +148,21 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
if (destCloseRequested || destState === 'closed') {
return promiseResolvedWith(undefined);
}
if (destState === 'errored') {
if (destState === 'erroring' || destState === 'errored') {
return promiseRejectedWith(destStoredError);
}
assert(destState === 'writable' || destState === 'erroring');
assert(destState === 'writable');
destCloseRequested = true;
return writer.close();
}, false, undefined, true);
}, false, undefined);
} else {
shutdown();
}
return null;
}

function handleSourceError(storedError: any): null {
if (released) {
if (shuttingDown) {
return null;
}
// Errors must be propagated forward
Expand All @@ -181,7 +178,9 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
}

function handleDestClose(): null {
assert(!released);
if (released) {
return null;
}
assert(!IsWritableStream(dest) || dest._state === 'closed');
destState = 'closed';
return null;
Expand All @@ -192,7 +191,7 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
return null;
}
// Errors must be propagated backward
assert(!IsWritableStream(dest) || dest._state === 'errored');
assert(!IsWritableStream(dest) || dest._state === 'erroring' || dest._state === 'errored');
destState = 'errored';
destStoredError = storedError;
if (!preventCancel) {
Expand Down Expand Up @@ -223,7 +222,7 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
if (sourceState === 'errored') {
// Errors must be propagated forward
handleSourceError(sourceStoredError);
} else if (destState === 'errored') {
} else if (destState === 'erroring' || destState === 'errored') {
// Errors must be propagated backward
handleDestError(destStoredError);
} else if (sourceState === 'closed') {
Expand Down Expand Up @@ -273,30 +272,9 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
}
}

function waitForReadsAndWritesToFinish(): Promise<void> {
let oldCurrentRead: Promise<unknown> | undefined;
let oldCurrentWrite: Promise<unknown> | undefined;
return promiseResolvedWith(check());

function check(): undefined | Promise<undefined> {
// Another read or write may have started while we were waiting on this currentRead or currentWrite,
// so we have to be sure to wait for that too.
if (oldCurrentRead !== currentRead) {
oldCurrentRead = currentRead;
return transformPromiseWith(currentRead!, check, check);
}
if (oldCurrentWrite !== currentWrite) {
oldCurrentWrite = currentWrite;
return transformPromiseWith(currentWrite!, check, check);
}
return undefined;
}
}

function shutdownWithAction(action: (() => Promise<unknown>) | undefined,
isError?: boolean,
error?: any,
waitForReads?: boolean) {
error?: any) {
if (shuttingDown) {
return;
}
Expand All @@ -309,9 +287,7 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
}

function onStart(): null {
if (waitForReads && (currentRead !== undefined || currentWrite !== undefined)) {
uponFulfillment(waitForReadsAndWritesToFinish(), doTheRest);
} else if (currentWrite !== undefined) {
if (destState === 'writable' && !destCloseRequested) {
uponFulfillment(waitForWritesToFinish(), doTheRest);
} else {
doTheRest();
Expand All @@ -323,11 +299,11 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
if (action) {
uponPromise(
action(),
() => waitForReadsAndWritesThenFinalize(isError, error),
newError => waitForReadsAndWritesThenFinalize(true, newError)
() => finalize(isError, error),
newError => finalize(true, newError)
);
} else {
waitForReadsAndWritesThenFinalize(isError, error);
finalize(isError, error);
}
return null;
}
Expand All @@ -337,15 +313,6 @@ export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
shutdownWithAction(undefined, isError, error);
}

function waitForReadsAndWritesThenFinalize(isError?: boolean, error?: any): null {
if (currentRead !== undefined || currentWrite !== undefined) {
uponFulfillment(waitForReadsAndWritesToFinish(), () => finalize(isError, error));
} else {
finalize(isError, error);
}
return null;
}

function finalize(isError?: boolean, error?: any): null {
released = true;
writer.releaseLock();
Expand Down
26 changes: 25 additions & 1 deletion test/unit/readable-stream/regression.spec.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
require('abort-controller/polyfill');
const { ReadableStream, WritableStream, TransformStream } = require('web-streams-polyfill');
const { delay } = require('../util/delay');

describe('ReadableStream regressions', () => {
// https://github.com/MattiasBuelens/web-streams-polyfill/issues/66
it('#66', async () => {
it('issue #66', async () => {
const { readable, writable } = new TransformStream();

const producer = (async () => {
Expand Down Expand Up @@ -54,4 +55,27 @@ describe('ReadableStream regressions', () => {
expect(() => readableGetter.call(fakeTransformStream)).toThrow(jasmine.any(TypeError));
});
});

// https://github.com/MattiasBuelens/web-streams-polyfill/issues/118
it('issue #118', async () => {
const readable = new ReadableStream();
const abortController = new AbortController();
const events = [];

readable.pipeTo(new WritableStream(), {
preventCancel: true,
signal: abortController.signal
}).then(() => {
events.push('resolve');
}, () => {
events.push('rejected');
});

await delay(100);
events.push('abort');
abortController.abort();

await delay(100);
expect(events).toEqual(['abort', 'rejected']);
});
});
3 changes: 3 additions & 0 deletions test/unit/util/delay.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
exports.delay = function (ms) {
return new Promise(resolve => setTimeout(resolve, ms));
};
5 changes: 0 additions & 5 deletions test/wpt/shared/exclusions.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ const excludedTestsNonES2018 = [
];

const skippedTests = {
'piping/error-propagation-backward.any.html': [
// This test cheats: pipeTo() releases the reader's lock while there's still a pending read().
// The polyfill cannot do this, because it uses the reader.releaseLock() public API.
'Errors must be propagated backward: becomes errored after piping; preventCancel = true'
]
};

const ignoredFailuresBase = {
Expand Down

0 comments on commit 18529d0

Please sign in to comment.