Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Nov 27, 2020
1 parent 6593e79 commit b43d28f
Show file tree
Hide file tree
Showing 14 changed files with 546 additions and 161 deletions.
14 changes: 7 additions & 7 deletions api_guard/dist/types/operators/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ export declare function mergeWith<T, A extends readonly unknown[]>(...otherSourc
export declare function min<T>(comparer?: (x: T, y: T) => number): MonoTypeOperatorFunction<T>;

export declare function multicast<T>(subject: Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
export declare function multicast<T, O extends ObservableInput<any>>(subject: Subject<T>, selector: (shared: Observable<T>) => O): UnaryFunction<Observable<T>, ConnectableObservable<ObservedValueOf<O>>>;
export declare function multicast<T>(subjectFactory: (this: Observable<T>) => Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
export declare function multicast<T, O extends ObservableInput<any>>(SubjectFactory: (this: Observable<T>) => Subject<T>, selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;
export declare function multicast<T, O extends ObservableInput<any>>(subject: Subject<T>, selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;
export declare function multicast<T>(subjectFactory: () => Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
export declare function multicast<T, O extends ObservableInput<any>>(subjectFactory: () => Subject<T>, selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;

export declare function observeOn<T>(scheduler: SchedulerLike, delay?: number): MonoTypeOperatorFunction<T>;

Expand All @@ -206,14 +206,14 @@ export declare function pluck<T>(...properties: string[]): OperatorFunction<T, u

export declare function publish<T>(): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
export declare function publish<T, O extends ObservableInput<any>>(selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;
export declare function publish<T>(selector: MonoTypeOperatorFunction<T>): MonoTypeOperatorFunction<T>;

export declare function publishBehavior<T>(value: T): UnaryFunction<Observable<T>, ConnectableObservable<T>>;
export declare function publishBehavior<T>(initialValue: T): UnaryFunction<Observable<T>, ConnectableObservable<T>>;

export declare function publishLast<T>(): UnaryFunction<Observable<T>, ConnectableObservable<T>>;

export declare function publishReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
export declare function publishReplay<T, O extends ObservableInput<any>>(bufferSize?: number, windowTime?: number, selector?: (shared: Observable<T>) => O, scheduler?: SchedulerLike): OperatorFunction<T, ObservedValueOf<O>>;
export declare function publishReplay<T>(bufferSize?: number, windowTime?: number, timestampProvider?: TimestampProvider): MonoTypeOperatorFunction<T>;
export declare function publishReplay<T, O extends ObservableInput<any>>(bufferSize: number | undefined, windowTime: number | undefined, selector: OperatorFunction<T, ObservedValueOf<O>>, timestampProvider?: TimestampProvider): OperatorFunction<T, ObservedValueOf<O>>;
export declare function publishReplay<T, O extends ObservableInput<any>>(bufferSize: number | undefined, windowTime: number | undefined, selector: undefined, timestampProvider: TimestampProvider): OperatorFunction<T, ObservedValueOf<O>>;

export declare function race<T>(observables: Array<Observable<T>>): MonoTypeOperatorFunction<T>;
export declare function race<T, R>(observables: Array<Observable<T>>): OperatorFunction<T, R>;
Expand Down
148 changes: 148 additions & 0 deletions spec/deprecation-equivalents/multicasting-deprecations-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/** @prettier */
import { Observable, ConnectableObservable, connectable, of, AsyncSubject, BehaviorSubject, ReplaySubject, Subject, merge } from 'rxjs';
import { connect, share, multicast, publish, publishReplay, publishBehavior, publishLast, refCount, repeat, retry } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';

describe('multicasting equivalent tests', () => {
let rxTest: TestScheduler;

beforeEach(() => {
rxTest = new TestScheduler(observableMatcher);
});

testEquivalents(
'multicast(() => new Subject()), refCount() and share()',
(source) =>
source.pipe(
multicast(() => new Subject<string>()),
refCount()
),
(source) => source.pipe(share())
);

testEquivalents(
'multicast(new Subject()), refCount() and share({ resetOnError: false, resetOnComplete: false, resetOnUnsubscribe: false })',
(source) => source.pipe(multicast(new Subject()), refCount()),
(source) => source.pipe(share({ resetOnError: false, resetOnComplete: false, resetOnUnsubscribe: false }))
);

testEquivalents(
'publish(), refCount() and share({ resetOnError: false, resetOnComplete: false, resetOnUnsubscribe: false })',
(source) => source.pipe(publish(), refCount()),
(source) => source.pipe(share({ resetOnError: false, resetOnComplete: false, resetOnUnsubscribe: false }))
);

testEquivalents(
'publishLast(), refCount() and share({ connector: () => new AsyncSubject(), resetOnError: false, resetOnComplete: false, resetOnUnsubscribe: false })',
(source) => source.pipe(publishLast(), refCount()),
(source) =>
source.pipe(share({ connector: () => new AsyncSubject(), resetOnError: false, resetOnComplete: false, resetOnUnsubscribe: false }))
);

testEquivalents(
'publishBehavior("X"), refCount() and share({ connector: () => new BehaviorSubject("X"), resetOnError: false, resetOnComplete: false, resetOnUnsubscribe: false })',
(source) => source.pipe(publishBehavior('X'), refCount()),
(source) =>
source.pipe(
share({ connector: () => new BehaviorSubject('X'), resetOnError: false, resetOnComplete: false, resetOnUnsubscribe: false })
)
);

testEquivalents(
'publishReplay(3, 10), refCount() and share({ connector: () => new ReplaySubject(3, 10), resetOnError: false, resetOnComplete: false, resetOnUnsubscribe: false })',
(source) => source.pipe(publishReplay(3, 10), refCount()),
(source) =>
source.pipe(
share({ connector: () => new ReplaySubject(3, 10), resetOnError: false, resetOnComplete: false, resetOnUnsubscribe: false })
)
);

const fn = (source: Observable<any>) => merge(source, source);

testEquivalents(
'publish(fn) and connect({ setup: fn })',
(source) => source.pipe(publish(fn)),
(source) =>
source.pipe(
connect({
setup: fn,
})
)
);

testEquivalents(
'publishReplay(3, 10, fn) and `subject = new ReplaySubject(3, 10), connect({ connector: () => subject , setup: fn })`',
(source) => source.pipe(publishReplay(3, 10, fn)),
(source) => {
const subject = new ReplaySubject(3, 10);
return source.pipe(connect({ connector: () => subject, setup: fn }));
}
);

/**
* Used to test a variety of scenarios with multicast operators that should be equivalent.
* @param name The name to add to the test output
* @param oldExpression The old expression we're saying matches the updated expression
* @param updatedExpression The updated expression we're telling people to use instead.
*/
function testEquivalents(
name: string,
oldExpression: (source: Observable<string>) => Observable<string>,
updatedExpression: (source: Observable<string>) => Observable<string>
) {
it(`should be equivalent for ${name} for async sources`, () => {
rxTest.run(({ cold, expectObservable }) => {
const source = cold('----a---b---c----d---e----|');
const old = oldExpression(source);
const updated = updatedExpression(source);
expectObservable(updated).toEqual(old);
});
});

it(`should be equivalent for ${name} for async sources that repeat`, () => {
rxTest.run(({ cold, expectObservable }) => {
const source = cold('----a---b---c----d---e----|');
const old = oldExpression(source).pipe(repeat(3));
const updated = updatedExpression(source).pipe(repeat(3));
expectObservable(updated).toEqual(old);
});
});

it(`should be equivalent for ${name} for async sources that retry`, () => {
rxTest.run(({ cold, expectObservable }) => {
const source = cold('----a---b---c----d---e----#');
const old = oldExpression(source).pipe(retry(3));
const updated = updatedExpression(source).pipe(retry(3));
expectObservable(updated).toEqual(old);
});
});

it(`should be equivalent for ${name} for async sources`, () => {
rxTest.run(({ expectObservable }) => {
const source = of('a', 'b', 'c');
const old = oldExpression(source);
const updated = updatedExpression(source);
expectObservable(updated).toEqual(old);
});
});

it(`should be equivalent for ${name} for async sources that repeat`, () => {
rxTest.run(({ expectObservable }) => {
const source = of('a', 'b', 'c');
const old = oldExpression(source).pipe(repeat(3));
const updated = updatedExpression(source).pipe(repeat(3));
expectObservable(updated).toEqual(old);
});
});

it(`should be equivalent for ${name} for async sources that retry`, () => {
rxTest.run(({ expectObservable }) => {
const source = of('a', 'b', 'c');
const old = oldExpression(source).pipe(retry(3));
const updated = updatedExpression(source).pipe(retry(3));
expectObservable(updated).toEqual(old);
});
});
}
});
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export { bindCallback } from './internal/observable/bindCallback';
export { bindNodeCallback } from './internal/observable/bindNodeCallback';
export { combineLatest } from './internal/observable/combineLatest';
export { concat } from './internal/observable/concat';
export { connectable } from './internal/observable/connectable';
export { defer } from './internal/observable/defer';
export { empty } from './internal/observable/empty';
export { forkJoin } from './internal/observable/forkJoin';
Expand Down
17 changes: 17 additions & 0 deletions src/internal/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,25 @@ import { OperatorSubscriber } from '../operators/OperatorSubscriber';

/**
* @class ConnectableObservable<T>
* @deprecated To be removed in version 8. Please use {@link connectable} to create a connectable observable.
* If you are using the `refCount` method of `ConnectableObservable` you can use the updated {@link share} operator
* instead, which is now highly configurable.
*/
export class ConnectableObservable<T> extends Observable<T> {
protected _subject: Subject<T> | null = null;
protected _refCount: number = 0;
protected _connection: Subscription | null = null;

/**
* @param source The source observable
* @param subjectFactory The factory that creates the subject used internally.
* @deprecated To be removed in version 8. Please use {@link connectable} to create a connectable observable.
* If you are using the `refCount` method of `ConnectableObservable` you can use the {@link share} operator
* instead, which is now highly configurable. `new ConnectableObservable(source, fn)` is equivalent
* to `connectable(source, fn)`. With the exception of when the `refCount()` method is needed, in which
* case, the new {@link share} operator should be used: `new ConnectableObservable(source, fn).refCount()`
* is equivalent to `source.pipe(share({ connector: fn }))`.
*/
constructor(public source: Observable<T>, protected subjectFactory: () => Subject<T>) {
super();
}
Expand Down Expand Up @@ -68,6 +81,10 @@ export class ConnectableObservable<T> extends Observable<T> {
return connection;
}

/**
* @deprecated The {@link ConnectableObservable} class is scheduled for removal in version 8.
* Please use the {@link share} operator, which is now highly configurable.
*/
refCount(): Observable<T> {
return higherOrderRefCount()(this) as Observable<T>;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@ export type ConnectableObservableLike<T> = Observable<T> & {
* @returns A "connectable" observable, that has a `connect()` method, that you must call to
* connect the source to all consumers through the subject provided as the connector.
*/
export function makeConnectable<T>(source: ObservableInput<T>, connector: Subject<T> = new Subject<T>()): ConnectableObservableLike<T> {
export function connectable<T>(source: ObservableInput<T>, connector: Subject<T> = new Subject<T>()): ConnectableObservableLike<T> {
// The subscription representing the connection.
let connection: Subscription | null = null;

const result: any = new Observable<T>((subscriber) => {
return connector.subscribe(subscriber);
});

result.connect = function () {
// Define the `connect` function. This is what users must call
// in order to "connect" the source to the subject that is
// multicasting it.
result.connect = () => {
if (!connection) {
connection = defer(() => source).subscribe(connector);
}
Expand Down
120 changes: 93 additions & 27 deletions src/internal/operators/connect.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,110 @@
/** @prettier */
import { lift } from '../util/lift';
import { Subscriber } from '../Subscriber';
import { OperatorFunction, ObservableInput } from '../types';
import { Observable } from '../Observable';
import { Subject } from '../Subject';
import { from } from '../observable/from';
import { Subscription } from '../Subscription';
import { operate } from '../util/lift';

/**
* The default connector function used for `connect`.
* A factory function that will create a {@link Subject}.
*/
function defaultConnector<T>() {
return new Subject<T>();
}

/**
* Creates an observable by multicasting the source within a function that
* allows the developer to define the usage of the multicast prior to connection.
*
* This is particularly useful if the observable source you wish to multicast could
* be synchronous or asynchronous. This sets it apart from {@link share}, which, in the
* case of totally synchronous sources will fail to share a single subscription with
* multiple consumers, as by the time the subscription to the result of {@link share}
* has returned, if the source is synchronous its internal reference count will jump from
* 0 to 1 back to 0 and reset.
*
* To use `connect`, you provide a `setup` function via configuration that will give you
* a multicast observable that is not yet connected. You then use that multicast observable
* to create a resulting observable that, when subscribed, will set up your multicast. This is
* generally, but not always, accomplished with {@link merge}.
*
* Note that using a {@link takeUntil} inside of `connect`'s `setup` _might_ mean you were looking
* to use the {@link takeWhile} operator instead.
*
* When you subscribe to the result of `connect`, the `setup` function will be called. After
* the `setup` function returns, the observable it returns will be subscribed to, _then_ the
* multicast will be connected to the source.
*
* ### Example
*
* Sharing a totally synchronous observable
*
* ```ts
* import { defer, of } from 'rxjs';
* import { tap, connect } from 'rxjs/operators';
*
* const source$ = defer(() => {
* console.log('subscription started');
* return of(1, 2, 3, 4, 5).pipe(
* tap(n => console.log(`source emitted ${n}`))
* );
* });
*
* source$.pipe(
* connect({
* // Notice in here we're merging three subscriptions to `shared$`.
* setup: (shared$) => merge(
* shared$.pipe(map(n => `all ${n}`)),
* shared$.pipe(filter(n => n % 2 === 0), map(n => `even ${n}`)),
* shared$.pipe(filter(n => n % 2 === 1), map(n => `odd ${n}`)),
* )
* })
* )
* .subscribe(console.log);
*
* // Expected output: (notice only one subscription)
* "subscription started"
* "source emitted 1"
* "all 1"
* "odd 1"
* "source emitted 2"
* "all 2"
* "even 2"
* "source emitted 3"
* "all 3"
* "odd 3"
* "source emitted 4"
* "all 4"
* "even 4"
* "source emitted 5"
* "all 5"
* "odd 5"
* ```
*
* @param param0 The configuration object for `connect`.
*/
export function connect<T, R>({
connector = defaultConnector,
setup,
}: {
/**
* A factory function used to create the Subject through which the source
* is multicast. By default this creates a {@link Subject}.
*/
connector?: () => Subject<T>;
/**
* A function used to set up the multicast. Gives you a multicast observable
* that is not yet connected. With that, you're expected to create and return
* and Observable, that when subscribed to, will utilize the multicast observable.
* After this function is executed -- and its return value subscribed to -- the
* the operator will subscribe to the source, and the connection will be made.
*/
setup: (shared: Observable<T>) => ObservableInput<R>;
}): OperatorFunction<T, R> {
return (source: Observable<T>) => {
return lift(source, function (this: Subscriber<R>, source: Observable<T>) {
const subscriber = this;
let subject: Subject<T>;
try {
subject = connector();
} catch (err) {
subscriber.error(err);
return;
}

let result: Observable<R>;
try {
result = from(setup(subject.asObservable()));
} catch (err) {
subscriber.error(err);
return;
}

const subscription = result.subscribe(subscriber);
subscription.add(source.subscribe(subject));
return subscription;
});
};
return operate((source, subscriber) => {
const subject = connector();
from(setup(subject.asObservable())).subscribe(subscriber);
subscriber.add(source.subscribe(subject));
});
}
Loading

0 comments on commit b43d28f

Please sign in to comment.