Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ReadableStreams): Support for ReadableStreams e.g. from(readableStream) #6163

Merged
merged 5 commits into from
Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ export declare class Observable<T> implements Subscribable<T> {
static create: (...args: any[]) => any;
}

export declare type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;
export declare type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T> | ReadableStreamLike<T>;

export declare type ObservableInputTuple<T> = {
[K in keyof T]: ObservableInput<T[K]>;
Expand Down
20 changes: 18 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@
"typedoc": "^0.17.8",
"typescript": "~4.2.2",
"validate-commit-msg": "2.14.0",
"web-streams-polyfill": "^3.0.2",
"webpack": "^4.31.0"
},
"files": [
Expand Down
97 changes: 82 additions & 15 deletions spec/observables/from-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { expect } from 'chai';
import { TestScheduler } from 'rxjs/testing';
import { asyncScheduler, of, from, Observer, observable, Subject, noop } from 'rxjs';
import { first, concatMap, delay, take, map, tap } from 'rxjs/operators';
import { first, concatMap, delay, take, tap } from 'rxjs/operators';
import { ReadableStream } from 'web-streams-polyfill';

// tslint:disable:no-any
declare const expectObservable: any;
Expand Down Expand Up @@ -108,7 +109,7 @@ describe('from', () => {
});
});

benlesh marked this conversation as resolved.
Show resolved Hide resolved

