Skip to content

Commit

Permalink
Merge pull request #100 from MattiasBuelens/rewrite-async-iterator
Browse files Browse the repository at this point in the history
Rewrite async iteration using the public API
  • Loading branch information
MattiasBuelens committed Oct 29, 2021
2 parents 4d0d23f + 8194e28 commit cbe8006
Showing 1 changed file with 24 additions and 47 deletions.
71 changes: 24 additions & 47 deletions src/lib/readable-stream/async-iterator.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,11 @@
/// <reference lib="es2018.asynciterable" />

import type { ReadableStream } from '../readable-stream';
import type { ReadableStreamDefaultReader, ReadableStreamDefaultReadResult, ReadRequest } from './default-reader';
import { AcquireReadableStreamDefaultReader, ReadableStreamDefaultReaderRead } from './default-reader';
import {
ReadableStreamReaderGenericCancel,
ReadableStreamReaderGenericRelease,
readerLockException
} from './generic-reader';
import type { ReadableStreamDefaultReader, ReadableStreamDefaultReadResult } from './default-reader';
import { readerLockException } from './generic-reader';
import assert from '../../stub/assert';
import { typeIsObject } from '../helpers/miscellaneous';
import {
newPromise,
promiseRejectedWith,
promiseResolvedWith,
queueMicrotask,
transformPromiseWith
} from '../helpers/webidl';
import { PerformPromiseThen, promiseRejectedWith, promiseResolvedWith, transformPromiseWith } from '../helpers/webidl';

/**
* An async iterator returned by {@link ReadableStream.values}.
Expand All @@ -30,7 +19,7 @@ export interface ReadableStreamAsyncIterator<R> extends AsyncIterator<R> {
}

export class ReadableStreamAsyncIteratorImpl<R> {
private readonly _reader: ReadableStreamDefaultReader<R>;
private _reader: ReadableStreamDefaultReader<R> | undefined;
private readonly _preventCancel: boolean;
private _ongoingPromise: Promise<ReadableStreamDefaultReadResult<R>> | undefined = undefined;
private _isFinished = false;
Expand Down Expand Up @@ -61,38 +50,25 @@ export class ReadableStreamAsyncIteratorImpl<R> {
}

const reader = this._reader;
if (reader._ownerReadableStream === undefined) {
if (reader === undefined) {
return promiseRejectedWith(readerLockException('iterate'));
}

let resolvePromise!: (result: ReadableStreamDefaultReadResult<R>) => void;
let rejectPromise!: (reason: any) => void;
const promise = newPromise<ReadableStreamDefaultReadResult<R>>((resolve, reject) => {
resolvePromise = resolve;
rejectPromise = reject;
});
const readRequest: ReadRequest<R> = {
_chunkSteps: chunk => {
this._ongoingPromise = undefined;
// This needs to be delayed by one microtask, otherwise we stop pulling too early which breaks a test.
// FIXME Is this a bug in the specification, or in the test?
queueMicrotask(() => resolvePromise({ value: chunk, done: false }));
},
_closeSteps: () => {
this._ongoingPromise = undefined;
this._isFinished = true;
ReadableStreamReaderGenericRelease(reader);
resolvePromise({ value: undefined, done: true });
},
_errorSteps: reason => {
this._ongoingPromise = undefined;
return PerformPromiseThen(reader.read(), result => {
this._ongoingPromise = undefined;
if (result.done) {
this._isFinished = true;
ReadableStreamReaderGenericRelease(reader);
rejectPromise(reason);
this._reader?.releaseLock();
this._reader = undefined;
}
};
ReadableStreamDefaultReaderRead(reader, readRequest);
return promise;
return result;
}, reason => {
this._ongoingPromise = undefined;
this._isFinished = true;
this._reader?.releaseLock();
this._reader = undefined;
throw reason;
});
}

private _returnSteps(value: any): Promise<ReadableStreamDefaultReadResult<any>> {
Expand All @@ -102,19 +78,20 @@ export class ReadableStreamAsyncIteratorImpl<R> {
this._isFinished = true;

const reader = this._reader;
if (reader._ownerReadableStream === undefined) {
if (reader === undefined) {
return promiseRejectedWith(readerLockException('finish iterating'));
}

assert(reader._readRequests.length === 0);

this._reader = undefined;
if (!this._preventCancel) {
const result = ReadableStreamReaderGenericCancel(reader, value);
ReadableStreamReaderGenericRelease(reader);
const result = reader.cancel(value);
reader.releaseLock();
return transformPromiseWith(result, () => ({ value, done: true }));
}

ReadableStreamReaderGenericRelease(reader);
reader.releaseLock();
return promiseResolvedWith({ value, done: true });
}
}
Expand Down Expand Up @@ -160,7 +137,7 @@ if (typeof Symbol.asyncIterator === 'symbol') {

export function AcquireReadableStreamAsyncIterator<R>(stream: ReadableStream<R>,
preventCancel: boolean): ReadableStreamAsyncIterator<R> {
const reader = AcquireReadableStreamDefaultReader<R>(stream);
const reader = stream.getReader();
const impl = new ReadableStreamAsyncIteratorImpl(reader, preventCancel);
const iterator: ReadableStreamAsyncIteratorInstance<R> = Object.create(ReadableStreamAsyncIteratorPrototype);
iterator._asyncIteratorImpl = impl;
Expand Down

0 comments on commit cbe8006

Please sign in to comment.