Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ReadableStreams): Support for ReadableStreams e.g. from(readableStream) #6163

Merged
merged 5 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ export declare class Observable<T> implements Subscribable<T> {
static create: (...args: any[]) => any;
}

export declare type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;
export declare type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T> | ReadableStreamLike<T>;

export declare type ObservableInputTuple<T> = {
[K in keyof T]: ObservableInput<T[K]>;
Expand Down Expand Up @@ -348,6 +348,10 @@ export declare function race<T extends readonly unknown[]>(...inputs: [...Observ
export declare function range(start: number, count?: number): Observable<number>;
export declare function range(start: number, count: number | undefined, scheduler: SchedulerLike): Observable<number>;

export interface ReadableStreamLike<T> {
getReader(): ReadableStreamDefaultReaderLike<T>;
}

export declare class ReplaySubject<T> extends Subject<T> {
constructor(bufferSize?: number, windowTime?: number, timestampProvider?: TimestampProvider);
protected _subscribe(subscriber: Subscriber<T>): Subscription;
Expand Down
20 changes: 18 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
"typedoc": "^0.17.8",
"typescript": "~4.2.2",
"validate-commit-msg": "2.14.0",
"web-streams-polyfill": "^3.0.2",
"webpack": "^4.31.0"
},
"files": [
Expand Down
12 changes: 11 additions & 1 deletion spec-dtslint/observables/from-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { from, of, animationFrameScheduler } from 'rxjs';
import { from, of, animationFrameScheduler, ReadableStreamLike } from 'rxjs';

it('should accept an array', () => {
const o = from([1, 2, 3, 4]); // $ExpectType Observable<number>
Expand Down Expand Up @@ -55,3 +55,13 @@ it('should accept an array of Observables', () => {
it('should support scheduler', () => {
const a = from([1, 2, 3], animationFrameScheduler); // $ExpectType Observable<number>
});

it('should accept a ReadableStream', () => {
const stream: ReadableStreamLike<string> = new ReadableStream<string>({
pull(controller) {
controller.enqueue('x');
controller.close();
},
});
const o = from(stream); // $ExpectType Observable<string>
});
97 changes: 82 additions & 15 deletions spec/observables/from-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { expect } from 'chai';
import { TestScheduler } from 'rxjs/testing';
import { asyncScheduler, of, from, Observer, observable, Subject, noop } from 'rxjs';
import { first, concatMap, delay, take, map, tap } from 'rxjs/operators';
import { first, concatMap, delay, take, tap } from 'rxjs/operators';
import { ReadableStream } from 'web-streams-polyfill';

// tslint:disable:no-any
declare const expectObservable: any;
Expand Down Expand Up @@ -108,7 +109,7 @@ describe('from', () => {
});
});

benlesh marked this conversation as resolved.
Show resolved Hide resolved

it('should finalize a generator', () => {
const results: any[] = [];

Expand Down Expand Up @@ -175,16 +176,24 @@ describe('from', () => {
});

// tslint:disable-next-line:no-any it's silly to define all of these types.
const sources: Array<{ name: string, value: any }> = [
{ name: 'observable', value: of('x') },
{ name: 'observable-like', value: fakervable('x') },
{ name: 'observable-like-array', value: fakeArrayObservable('x') },
{ name: 'array', value: ['x'] },
{ name: 'promise', value: Promise.resolve('x') },
{ name: 'iterator', value: fakerator('x') },
{ name: 'array-like', value: { [0]: 'x', length: 1 }},
{ name: 'string', value: 'x'},
{ name: 'arguments', value: getArguments('x') },
const sources: Array<{ name: string, createValue: () => any }> = [
{ name: 'observable', createValue: () => of('x') },
{ name: 'observable-like', createValue: () => fakervable('x') },
{ name: 'observable-like-array', createValue: () => fakeArrayObservable('x') },
{ name: 'array', createValue: () => ['x'] },
{ name: 'promise', createValue: () => Promise.resolve('x') },
{ name: 'iterator', createValue: () => fakerator('x') },
{ name: 'array-like', createValue: () => ({ [0]: 'x', length: 1 }) },
// ReadableStreams are not lazy, so we have to have this createValue() thunk
// so that each tests gets a new one.
{ name: 'readable-stream-like', createValue: () => new ReadableStream({
pull(controller) {
controller.enqueue('x');
controller.close();
},
})},
{ name: 'string', createValue: () => 'x'},
{ name: 'arguments', createValue: () => getArguments('x') },
];

if (Symbol && Symbol.asyncIterator) {
Expand All @@ -211,14 +220,14 @@ describe('from', () => {

sources.push({
name: 'async-iterator',
value: fakeAsyncIterator('x')
createValue: () => fakeAsyncIterator('x')
});
}

for (const source of sources) {
it(`should accept ${source.name}`, (done) => {
let nextInvoked = false;
from(source.value)
from(source.createValue())
.subscribe(
(x) => {
nextInvoked = true;
Expand All @@ -235,7 +244,7 @@ describe('from', () => {
});
it(`should accept ${source.name} and scheduler`, (done) => {
let nextInvoked = false;
from(source.value, asyncScheduler)
from(source.createValue(), asyncScheduler)
.subscribe(
(x) => {
nextInvoked = true;
Expand Down Expand Up @@ -321,4 +330,62 @@ describe('from', () => {

expect(finallyExecuted).to.be.true;
});

it('should support ReadableStream-like objects', (done) => {
const input = [0, 1, 2];
const output: number[] = [];

const readableStream = new ReadableStream({
pull(controller) {
if (input.length > 0) {
controller.enqueue(input.shift());

if (input.length === 0) {
controller.close();
}
}
},
});

from(readableStream).subscribe({
next: value => {
output.push(value);
expect(readableStream.locked).to.equal(true);
},
complete: () => {
expect(output).to.deep.equal([0, 1, 2]);
expect(readableStream.locked).to.equal(false);
done();
}
});
});

it('should lock and release ReadableStream-like objects', (done) => {
const input = [0, 1, 2];
const output: number[] = [];

const readableStream = new ReadableStream({
pull(controller) {
if (input.length > 0) {
controller.enqueue(input.shift());

if (input.length === 0) {
controller.close();
}
}
},
});

from(readableStream).subscribe({
next: value => {
output.push(value);
expect(readableStream.locked).to.equal(true);
},
complete: () => {
expect(output).to.deep.equal([0, 1, 2]);
expect(readableStream.locked).to.equal(false);
done();
}
});
});
});
10 changes: 9 additions & 1 deletion src/internal/observable/from.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import { observable as Symbol_observable } from '../symbol/observable';
import { Subscriber } from '../Subscriber';

import { Observable } from '../Observable';
import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types';
import { ObservableInput, SchedulerLike, ObservedValueOf, ReadableStreamLike } from '../types';
import { scheduled } from '../scheduled/scheduled';
import { isFunction } from '../util/isFunction';
import { reportUnhandledError } from '../util/reportUnhandledError';
import { isInteropObservable } from '../util/isInteropObservable';
import { isAsyncIterable } from '../util/isAsyncIterable';
import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
import { isIterable } from '../util/isIterable';
import { isReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike';

export function from<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>;
/** @deprecated The scheduler argument is deprecated, use scheduled. Details: https://rxjs.dev/deprecations/scheduler-argument */
Expand Down Expand Up @@ -141,6 +142,9 @@ export function innerFrom<T>(input: ObservableInput<T>): Observable<T> {
if (isIterable(input)) {
return fromIterable(input);
}
if (isReadableStreamLike(input)) {
return fromReadableStreamLike(input);
}
}

throw createInvalidObservableTypeError(input);
Expand Down Expand Up @@ -220,6 +224,10 @@ function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T>) {
});
}

function fromReadableStreamLike<T>(readableStream: ReadableStreamLike<T>) {
return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream));
}

async function process<T>(asyncIterable: AsyncIterable<T>, subscriber: Subscriber<T>) {
for await (const value of asyncIterable) {
subscriber.next(value);
Expand Down
8 changes: 8 additions & 0 deletions src/internal/scheduled/scheduleReadableStreamLike.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { SchedulerLike, ReadableStreamLike } from '../types';
import { Observable } from '../Observable';
import { scheduleAsyncIterable } from './scheduleAsyncIterable';
import { readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike';

export function scheduleReadableStreamLike<T>(input: ReadableStreamLike<T>, scheduler: SchedulerLike): Observable<T> {
return scheduleAsyncIterable(readableStreamLikeToAsyncGenerator(input), scheduler);
}
7 changes: 6 additions & 1 deletion src/internal/scheduled/scheduled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ import { scheduleObservable } from './scheduleObservable';
import { schedulePromise } from './schedulePromise';
import { scheduleArray } from './scheduleArray';
import { scheduleIterable } from './scheduleIterable';
import { scheduleAsyncIterable } from './scheduleAsyncIterable';
import { isInteropObservable } from '../util/isInteropObservable';
import { isPromise } from '../util/isPromise';
import { isArrayLike } from '../util/isArrayLike';
import { isIterable } from '../util/isIterable';
import { ObservableInput, SchedulerLike } from '../types';
import { Observable } from '../Observable';
import { scheduleAsyncIterable } from './scheduleAsyncIterable';
import { isAsyncIterable } from '../util/isAsyncIterable';
import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
import { isReadableStreamLike } from '../util/isReadableStreamLike';
import { scheduleReadableStreamLike } from './scheduleReadableStreamLike';

/**
* Converts from a common {@link ObservableInput} type to an observable where subscription and emissions
Expand Down Expand Up @@ -40,6 +42,9 @@ export function scheduled<T>(input: ObservableInput<T>, scheduler: SchedulerLike
if (isIterable(input)) {
return scheduleIterable(input, scheduler);
}
if (isReadableStreamLike(input)) {
return scheduleReadableStreamLike(input, scheduler);
}
}
throw createInvalidObservableTypeError(input);
}
34 changes: 33 additions & 1 deletion src/internal/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,14 @@ export interface Subscribable<T> {
/**
* Valid types that can be converted to observables.
*/
export type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;
export type ObservableInput<T> =
| Observable<T>
| InteropObservable<T>
| AsyncIterable<T>
| PromiseLike<T>
| ArrayLike<T>
| Iterable<T>
| ReadableStreamLike<T>;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to change this and the other places to have a consistent order, if there's a preference.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the order of this should match the runtime order.


/** @deprecated use {@link InteropObservable } */
export type ObservableLike<T> = InteropObservable<T>;
Expand Down Expand Up @@ -276,3 +283,28 @@ export type ValueFromNotification<T> = T extends { kind: 'N' | 'E' | 'C' }
export type Falsy = null | undefined | false | 0 | -0 | 0n | '';

export type TruthyTypesOf<T> = T extends Falsy ? never : T;

// We shouldn't rely on this type definition being available globally yet since it's
// not necessarily available in every TS environment.
interface ReadableStreamDefaultReaderLike<T> {
// HACK: As of TS 4.2.2, The provided types for the iterator results of a `ReadableStreamDefaultReader`
// are significantly different enough from `IteratorResult` as to cause compilation errors.
// The type at the time is `ReadableStreamDefaultReadResult`.
read(): PromiseLike<
| {
done: false;
value: T;
}
| { done: true; value?: undefined }
>;
releaseLock(): void;
}

/**
* The base signature RxJS will look for to identify and use
* a [ReadableStream](https://streams.spec.whatwg.org/#rs-class)
* as an {@link ObservableInput} source.
*/
export interface ReadableStreamLike<T> {
getReader(): ReadableStreamDefaultReaderLike<T>;
}
23 changes: 23 additions & 0 deletions src/internal/util/isReadableStreamLike.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { ReadableStreamLike } from '../types';
import { isFunction } from './isFunction';

export async function* readableStreamLikeToAsyncGenerator<T>(readableStream: ReadableStreamLike<T>): AsyncGenerator<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking... that while this is definitely the coolest way. The lame way might be more efficient, something like:

export function readableStreamLikeToAsyncGenerator(readableStream: ReadableStreamLike<T>): AsyncGenerator<T> {
  const reader = readableStream.getReader();
  
  return {
     [Symbol.asyncIterator]() { return this; },
     async next() { 
       return reader.read();
     },
     async throw(err) {
       reader.releaseLock();
       throw err;
     },
     async return() {
       reader.releaseLock();
       return { done: true };
     }
  }
}

But maybe I'm totally wrong. Just something I was thinking about. I guess if there was an error during the execution of next above, the lock wouldn't get released unless you had a try/catch in there, so maybe this isn't the best idea.

const reader = readableStream.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
return;
}
yield value!;
}
} finally {
reader.releaseLock();
}
}

export function isReadableStreamLike<T>(obj: any): obj is ReadableStreamLike<T> {
// We don't want to use instanceof checks because they would return
// false for instances from another Realm, like an <iframe>.
return isFunction(obj?.getReader);
}
2 changes: 1 addition & 1 deletion src/internal/util/throwUnobservableError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ export function createInvalidObservableTypeError(input: any) {
return new TypeError(
`You provided ${
input !== null && typeof input === 'object' ? 'an invalid object' : `'${input}'`
} where a stream was expected. You can provide an Observable, Promise, Array, AsyncIterable, or Iterable.`
} where a stream was expected. You can provide an Observable, Promise, ReadableStream, Array, AsyncIterable, or Iterable.`
);
}