it('should finalize a generator', () => {
const results: any[] = [];

Expand Down Expand Up @@ -175,16 +176,24 @@ describe('from', () => {
});

// tslint:disable-next-line:no-any it's silly to define all of these types.
const sources: Array<{ name: string, value: any }> = [
{ name: 'observable', value: of('x') },
{ name: 'observable-like', value: fakervable('x') },
{ name: 'observable-like-array', value: fakeArrayObservable('x') },
{ name: 'array', value: ['x'] },
{ name: 'promise', value: Promise.resolve('x') },
{ name: 'iterator', value: fakerator('x') },
{ name: 'array-like', value: { [0]: 'x', length: 1 }},
{ name: 'string', value: 'x'},
{ name: 'arguments', value: getArguments('x') },
const sources: Array<{ name: string, createValue: () => any }> = [
{ name: 'observable', createValue: () => of('x') },
{ name: 'observable-like', createValue: () => fakervable('x') },
{ name: 'observable-like-array', createValue: () => fakeArrayObservable('x') },
{ name: 'array', createValue: () => ['x'] },
{ name: 'promise', createValue: () => Promise.resolve('x') },
{ name: 'iterator', createValue: () => fakerator('x') },
{ name: 'array-like', createValue: () => ({ [0]: 'x', length: 1 }) },
// ReadableStreams are not lazy, so we have to have this createValue() thunk
// so that each tests gets a new one.
{ name: 'readable-stream-like', createValue: () => new ReadableStream({
pull(controller) {
controller.enqueue('x');
controller.close();
},
})},
{ name: 'string', createValue: () => 'x'},
{ name: 'arguments', createValue: () => getArguments('x') },
];

if (Symbol && Symbol.asyncIterator) {
Expand All @@ -211,14 +220,14 @@ describe('from', () => {

sources.push({
name: 'async-iterator',
value: fakeAsyncIterator('x')
createValue: () => fakeAsyncIterator('x')
});
}

for (const source of sources) {
it(`should accept ${source.name}`, (done) => {
let nextInvoked = false;
from(source.value)
from(source.createValue())
.subscribe(
(x) => {
nextInvoked = true;
Expand All @@ -235,7 +244,7 @@ describe('from', () => {
});
it(`should accept ${source.name} and scheduler`, (done) => {
let nextInvoked = false;
from(source.value, asyncScheduler)
from(source.createValue(), asyncScheduler)
.subscribe(
(x) => {
nextInvoked = true;
Expand Down Expand Up @@ -321,4 +330,62 @@ describe('from', () => {

expect(finallyExecuted).to.be.true;
});

it('should support ReadableStream-like objects', (done) => {
const input = [0, 1, 2];
const output: number[] = [];

const readableStream = new ReadableStream({
pull(controller) {
if (input.length > 0) {
controller.enqueue(input.shift());

if (input.length === 0) {
controller.close();
}
}
},
});

from(readableStream).subscribe({
next: value => {
output.push(value);
expect(readableStream.locked).to.equal(true);
},
complete: () => {
expect(output).to.deep.equal([0, 1, 2]);
expect(readableStream.locked).to.equal(false);
done();
}
});
});

it('should lock and release ReadableStream-like objects', (done) => {
const input = [0, 1, 2];
const output: number[] = [];

const readableStream = new ReadableStream({
pull(controller) {
if (input.length > 0) {
controller.enqueue(input.shift());

if (input.length === 0) {
controller.close();
}
}
},
});

from(readableStream).subscribe({
next: value => {
output.push(value);
expect(readableStream.locked).to.equal(true);
},
complete: () => {
expect(output).to.deep.equal([0, 1, 2]);
expect(readableStream.locked).to.equal(false);
done();
}
});
});
});
8 changes: 8 additions & 0 deletions src/internal/observable/from.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { isInteropObservable } from '../util/isInteropObservable';
import { isAsyncIterable } from '../util/isAsyncIterable';
import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
import { isIterable } from '../util/isIterable';
import { isReadableStreamLike, ReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike';

export function from<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>;
/** @deprecated The scheduler argument is deprecated, use scheduled. Details: https://rxjs.dev/deprecations/scheduler-argument */
Expand Down Expand Up @@ -141,6 +142,9 @@ export function innerFrom<T>(input: ObservableInput<T>): Observable<T> {
if (isIterable(input)) {
return fromIterable(input);
}
if (isReadableStreamLike(input)) {
return fromReadableStreamLike(input);
}
}

throw createInvalidObservableTypeError(input);
Expand Down Expand Up @@ -220,6 +224,10 @@ function fromAsyncIterable<T>(asyncIterable: AsyncIterable<T>) {
});
}

function fromReadableStreamLike<T>(readableStream: ReadableStreamLike<T>) {
return fromAsyncIterable(readableStreamLikeToAsyncGenerator(readableStream));
}

async function process<T>(asyncIterable: AsyncIterable<T>, subscriber: Subscriber<T>) {
for await (const value of asyncIterable) {
subscriber.next(value);
Expand Down
8 changes: 8 additions & 0 deletions src/internal/scheduled/scheduleReadableStreamLike.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { SchedulerLike } from '../types';
import { Observable } from '../Observable';
import { scheduleAsyncIterable } from './scheduleAsyncIterable';
import { ReadableStreamLike, readableStreamLikeToAsyncGenerator } from '../util/isReadableStreamLike';

export function scheduleReadableStreamLike<T>(input: ReadableStreamLike<T>, scheduler: SchedulerLike): Observable<T> {
return scheduleAsyncIterable(readableStreamLikeToAsyncGenerator(input), scheduler);
}
7 changes: 6 additions & 1 deletion src/internal/scheduled/scheduled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ import { scheduleObservable } from './scheduleObservable';
import { schedulePromise } from './schedulePromise';
import { scheduleArray } from './scheduleArray';
import { scheduleIterable } from './scheduleIterable';
import { scheduleAsyncIterable } from './scheduleAsyncIterable';
import { isInteropObservable } from '../util/isInteropObservable';
import { isPromise } from '../util/isPromise';
import { isArrayLike } from '../util/isArrayLike';
import { isIterable } from '../util/isIterable';
import { ObservableInput, SchedulerLike } from '../types';
import { Observable } from '../Observable';
import { scheduleAsyncIterable } from './scheduleAsyncIterable';
import { isAsyncIterable } from '../util/isAsyncIterable';
import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
import { isReadableStreamLike } from '../util/isReadableStreamLike';
import { scheduleReadableStreamLike } from './scheduleReadableStreamLike';

/**
* Converts from a common {@link ObservableInput} type to an observable where subscription and emissions
Expand Down Expand Up @@ -40,6 +42,9 @@ export function scheduled<T>(input: ObservableInput<T>, scheduler: SchedulerLike
if (isIterable(input)) {
return scheduleIterable(input, scheduler);
}
if (isReadableStreamLike(input)) {
return scheduleReadableStreamLike(input, scheduler);
}
}
throw createInvalidObservableTypeError(input);
}
10 changes: 9 additions & 1 deletion src/internal/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import { Observable } from './Observable';
import { Subscription } from './Subscription';
import { ReadableStreamLike } from './util/isReadableStreamLike';

/**
* NOTE: This will add Symbol.observable globally for all TypeScript users,
Expand Down Expand Up @@ -91,7 +92,14 @@ export interface Subscribable<T> {
/**
* Valid types that can be converted to observables.
*/
export type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;
export type ObservableInput<T> =
| Observable<T>
| InteropObservable<T>
| AsyncIterable<T>
| PromiseLike<T>
| ArrayLike<T>
| Iterable<T>
| ReadableStreamLike<T>;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to change this and the other places to have a consistent order, if there's a preference.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the order of this should match the runtime order.


/** @deprecated use {@link InteropObservable } */
export type ObservableLike<T> = InteropObservable<T>;
Expand Down
24 changes: 24 additions & 0 deletions src/internal/util/isReadableStreamLike.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { isFunction } from './isFunction';

export interface ReadableStreamLike<T> {
getReader(): ReadableStreamDefaultReader<T>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll give this a proper review later, but I don't think we can rely on - or explicitly reference using a /// comment - this type: ReadableStreamDefaultReader<T>

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah great catch. I'll stub it out too since it's easy.

Copy link
Member Author

@jayphelps jayphelps Mar 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. a7e9b56

}

export async function* readableStreamLikeToAsyncGenerator<T>(readableStream: ReadableStreamLike<T>): AsyncGenerator<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking... that while this is definitely the coolest way. The lame way might be more efficient, something like:

export function readableStreamLikeToAsyncGenerator(readableStream: ReadableStreamLike<T>): AsyncGenerator<T> {
  const reader = readableStream.getReader();
  
  return {
     [Symbol.asyncIterator]() { return this; },
     async next() { 
       return reader.read();
     },
     async throw(err) {
       reader.releaseLock();
       throw err;
     },
     async return() {
       reader.releaseLock();
       return { done: true };
     }
  }
}

But maybe I'm totally wrong. Just something I was thinking about. I guess if there was an error during the execution of next above, the lock wouldn't get released unless you had a try/catch in there, so maybe this isn't the best idea.

const reader = readableStream.getReader();
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
return;
}
yield value!;
}
} finally {
reader.releaseLock();
}
}

export function isReadableStreamLike<T>(obj: any): obj is ReadableStreamLike<T> {
return isFunction(obj?.getReader);
}
2 changes: 1 addition & 1 deletion src/internal/util/throwUnobservableError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ export function createInvalidObservableTypeError(input: any) {
return new TypeError(
`You provided ${
input !== null && typeof input === 'object' ? 'an invalid object' : `'${input}'`
} where a stream was expected. You can provide an Observable, Promise, Array, AsyncIterable, or Iterable.`
} where a stream was expected. You can provide an Observable, Promise, ReadableStream, Array, AsyncIterable, or Iterable.`
);
}