diff --git a/api_guard/dist/types/index.d.ts b/api_guard/dist/types/index.d.ts index 3255065f84..be1da4a92b 100644 --- a/api_guard/dist/types/index.d.ts +++ b/api_guard/dist/types/index.d.ts @@ -269,7 +269,7 @@ export declare class Observable implements Subscribable { static create: (...args: any[]) => any; } -export declare type ObservableInput = Observable | InteropObservable | AsyncIterable | PromiseLike | ArrayLike | Iterable; +export declare type ObservableInput = Observable | InteropObservable | AsyncIterable | PromiseLike | ArrayLike | Iterable | ReadableStreamLike; export declare type ObservableInputTuple = { [K in keyof T]: ObservableInput; @@ -348,6 +348,10 @@ export declare function race(...inputs: [...Observ export declare function range(start: number, count?: number): Observable; export declare function range(start: number, count: number | undefined, scheduler: SchedulerLike): Observable; +export interface ReadableStreamLike { + getReader(): ReadableStreamDefaultReaderLike; +} + export declare class ReplaySubject extends Subject { constructor(bufferSize?: number, windowTime?: number, timestampProvider?: TimestampProvider); protected _subscribe(subscriber: Subscriber): Subscription; diff --git a/package-lock.json b/package-lock.json index 59019e5491..d7cdd93969 100644 --- a/package-lock.json +++ b/package-lock.json @@ -69,6 +69,7 @@ "typedoc": "^0.17.8", "typescript": "~4.2.2", "validate-commit-msg": "2.14.0", + "web-streams-polyfill": "^3.0.2", "webpack": "^4.31.0" } }, @@ -11241,6 +11242,15 @@ "defaults": "^1.0.3" } }, + "node_modules/web-streams-polyfill": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/web-streams-polyfill/-/web-streams-polyfill-3.0.2.tgz", + "integrity": "sha512-JTNkNbAKoSo8NKiqu2UUaqRFCDWWZaCOsXuJEsToWopikTA0YHKKUf91GNkS/SnD8JixOkJjVsiacNlrFnRECA==", + "dev": true, + "engines": { + "node": ">= 8" + } + }, "node_modules/webpack": { "version": "4.39.3", "resolved": "https://registry.npmjs.org/webpack/-/webpack-4.39.3.tgz", @@ -20933,6 +20943,12 @@ "defaults": "^1.0.3" } }, + "web-streams-polyfill": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/web-streams-polyfill/-/web-streams-polyfill-3.0.2.tgz", + "integrity": "sha512-JTNkNbAKoSo8NKiqu2UUaqRFCDWWZaCOsXuJEsToWopikTA0YHKKUf91GNkS/SnD8JixOkJjVsiacNlrFnRECA==", + "dev": true + }, "webpack": { "version": "4.39.3", "resolved": "https://registry.npmjs.org/webpack/-/webpack-4.39.3.tgz", diff --git a/package.json b/package.json index 8afc80ecb2..de29d2e05d 100644 --- a/package.json +++ b/package.json @@ -156,6 +156,7 @@ "typedoc": "^0.17.8", "typescript": "~4.2.2", "validate-commit-msg": "2.14.0", + "web-streams-polyfill": "^3.0.2", "webpack": "^4.31.0" }, "files": [ diff --git a/spec-dtslint/observables/from-spec.ts b/spec-dtslint/observables/from-spec.ts index acc406b55e..1b56cc91e3 100644 --- a/spec-dtslint/observables/from-spec.ts +++ b/spec-dtslint/observables/from-spec.ts @@ -1,4 +1,4 @@ -import { from, of, animationFrameScheduler } from 'rxjs'; +import { from, of, animationFrameScheduler, ReadableStreamLike } from 'rxjs'; it('should accept an array', () => { const o = from([1, 2, 3, 4]); // $ExpectType Observable @@ -55,3 +55,13 @@ it('should accept an array of Observables', () => { it('should support scheduler', () => { const a = from([1, 2, 3], animationFrameScheduler); // $ExpectType Observable }); + +it('should accept a ReadableStream', () => { + const stream: ReadableStreamLike = new ReadableStream({ + pull(controller) { + controller.enqueue('x'); + controller.close(); + }, + }); + const o = from(stream); // $ExpectType Observable +}); \ No newline at end of file diff --git a/spec/observables/from-spec.ts b/spec/observables/from-spec.ts index e26f5f2c3d..3e5d230240 100644 --- a/spec/observables/from-spec.ts +++ b/spec/observables/from-spec.ts @@ -1,7 +1,8 @@ import { expect } from 'chai'; import { TestScheduler } from 'rxjs/testing'; import { asyncScheduler, of, from, Observer, observable, Subject, noop } from 'rxjs'; -import { first, concatMap, delay, take, map, tap } from 'rxjs/operators'; +import { first, concatMap, delay, take, tap } from 'rxjs/operators'; +import { ReadableStream } from 'web-streams-polyfill'; // tslint:disable:no-any declare const expectObservable: any; @@ -108,7 +109,7 @@ describe('from', () => { }); }); - + it('should finalize a generator', () => { const results: any[] = []; @@ -175,16 +176,24 @@ describe('from', () => { }); // tslint:disable-next-line:no-any it's silly to define all of these types. - const sources: Array<{ name: string, value: any }> = [ - { name: 'observable', value: of('x') }, - { name: 'observable-like', value: fakervable('x') }, - { name: 'observable-like-array', value: fakeArrayObservable('x') }, - { name: 'array', value: ['x'] }, - { name: 'promise', value: Promise.resolve('x') }, - { name: 'iterator', value: fakerator('x') }, - { name: 'array-like', value: { [0]: 'x', length: 1 }}, - { name: 'string', value: 'x'}, - { name: 'arguments', value: getArguments('x') }, + const sources: Array<{ name: string, createValue: () => any }> = [ + { name: 'observable', createValue: () => of('x') }, + { name: 'observable-like', createValue: () => fakervable('x') }, + { name: 'observable-like-array', createValue: () => fakeArrayObservable('x') }, + { name: 'array', createValue: () => ['x'] }, + { name: 'promise', createValue: () => Promise.resolve('x') }, + { name: 'iterator', createValue: () => fakerator('x') }, + { name: 'array-like', createValue: () => ({ [0]: 'x', length: 1 }) }, + // ReadableStreams are not lazy, so we have to have this createValue() thunk + // so that each tests gets a new one. + { name: 'readable-stream-like', createValue: () => new ReadableStream({ + pull(controller) { + controller.enqueue('x'); + controller.close(); + }, + })}, + { name: 'string', createValue: () => 'x'}, + { name: 'arguments', createValue: () => getArguments('x') }, ]; if (Symbol && Symbol.asyncIterator) { @@ -211,14 +220,14 @@ describe('from', () => { sources.push({ name: 'async-iterator', - value: fakeAsyncIterator('x') + createValue: () => fakeAsyncIterator('x') }); } for (const source of sources) { it(`should accept ${source.name}`, (done) => { let nextInvoked = false; - from(source.value) + from(source.createValue()) .subscribe( (x) => { nextInvoked = true; @@ -235,7 +244,7 @@ describe('from', () => { }); it(`should accept ${source.name} and scheduler`, (done) => { let nextInvoked = false; - from(source.value, asyncScheduler) + from(source.createValue(), asyncScheduler) .subscribe( (x) => { nextInvoked = true; @@ -321,4 +330,62 @@ describe('from', () => { expect(finallyExecuted).to.be.true; }); + + it('should support ReadableStream-like objects', (done) => { + const input = [0, 1, 2]; + const output: number[] = []; + + const readableStream = new ReadableStream({ + pull(controller) { + if (input.length > 0) { + controller.enqueue(input.shift()); + + if (input.length === 0) { + controller.close(); + } + } + }, + }); + + from(readableStream).subscribe({ + next: value => { + output.push(value); + expect(readableStream.locked).to.equal(true); + }, + complete: () => { + expect(output).to.deep.equal([0, 1, 2]); + expect(readableStream.locked).to.equal(false); + done(); + } + }); + }); + + it('should lock and release ReadableStream-like objects', (done) => { + const input = [0, 1, 2]; + const output: number[] = []; + + const readableStream = new ReadableStream({ + pull(controller) { + if (input.length > 0) { + controller.enqueue(input.shift()); + + if (input.length === 0) { + controller.close(); + } + } + }, + }); + + from(readableStream).subscribe({ + next: value => { + output.push(value); + expect(readableStream.locked).to.equal(true); + }, + complete: () => { + expect(output).to.deep.equal([0, 1, 2]); + expect(readableStream.locked).to.equal(false); + done(); + } + }); + }); }); diff --git a/src/internal/observable/from.ts b/src/internal/observable/from.ts index 45d9541698..ddf8a0809a 100644 --- a/src/internal/observable/from.ts +++ b/src/internal/observable/from.ts @@ -4,7 +4,7 @@ import { observable as Symbol_observable } from '../symbol/observable'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types'; +import { ObservableInput, SchedulerLike, ObservedValueOf, ReadableStreamLike } from '../types'; import { scheduled } from '../scheduled/scheduled'; import { isFunction } from '../util/isFunction'; import { reportUnhandledError } from '../util/reportUnhandledError'; @@ -12,6 +12,7 @@ import { isInteropObservable } from '../util/isInteropObservable'; import { isAsyncIterable } from '../util/isAsyncIterable'; import { createInvalidObservableTypeError } from '../util/throwUnobservableError'; import { isIterable } from '../util/isIterable'; +import { isReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike'; export function from>(input: O): Observable>; /** @deprecated The scheduler argument is deprecated, use scheduled. Details: https://rxjs.dev/deprecations/scheduler-argument */ @@ -141,6 +142,9 @@ export function innerFrom(input: ObservableInput): Observable { if (isIterable(input)) { return fromIterable(input); } + if (isReadableStreamLike(input)) { + return fromReadableStreamLike(input); + } } throw createInvalidObservableTypeError(input); @@ -220,6 +224,10 @@ function fromAsyncIterable(asyncIterable: AsyncIterable) { }); } +function fromReadableStreamLike(readableStream: ReadableStreamLike) { + return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream)); +} + async function process(asyncIterable: AsyncIterable, subscriber: Subscriber) { for await (const value of asyncIterable) { subscriber.next(value); diff --git a/src/internal/scheduled/scheduleReadableStreamLike.ts b/src/internal/scheduled/scheduleReadableStreamLike.ts new file mode 100644 index 0000000000..d742f10722 --- /dev/null +++ b/src/internal/scheduled/scheduleReadableStreamLike.ts @@ -0,0 +1,8 @@ +import { SchedulerLike, ReadableStreamLike } from '../types'; +import { Observable } from '../Observable'; +import { scheduleAsyncIterable } from './scheduleAsyncIterable'; +import { readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike'; + +export function scheduleReadableStreamLike(input: ReadableStreamLike, scheduler: SchedulerLike): Observable { + return scheduleAsyncIterable(readableStreamLikeToAsyncGenerator(input), scheduler); +} diff --git a/src/internal/scheduled/scheduled.ts b/src/internal/scheduled/scheduled.ts index 92e08b972d..bb2e42574e 100644 --- a/src/internal/scheduled/scheduled.ts +++ b/src/internal/scheduled/scheduled.ts @@ -2,15 +2,17 @@ import { scheduleObservable } from './scheduleObservable'; import { schedulePromise } from './schedulePromise'; import { scheduleArray } from './scheduleArray'; import { scheduleIterable } from './scheduleIterable'; +import { scheduleAsyncIterable } from './scheduleAsyncIterable'; import { isInteropObservable } from '../util/isInteropObservable'; import { isPromise } from '../util/isPromise'; import { isArrayLike } from '../util/isArrayLike'; import { isIterable } from '../util/isIterable'; import { ObservableInput, SchedulerLike } from '../types'; import { Observable } from '../Observable'; -import { scheduleAsyncIterable } from './scheduleAsyncIterable'; import { isAsyncIterable } from '../util/isAsyncIterable'; import { createInvalidObservableTypeError } from '../util/throwUnobservableError'; +import { isReadableStreamLike } from '../util/isReadableStreamLike'; +import { scheduleReadableStreamLike } from './scheduleReadableStreamLike'; /** * Converts from a common {@link ObservableInput} type to an observable where subscription and emissions @@ -40,6 +42,9 @@ export function scheduled(input: ObservableInput, scheduler: SchedulerLike if (isIterable(input)) { return scheduleIterable(input, scheduler); } + if (isReadableStreamLike(input)) { + return scheduleReadableStreamLike(input, scheduler); + } } throw createInvalidObservableTypeError(input); } diff --git a/src/internal/types.ts b/src/internal/types.ts index 7dfee85933..5d3f8b9881 100644 --- a/src/internal/types.ts +++ b/src/internal/types.ts @@ -91,7 +91,14 @@ export interface Subscribable { /** * Valid types that can be converted to observables. */ -export type ObservableInput = Observable | InteropObservable | AsyncIterable | PromiseLike | ArrayLike | Iterable; +export type ObservableInput = + | Observable + | InteropObservable + | AsyncIterable + | PromiseLike + | ArrayLike + | Iterable + | ReadableStreamLike; /** @deprecated use {@link InteropObservable } */ export type ObservableLike = InteropObservable; @@ -276,3 +283,28 @@ export type ValueFromNotification = T extends { kind: 'N' | 'E' | 'C' } export type Falsy = null | undefined | false | 0 | -0 | 0n | ''; export type TruthyTypesOf = T extends Falsy ? never : T; + +// We shouldn't rely on this type definition being available globally yet since it's +// not necessarily available in every TS environment. +interface ReadableStreamDefaultReaderLike { + // HACK: As of TS 4.2.2, The provided types for the iterator results of a `ReadableStreamDefaultReader` + // are significantly different enough from `IteratorResult` as to cause compilation errors. + // The type at the time is `ReadableStreamDefaultReadResult`. + read(): PromiseLike< + | { + done: false; + value: T; + } + | { done: true; value?: undefined } + >; + releaseLock(): void; +} + +/** + * The base signature RxJS will look for to identify and use + * a [ReadableStream](https://streams.spec.whatwg.org/#rs-class) + * as an {@link ObservableInput} source. + */ +export interface ReadableStreamLike { + getReader(): ReadableStreamDefaultReaderLike; +} diff --git a/src/internal/util/isReadableStreamLike.ts b/src/internal/util/isReadableStreamLike.ts new file mode 100644 index 0000000000..87b9c15a88 --- /dev/null +++ b/src/internal/util/isReadableStreamLike.ts @@ -0,0 +1,23 @@ +import { ReadableStreamLike } from '../types'; +import { isFunction } from './isFunction'; + +export async function* readableStreamLikeToAsyncGenerator(readableStream: ReadableStreamLike): AsyncGenerator { + const reader = readableStream.getReader(); + try { + while (true) { + const { value, done } = await reader.read(); + if (done) { + return; + } + yield value!; + } + } finally { + reader.releaseLock(); + } +} + +export function isReadableStreamLike(obj: any): obj is ReadableStreamLike { + // We don't want to use instanceof checks because they would return + // false for instances from another Realm, like an