Skip to content

Commit

Permalink
feat(ReadableStreams): Support for ReadableStreams e.g. `from(readabl…
Browse files Browse the repository at this point in the history
…eStream)` (#6163)

* feat(ReadableStreams): Support for ReadableStreams e.g. `from(readableStream)`

* feat(ReadableStreams): stub ReadableStreamDefaultReader type definition.

* feat(ReadableStreams): add comments.

* refactor: Export ReadableStreamLike, add dtslint test

* chore: update imports and api_guardian golden files

Co-authored-by: Ben Lesh <[email protected]>
  • Loading branch information
jayphelps and benlesh authored Mar 25, 2021
1 parent 8e5f90a commit 19d6502
Show file tree
Hide file tree
Showing 11 changed files with 195 additions and 21 deletions.
6 changes: 5 additions & 1 deletion api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ export declare class Observable<T> implements Subscribable<T> {
static create: (...args: any[]) => any;
}

export declare type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;
export declare type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T> | ReadableStreamLike<T>;

export declare type ObservableInputTuple<T> = {
[K in keyof T]: ObservableInput<T[K]>;
Expand Down Expand Up @@ -348,6 +348,10 @@ export declare function race<T extends readonly unknown[]>(...inputs: [...Observ
export declare function range(start: number, count?: number): Observable<number>;
export declare function range(start: number, count: number | undefined, scheduler: SchedulerLike): Observable<number>;

export interface ReadableStreamLike<T> {
getReader(): ReadableStreamDefaultReaderLike<T>;
}

export declare class ReplaySubject<T> extends Subject<T> {
constructor(bufferSize?: number, windowTime?: number, timestampProvider?: TimestampProvider);
protected _subscribe(subscriber: Subscriber<T>): Subscription;
Expand Down
16 changes: 16 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@
"typedoc": "^0.17.8",
"typescript": "~4.2.2",
"validate-commit-msg": "2.14.0",
"web-streams-polyfill": "^3.0.2",
"webpack": "^4.31.0"
},
"files": [
Expand Down
12 changes: 11 additions & 1 deletion spec-dtslint/observables/from-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { from, of, animationFrameScheduler } from 'rxjs';
import { from, of, animationFrameScheduler, ReadableStreamLike } from 'rxjs';

it('should accept an array', () => {
const o = from([1, 2, 3, 4]); // $ExpectType Observable<number>
Expand Down Expand Up @@ -55,3 +55,13 @@ it('should accept an array of Observables', () => {
it('should support scheduler', () => {
const a = from([1, 2, 3], animationFrameScheduler); // $ExpectType Observable<number>
});

it('should accept a ReadableStream', () => {
const stream: ReadableStreamLike<string> = new ReadableStream<string>({
pull(controller) {
controller.enqueue('x');
controller.close();
},
});
const o = from(stream); // $ExpectType Observable<string>
});
97 changes: 82 additions & 15 deletions spec/observables/from-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { expect } from 'chai';
import { TestScheduler } from 'rxjs/testing';
import { asyncScheduler, of, from, Observer, observable, Subject, noop } from 'rxjs';
import { first, concatMap, delay, take, map, tap } from 'rxjs/operators';
import { first, concatMap, delay, take, tap } from 'rxjs/operators';
import { ReadableStream } from 'web-streams-polyfill';

// tslint:disable:no-any
declare const expectObservable: any;
Expand Down Expand Up @@ -108,7 +109,7 @@ describe('from', () => {
});
});


it('should finalize a generator', () => {
const results: any[] = [];

Expand Down Expand Up @@ -175,16 +176,24 @@ describe('from', () => {
});

// tslint:disable-next-line:no-any it's silly to define all of these types.
const sources: Array<{ name: string, value: any }> = [
{ name: 'observable', value: of('x') },
{ name: 'observable-like', value: fakervable('x') },
{ name: 'observable-like-array', value: fakeArrayObservable('x') },
{ name: 'array', value: ['x'] },
{ name: 'promise', value: Promise.resolve('x') },
{ name: 'iterator', value: fakerator('x') },
{ name: 'array-like', value: { [0]: 'x', length: 1 }},
{ name: 'string', value: 'x'},
{ name: 'arguments', value: getArguments('x') },
const sources: Array<{ name: string, createValue: () => any }> = [
{ name: 'observable', createValue: () => of('x') },
{ name: 'observable-like', createValue: () => fakervable('x') },
{ name: 'observable-like-array', createValue: () => fakeArrayObservable('x') },
{ name: 'array', createValue: () => ['x'] },
{ name: 'promise', createValue: () => Promise.resolve('x') },
{ name: 'iterator', createValue: () => fakerator('x') },
{ name: 'array-like', createValue: () => ({ [0]: 'x', length: 1 }) },
// ReadableStreams are not lazy, so we have to have this createValue() thunk
// so that each tests gets a new one.
{ name: 'readable-stream-like', createValue: () => new ReadableStream({
pull(controller) {
controller.enqueue('x');
controller.close();
},
})},
{ name: 'string', createValue: () => 'x'},
{ name: 'arguments', createValue: () => getArguments('x') },
];

if (Symbol && Symbol.asyncIterator) {
Expand All @@ -211,14 +220,14 @@ describe('from', () => {

sources.push({
name: 'async-iterator',
value: fakeAsyncIterator('x')
createValue: () => fakeAsyncIterator('x')
});
}

for (const source of sources) {
it(`should accept ${source.name}`, (done) => {
let nextInvoked = false;
from(source.value)
from(source.createValue())
.subscribe(
(x) => {
nextInvoked = true;
Expand All @@ -235,7 +244,7 @@ describe('from', () => {
});
it(`should accept ${source.name} and scheduler`, (done) => {
let nextInvoked = false;
from(source.value, asyncScheduler)
from(source.createValue(), asyncScheduler)
.subscribe(
(x) => {
nextInvoked = true;
Expand Down Expand Up @@ -321,4 +330,62 @@ describe('from', () => {

expect(finallyExecuted).to.be.true;
});

it('should support ReadableStream-like objects', (done) => {
const input = [0, 1, 2];
const output: number[] = [];

const readableStream = new ReadableStream({
pull(controller) {
if (input.length > 0) {
controller.enqueue(input.shift());

if (input.length === 0) {
controller.close();
}
}
},
});

from(readableStream).subscribe({
next: value => {
output.push(value);
expect(readableStream.locked).to.equal(true);
},
complete: () => {
expect(output).to.deep.equal([0, 1, 2]);
expect(readableStream.locked).to.equal(false);
done();
}
});
});

it('should lock and release ReadableStream-like objects', (done) => {
const input = [0, 1, 2];
const output: number[] = [];

const readableStream = new ReadableStream({
pull(controller) {
if (input.length > 0) {
controller.enqueue(input.shift());

if (input.length === 0) {
controller.close();
}
}
},
});

from(readableStream).subscribe({
next: value => {
output.push(value);
expect(readableStream.locked).to.equal(true);
},
complete: () => {
expect(output).to.deep.equal([0, 1, 2]);
expect(readableStream.locked).to.equal(false);
done();
}
});
});
});
10 changes: 9 additions & 1 deletion src/internal/observable/from.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import { observable as Symbol_observable } from '../symbol/observable';
import { Subscriber } from '../Subscriber';

import { Observable } from '../Observable';
import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types';
import { ObservableInput, SchedulerLike, ObservedValueOf, ReadableStreamLike } from '../types';
import { scheduled } from '../scheduled/scheduled';
import { isFunction } from '../util/isFunction';
import { reportUnhandledError } from '../util/reportUnhandledError';
import { isInteropObservable } from '../util/isInteropObservable';
import { isAsyncIterable } from '../util/isAsyncIterable';
import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
import { isIterable } from '../util/isIterable';
import { isReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike';

export function from<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>;
/** @deprecated The scheduler argument is deprecated, use scheduled. Details: https://rxjs.dev/deprecations/scheduler-argument */
Expand Down Expand Up @@ -141,6 +142,9 @@ export function innerFrom<T>(input: ObservableInput<T>): Observable<T> {
if (isIterable(input)) {
return fromIterable(input);
}
if (isReadableStreamLike(input)) {
return fromReadableStreamLike(input);
}
}

throw createInvalidObservableTypeError(input);
Expand Down Expand Up @@ -220,6 +224,10 @@ function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T>) {
});
}

function fromReadableStreamLike<T>(readableStream: ReadableStreamLike<T>) {
return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream));
}

async function process<T>(asyncIterable: AsyncIterable<T>, subscriber: Subscriber<T>) {
for await (const value of asyncIterable) {
subscriber.next(value);
Expand Down
8 changes: 8 additions & 0 deletions src/internal/scheduled/scheduleReadableStreamLike.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { SchedulerLike, ReadableStreamLike } from '../types';
import { Observable } from '../Observable';
import { scheduleAsyncIterable } from './scheduleAsyncIterable';
import { readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike';

export function scheduleReadableStreamLike<T>(input: ReadableStreamLike<T>, scheduler: SchedulerLike): Observable<T> {
return scheduleAsyncIterable(readableStreamLikeToAsyncGenerator(input), scheduler);
}
7 changes: 6 additions & 1 deletion src/internal/scheduled/scheduled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ import { scheduleObservable } from './scheduleObservable';
import { schedulePromise } from './schedulePromise';
import { scheduleArray } from './scheduleArray';
import { scheduleIterable } from './scheduleIterable';
import { scheduleAsyncIterable } from './scheduleAsyncIterable';
import { isInteropObservable } from '../util/isInteropObservable';
import { isPromise } from '../util/isPromise';
import { isArrayLike } from '../util/isArrayLike';
import { isIterable } from '../util/isIterable';
import { ObservableInput, SchedulerLike } from '../types';
import { Observable } from '../Observable';
import { scheduleAsyncIterable } from './scheduleAsyncIterable';
import { isAsyncIterable } from '../util/isAsyncIterable';
import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
import { isReadableStreamLike } from '../util/isReadableStreamLike';
import { scheduleReadableStreamLike } from './scheduleReadableStreamLike';

/**
* Converts from a common {@link ObservableInput} type to an observable where subscription and emissions
Expand Down Expand Up @@ -40,6 +42,9 @@ export function scheduled<T>(input: ObservableInput<T>, scheduler: SchedulerLike
if (isIterable(input)) {
return scheduleIterable(input, scheduler);
}
if (isReadableStreamLike(input)) {
return scheduleReadableStreamLike(input, scheduler);
}
}
throw createInvalidObservableTypeError(input);
}
34 changes: 33 additions & 1 deletion src/internal/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,14 @@ export interface Subscribable<T> {
/**
* Valid types that can be converted to observables.
*/
export type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;
export type ObservableInput<T> =
| Observable<T>
| InteropObservable<T>
| AsyncIterable<T>
| PromiseLike<T>
| ArrayLike<T>
| Iterable<T>
| ReadableStreamLike<T>;

/** @deprecated use {@link InteropObservable } */
export type ObservableLike<T> = InteropObservable<T>;
Expand Down Expand Up @@ -276,3 +283,28 @@ export type ValueFromNotification<T> = T extends { kind: 'N' | 'E' | 'C' }
export type Falsy = null | undefined | false | 0 | -0 | 0n | '';

export type TruthyTypesOf<T> = T extends Falsy ? never : T;

// We shouldn't rely on this type definition being available globally yet since it's
// not necessarily available in every TS environment.
interface ReadableStreamDefaultReaderLike<T> {
// HACK: As of TS 4.2.2, The provided types for the iterator results of a `ReadableStreamDefaultReader`
// are significantly different enough from `IteratorResult` as to cause compilation errors.
// The type at the time is `ReadableStreamDefaultReadResult`.
read(): PromiseLike<
| {
done: false;
value: T;
}
| { done: true; value?: undefined }
>;
releaseLock(): void;
}

/**
* The base signature RxJS will look for to identify and use
* a [ReadableStream](https://streams.spec.whatwg.org/#rs-class)
* as an {@link ObservableInput} source.
*/
export interface ReadableStreamLike<T> {
getReader(): ReadableStreamDefaultReaderLike<T>;
}
23 changes: 23 additions & 0 deletions src/internal/util/isReadableStreamLike.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { ReadableStreamLike } from '../types';
import { isFunction } from './isFunction';

export async function* readableStreamLikeToAsyncGenerator<T>(readableStream: ReadableStreamLike<T>): AsyncGenerator<T> {
const reader = readableStream.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
return;
}
yield value!;
}
} finally {
reader.releaseLock();
}
}

export function isReadableStreamLike<T>(obj: any): obj is ReadableStreamLike<T> {
// We don't want to use instanceof checks because they would return
// false for instances from another Realm, like an <iframe>.
return isFunction(obj?.getReader);
}
2 changes: 1 addition & 1 deletion src/internal/util/throwUnobservableError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ export function createInvalidObservableTypeError(input: any) {
return new TypeError(
`You provided ${
input !== null && typeof input === 'object' ? 'an invalid object' : `'${input}'`
} where a stream was expected. You can provide an Observable, Promise, Array, AsyncIterable, or Iterable.`
} where a stream was expected. You can provide an Observable, Promise, ReadableStream, Array, AsyncIterable, or Iterable.`
);
}

0 comments on commit 19d6502

Please sign in to comment.