Skip to content

Commit

Permalink
chore: Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Dec 14, 2020
1 parent 9a45218 commit 8c13e0a
Show file tree
Hide file tree
Showing 14 changed files with 73 additions and 86 deletions.
7 changes: 4 additions & 3 deletions api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ export declare class Observable<T> implements Subscribable<T> {
pipe<A, B, C, D, E, F, G, H>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>): Observable<H>;
pipe<A, B, C, D, E, F, G, H, I>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>): Observable<I>;
pipe<A, B, C, D, E, F, G, H, I>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>, ...operations: OperatorFunction<any, any>[]): Observable<unknown>;
subscribe(observer?: PartialObserver<T>): Subscription;
subscribe(observer?: Partial<Observer<T>>): Subscription;
subscribe(next: null | undefined, error: null | undefined, complete: () => void): Subscription;
subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void): Subscription;
subscribe(next: (value: T) => void, error: null | undefined, complete: () => void): Subscription;
Expand Down Expand Up @@ -443,10 +443,11 @@ export declare class Subject<T> extends Observable<T> implements SubscriptionLik
static create: (...args: any[]) => any;
}

export declare type SubjectLike<T> = Observer<T> & Subscribable<T>;
export interface SubjectLike<T> extends Observer<T>, Subscribable<T> {
}

export interface Subscribable<T> {
subscribe(observer: Observer<T>): Unsubscribable;
subscribe(observer: Partial<Observer<T>>): Unsubscribable;
}

export declare type SubscribableOrPromise<T> = Subscribable<T> | Subscribable<never> | PromiseLike<T> | InteropObservable<T>;
Expand Down
7 changes: 2 additions & 5 deletions api_guard/dist/types/operators/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ export declare function concatMapTo<T, R, O extends ObservableInput<any>>(observ

export declare function concatWith<T, A extends readonly unknown[]>(...otherSources: [...ObservableInputTuple<A>]): OperatorFunction<T, T | A[number]>;

export declare function connect<T, R>({ connector, setup, }: {
connector?: () => SubjectLike<T>;
setup: (shared: Observable<T>) => ObservableInput<R>;
}): OperatorFunction<T, R>;
export declare function connect<T, R>(selector: (shared: Observable<T>) => ObservableInput<R>, config?: ConnectConfig<T>): OperatorFunction<T, R>;

export declare function count<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, number>;

Expand Down Expand Up @@ -253,7 +250,7 @@ export declare function scan<V, A, S>(accumulator: (acc: A | S, value: V, index:
export declare function sequenceEqual<T>(compareTo: Observable<T>, comparator?: (a: T, b: T) => boolean): OperatorFunction<T, boolean>;

export declare function share<T>(): MonoTypeOperatorFunction<T>;
export declare function share<T, R = T>(options: ShareOptions<T, R>): OperatorFunction<T, R>;
export declare function share<T>(options: ShareConfig<T>): MonoTypeOperatorFunction<T>;

export declare function shareReplay<T>(config: ShareReplayConfig): MonoTypeOperatorFunction<T>;
export declare function shareReplay<T>(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,15 @@ describe('multicasting equivalent tests', () => {
testEquivalents(
'publish(fn) and connect({ setup: fn })',
(source) => source.pipe(publish(fn)),
(source) =>
source.pipe(
connect({
setup: fn,
})
)
(source) => source.pipe(connect(fn))
);

testEquivalents(
'publishReplay(3, 10, fn) and `subject = new ReplaySubject(3, 10), connect({ connector: () => subject , setup: fn })`',
(source) => source.pipe(publishReplay(3, 10, fn)),
(source) => {
const subject = new ReplaySubject(3, 10);
return source.pipe(connect({ connector: () => subject, setup: fn }));
return source.pipe(connect(fn, { connector: () => subject }));
}
);

Expand Down
22 changes: 9 additions & 13 deletions spec/operators/connect-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,33 @@ describe('connect', () => {
rxTest = new TestScheduler(observableMatcher);
});

it('should connect a source through a setup function', () => {
it('should connect a source through a selector function', () => {
rxTest.run(({ cold, time, expectObservable }) => {
const source = cold('---a----b-----c---|');
const d = time(' ---|');
const expected = ' ---a--a-b--b--c--c|';

const result = source.pipe(
connect({
setup: (shared) => {
return merge(shared.pipe(delay(d)), shared);
},
})
);
const result = source.pipe(connect((shared) => merge(shared.pipe(delay(d)), shared)));

expectObservable(result).toBe(expected);
});
});

it('should connect a source through a setup function and use the provided connector', () => {
it('should connect a source through a selector function and use the provided connector', () => {
rxTest.run(({ cold, time, expectObservable }) => {
const source = cold('--------a---------b---------c-----|');
const d = time(' ---|');
const expected = ' S--S----a--a------b--b------c--c--|';

const result = source.pipe(
connect({
connector: () => new BehaviorSubject('S'),
setup: (shared) => {
connect(
(shared) => {
return merge(shared.pipe(delay(d)), shared);
},
})
{
connector: () => new BehaviorSubject('S'),
}
)
);

expectObservable(result).toBe(expected);
Expand Down
5 changes: 4 additions & 1 deletion spec/schedulers/TestScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -397,13 +397,16 @@ describe('TestScheduler', () => {
expect(expectObservable).to.be.a('function');
expect(expectSubscriptions).to.be.a('function');

const obs1 = cold('-a-c-e|');
const obs1 = cold('-a-c-e|');
const obs2 = hot(' ^-b-d-f|');
const output = merge(obs1, obs2);
const expected = ' -abcdef|';

expectObservable(output).toBe(expected);
expectObservable(output).toEqual(cold(expected));
// There are two subscriptions to each of these, because we merged
// them together, then we subscribed to the merged result once
// to check `toBe` and another time to check `toEqual`.
expectSubscriptions(obs1.subscriptions).toBe(['^-----!', '^-----!']);
expectSubscriptions(obs2.subscriptions).toBe(['^------!', '^------!']);
});
Expand Down
6 changes: 3 additions & 3 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import { Operator } from './Operator';
import { SafeSubscriber, Subscriber } from './Subscriber';
import { isSubscription, Subscription } from './Subscription';
import { TeardownLogic, OperatorFunction, PartialObserver, Subscribable, Observer } from './types';
import { TeardownLogic, OperatorFunction, Subscribable, Observer } from './types';
import { observable as Symbol_observable } from './symbol/observable';
import { pipeFromArray } from './util/pipe';
import { config } from './config';
Expand Down Expand Up @@ -68,7 +68,7 @@ export class Observable<T> implements Subscribable<T> {
return observable;
}

subscribe(observer?: PartialObserver<T>): Subscription;
subscribe(observer?: Partial<Observer<T>>): Subscription;
/** @deprecated Use an observer instead of a complete callback */
subscribe(next: null | undefined, error: null | undefined, complete: () => void): Subscription;
/** @deprecated Use an observer instead of an error callback */
Expand Down Expand Up @@ -202,7 +202,7 @@ export class Observable<T> implements Subscribable<T> {
* @method subscribe
*/
subscribe(
observerOrNext?: PartialObserver<T> | ((value: T) => void) | null,
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription {
Expand Down
4 changes: 2 additions & 2 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/** @prettier */
import { isFunction } from './util/isFunction';
import { Observer, PartialObserver, ObservableNotification } from './types';
import { Observer, ObservableNotification } from './types';
import { isSubscription, Subscription } from './Subscription';
import { config } from './config';
import { reportUnhandledError } from './util/reportUnhandledError';
Expand Down Expand Up @@ -126,7 +126,7 @@ export class Subscriber<T> extends Subscription implements Observer<T> {

export class SafeSubscriber<T> extends Subscriber<T> {
constructor(
observerOrNext?: PartialObserver<T> | ((value: T) => void) | null,
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
error?: ((e?: any) => void) | null,
complete?: (() => void) | null
) {
Expand Down
8 changes: 6 additions & 2 deletions src/internal/observable/connectable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@ import { Subscription } from '../Subscription';
import { Observable } from '../Observable';
import { defer } from './defer';

export type ConnectableObservableLike<T> = Observable<T> & {
/**
* An observable with a `connect` method that is used to create a subscription
* to an underlying source, connecting it with all consumers via a multicast.
*/
export interface ConnectableObservableLike<T> extends Observable<T> {
/**
* (Idempotent) Calling this method will connect the underlying source observable to all subscribed consumers
* through an underlying {@link Subject}.
* @returns A subscription, that when unsubscribed, will "disconnect" the source from the connector subject,
* severing notifications to all consumers.
*/
connect(): Subscription;
};
}

/**
* Creates an observable that multicasts once `connect()` is called on it.
Expand Down
63 changes: 30 additions & 33 deletions src/internal/operators/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@ import { from } from '../observable/from';
import { operate } from '../util/lift';
import { fromSubscribable } from '../observable/fromSubscribable';

export interface ConnectConfig<T> {
/**
* A factory function used to create the Subject through which the source
* is multicast. By default this creates a {@link Subject}.
*/
connector: () => SubjectLike<T>;
}

/**
* The default connector function used for `connect`.
* A factory function that will create a {@link Subject}.
* The default configuration for `connect`.
*/
function defaultConnector<T>() {
return new Subject<T>();
}
const DEFAULT_CONFIG: ConnectConfig<unknown> = {
connector: () => new Subject<unknown>(),
};

/**
* Creates an observable by multicasting the source within a function that
Expand All @@ -25,16 +32,16 @@ function defaultConnector<T>() {
* has returned, if the source is synchronous its internal reference count will jump from
* 0 to 1 back to 0 and reset.
*
* To use `connect`, you provide a `setup` function via configuration that will give you
* To use `connect`, you provide a `selector` function that will give you
* a multicast observable that is not yet connected. You then use that multicast observable
* to create a resulting observable that, when subscribed, will set up your multicast. This is
* generally, but not always, accomplished with {@link merge}.
*
* Note that using a {@link takeUntil} inside of `connect`'s `setup` _might_ mean you were looking
* Note that using a {@link takeUntil} inside of `connect`'s `selector` _might_ mean you were looking
* to use the {@link takeWhile} operator instead.
*
* When you subscribe to the result of `connect`, the `setup` function will be called. After
* the `setup` function returns, the observable it returns will be subscribed to, _then_ the
* When you subscribe to the result of `connect`, the `selector` function will be called. After
* the `selector` function returns, the observable it returns will be subscribed to, _then_ the
* multicast will be connected to the source.
*
* ### Example
Expand All @@ -53,14 +60,12 @@ function defaultConnector<T>() {
* });
*
* source$.pipe(
* connect({
* // Notice in here we're merging three subscriptions to `shared$`.
* setup: (shared$) => merge(
* // Notice in here we're merging 3 subscriptions to `shared$`.
* connect((shared$) => merge(
* shared$.pipe(map(n => `all ${n}`)),
* shared$.pipe(filter(n => n % 2 === 0), map(n => `even ${n}`)),
* shared$.pipe(filter(n => n % 2 === 1), map(n => `odd ${n}`)),
* )
* })
* ))
* )
* .subscribe(console.log);
*
Expand All @@ -83,29 +88,21 @@ function defaultConnector<T>() {
* "odd 5"
* ```
*
* @param selector A function used to set up the multicast. Gives you a multicast observable
* that is not yet connected. With that, you're expected to create and return
* and Observable, that when subscribed to, will utilize the multicast observable.
* After this function is executed -- and its return value subscribed to -- the
* the operator will subscribe to the source, and the connection will be made.
* @param param0 The configuration object for `connect`.
*/
export function connect<T, R>({
connector = defaultConnector,
setup,
}: {
/**
* A factory function used to create the Subject through which the source
* is multicast. By default this creates a {@link Subject}.
*/
connector?: () => SubjectLike<T>;
/**
* A function used to set up the multicast. Gives you a multicast observable
* that is not yet connected. With that, you're expected to create and return
* and Observable, that when subscribed to, will utilize the multicast observable.
* After this function is executed -- and its return value subscribed to -- the
* the operator will subscribe to the source, and the connection will be made.
*/
setup: (shared: Observable<T>) => ObservableInput<R>;
}): OperatorFunction<T, R> {
export function connect<T, R>(
selector: (shared: Observable<T>) => ObservableInput<R>,
config: ConnectConfig<T> = DEFAULT_CONFIG
): OperatorFunction<T, R> {
const { connector } = config;
return operate((source, subscriber) => {
const subject = connector();
from(setup(fromSubscribable(subject))).subscribe(subscriber);
from(selector(fromSubscribable(subject))).subscribe(subscriber);
subscriber.add(source.subscribe(subject));
});
}
7 changes: 3 additions & 4 deletions src/internal/operators/multicast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ export function multicast<T>(subjectFactory: () => Subject<T>): UnaryFunction<Ob
* {@link connect} operator.
*
* @param subjectFactory A factory that creates the subject used to multicast.
* @param selector A setup function to setup the multicast
* @param selector A function to setup the multicast and select the output.
* @deprecated To be removed in version 8. Please use the new {@link connect} operator.
* `multicast(fn1, fn2)` is equivalent to `connect({ connector: fn1, setup: fn2 })`.
* `multicast(subjectFactor, selector)` is equivalent to `connect(selector, { connector: subjectFactory })`.
*/
export function multicast<T, O extends ObservableInput<any>>(
subjectFactory: () => Subject<T>,
Expand All @@ -77,9 +77,8 @@ export function multicast<T, R>(
// If a selector function is provided, then we're a "normal" operator that isn't
// going to return a ConnectableObservable. We can use `connect` to do what we
// need to do.
return connect({
return connect(selector, {
connector: subjectFactory,
setup: selector,
});
}

Expand Down
9 changes: 2 additions & 7 deletions src/internal/operators/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export function publish<T>(): UnaryFunction<Observable<T>, ConnectableObservable
* @param selector A function used to setup multicasting prior to automatic connection.
*
* @deprecated To be removed in version 8. Use the new {@link connect} operator.
* If you're using `publish(fn)`, it is equivalent to `connect({ setup: fn })`.
* If you're using `publish(fn)`, it is equivalent to `connect(fn)`.
*/
export function publish<T, O extends ObservableInput<any>>(selector: (shared: Observable<T>) => O): OperatorFunction<T, ObservedValueOf<O>>;

Expand Down Expand Up @@ -79,10 +79,5 @@ export function publish<T, O extends ObservableInput<any>>(selector: (shared: Ob
* @return A ConnectableObservable that upon connection causes the source Observable to emit items to its Observers.
*/
export function publish<T, R>(selector?: OperatorFunction<T, R>): MonoTypeOperatorFunction<T> | OperatorFunction<T, R> {
return selector
? connect({
connector: () => new Subject<T>(),
setup: selector,
})
: multicast(new Subject<T>());
return selector ? connect(selector) : multicast(new Subject<T>());
}
2 changes: 1 addition & 1 deletion src/internal/operators/publishReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export function publishReplay<T>(
* @param timestampProvider The timestamp provider for the underlying {@link ReplaySubject}.
* @deprecated To be removed in version 8. Use the new {@link connect} operator.
* `source.pipe(publishReplay(size, window, fn, scheduler))` is equivalent to
* `const subject = new ReplaySubject(size, window, scheduler), connect({ connector: () => subject, setup: fn })`.
* `const subject = new ReplaySubject(size, window, scheduler), source.pipe(connect(fn, { connector: () => subject }))`.
*/
export function publishReplay<T, O extends ObservableInput<any>>(
bufferSize: number | undefined,
Expand Down
6 changes: 3 additions & 3 deletions src/internal/operators/share.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { Subscription } from '../Subscription';
import { from } from '../observable/from';
import { operate } from '../util/lift';

interface ShareOptions<T, R> {
export interface ShareConfig<T> {
/**
* The factory used to create the subject that will connect the source observable to
* multicast consumers.
Expand Down Expand Up @@ -43,7 +43,7 @@ interface ShareOptions<T, R> {

export function share<T>(): MonoTypeOperatorFunction<T>;

export function share<T, R = T>(options: ShareOptions<T, R>): OperatorFunction<T, R>;
export function share<T>(options: ShareConfig<T>): MonoTypeOperatorFunction<T>;

/**
* Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one
Expand Down Expand Up @@ -87,7 +87,7 @@ export function share<T, R = T>(options: ShareOptions<T, R>): OperatorFunction<T
* // ... and so on
* ```
*/
export function share<T, R>(options?: ShareOptions<T, R>): OperatorFunction<T, T | R> {
export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
options = options || {};
const { connector = () => new Subject<T>(), resetOnComplete = true, resetOnError = true, resetOnRefCountZero = true } = options;

Expand Down
Loading

0 comments on commit 8c13e0a

Please sign in to comment.