diff --git a/api_guard/dist/types/index.d.ts b/api_guard/dist/types/index.d.ts index 833f365b89..18585e7b12 100644 --- a/api_guard/dist/types/index.d.ts +++ b/api_guard/dist/types/index.d.ts @@ -301,7 +301,7 @@ export declare class Observable implements Subscribable { pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction, op8: OperatorFunction): Observable; pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction, op8: OperatorFunction, op9: OperatorFunction): Observable; pipe(op1: OperatorFunction, op2: OperatorFunction, op3: OperatorFunction, op4: OperatorFunction, op5: OperatorFunction, op6: OperatorFunction, op7: OperatorFunction, op8: OperatorFunction, op9: OperatorFunction, ...operations: OperatorFunction[]): Observable; - subscribe(observer?: PartialObserver): Subscription; + subscribe(observer?: Partial>): Subscription; subscribe(next: null | undefined, error: null | undefined, complete: () => void): Subscription; subscribe(next: null | undefined, error: (error: any) => void, complete?: () => void): Subscription; subscribe(next: (value: T) => void, error: null | undefined, complete: () => void): Subscription; @@ -443,10 +443,11 @@ export declare class Subject extends Observable implements SubscriptionLik static create: (...args: any[]) => any; } -export declare type SubjectLike = Observer & Subscribable; +export interface SubjectLike extends Observer, Subscribable { +} export interface Subscribable { - subscribe(observer: Observer): Unsubscribable; + subscribe(observer: Partial>): Unsubscribable; } export declare type SubscribableOrPromise = Subscribable | Subscribable | PromiseLike | InteropObservable; diff --git a/api_guard/dist/types/operators/index.d.ts b/api_guard/dist/types/operators/index.d.ts index 24e43da16b..3a85ca4d31 100644 --- a/api_guard/dist/types/operators/index.d.ts +++ b/api_guard/dist/types/operators/index.d.ts @@ -60,10 +60,7 @@ export declare function concatMapTo>(observ export declare function concatWith(...otherSources: [...ObservableInputTuple]): OperatorFunction; -export declare function connect({ connector, setup, }: { - connector?: () => SubjectLike; - setup: (shared: Observable) => ObservableInput; -}): OperatorFunction; +export declare function connect(selector: (shared: Observable) => ObservableInput, config?: ConnectConfig): OperatorFunction; export declare function count(predicate?: (value: T, index: number) => boolean): OperatorFunction; @@ -253,7 +250,7 @@ export declare function scan(accumulator: (acc: A | S, value: V, index: export declare function sequenceEqual(compareTo: Observable, comparator?: (a: T, b: T) => boolean): OperatorFunction; export declare function share(): MonoTypeOperatorFunction; -export declare function share(options: ShareOptions): OperatorFunction; +export declare function share(options: ShareConfig): MonoTypeOperatorFunction; export declare function shareReplay(config: ShareReplayConfig): MonoTypeOperatorFunction; export declare function shareReplay(bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction; diff --git a/spec/deprecation-equivalents/multicasting-deprecations-spec.ts b/spec/deprecation-equivalents/multicasting-deprecations-spec.ts index 26feef216c..d9024c44fc 100644 --- a/spec/deprecation-equivalents/multicasting-deprecations-spec.ts +++ b/spec/deprecation-equivalents/multicasting-deprecations-spec.ts @@ -63,12 +63,7 @@ describe('multicasting equivalent tests', () => { testEquivalents( 'publish(fn) and connect({ setup: fn })', (source) => source.pipe(publish(fn)), - (source) => - source.pipe( - connect({ - setup: fn, - }) - ) + (source) => source.pipe(connect(fn)) ); testEquivalents( @@ -76,7 +71,7 @@ describe('multicasting equivalent tests', () => { (source) => source.pipe(publishReplay(3, 10, fn)), (source) => { const subject = new ReplaySubject(3, 10); - return source.pipe(connect({ connector: () => subject, setup: fn })); + return source.pipe(connect(fn, { connector: () => subject })); } ); diff --git a/spec/operators/connect-spec.ts b/spec/operators/connect-spec.ts index a4cf50f5f8..23761d54e9 100644 --- a/spec/operators/connect-spec.ts +++ b/spec/operators/connect-spec.ts @@ -11,37 +11,33 @@ describe('connect', () => { rxTest = new TestScheduler(observableMatcher); }); - it('should connect a source through a setup function', () => { + it('should connect a source through a selector function', () => { rxTest.run(({ cold, time, expectObservable }) => { const source = cold('---a----b-----c---|'); const d = time(' ---|'); const expected = ' ---a--a-b--b--c--c|'; - const result = source.pipe( - connect({ - setup: (shared) => { - return merge(shared.pipe(delay(d)), shared); - }, - }) - ); + const result = source.pipe(connect((shared) => merge(shared.pipe(delay(d)), shared))); expectObservable(result).toBe(expected); }); }); - it('should connect a source through a setup function and use the provided connector', () => { + it('should connect a source through a selector function and use the provided connector', () => { rxTest.run(({ cold, time, expectObservable }) => { const source = cold('--------a---------b---------c-----|'); const d = time(' ---|'); const expected = ' S--S----a--a------b--b------c--c--|'; const result = source.pipe( - connect({ - connector: () => new BehaviorSubject('S'), - setup: (shared) => { + connect( + (shared) => { return merge(shared.pipe(delay(d)), shared); }, - }) + { + connector: () => new BehaviorSubject('S'), + } + ) ); expectObservable(result).toBe(expected); diff --git a/spec/schedulers/TestScheduler-spec.ts b/spec/schedulers/TestScheduler-spec.ts index 7d91068bbc..ce3b515cef 100644 --- a/spec/schedulers/TestScheduler-spec.ts +++ b/spec/schedulers/TestScheduler-spec.ts @@ -397,13 +397,16 @@ describe('TestScheduler', () => { expect(expectObservable).to.be.a('function'); expect(expectSubscriptions).to.be.a('function'); - const obs1 = cold('-a-c-e|'); + const obs1 = cold('-a-c-e|'); const obs2 = hot(' ^-b-d-f|'); const output = merge(obs1, obs2); const expected = ' -abcdef|'; expectObservable(output).toBe(expected); expectObservable(output).toEqual(cold(expected)); + // There are two subscriptions to each of these, because we merged + // them together, then we subscribed to the merged result once + // to check `toBe` and another time to check `toEqual`. expectSubscriptions(obs1.subscriptions).toBe(['^-----!', '^-----!']); expectSubscriptions(obs2.subscriptions).toBe(['^------!', '^------!']); }); diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index ea64715de8..c78bd9e90c 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -4,7 +4,7 @@ import { Operator } from './Operator'; import { SafeSubscriber, Subscriber } from './Subscriber'; import { isSubscription, Subscription } from './Subscription'; -import { TeardownLogic, OperatorFunction, PartialObserver, Subscribable, Observer } from './types'; +import { TeardownLogic, OperatorFunction, Subscribable, Observer } from './types'; import { observable as Symbol_observable } from './symbol/observable'; import { pipeFromArray } from './util/pipe'; import { config } from './config'; @@ -68,7 +68,7 @@ export class Observable implements Subscribable { return observable; } - subscribe(observer?: PartialObserver): Subscription; + subscribe(observer?: Partial>): Subscription; /** @deprecated Use an observer instead of a complete callback */ subscribe(next: null | undefined, error: null | undefined, complete: () => void): Subscription; /** @deprecated Use an observer instead of an error callback */ @@ -202,7 +202,7 @@ export class Observable implements Subscribable { * @method subscribe */ subscribe( - observerOrNext?: PartialObserver | ((value: T) => void) | null, + observerOrNext?: Partial> | ((value: T) => void) | null, error?: ((error: any) => void) | null, complete?: (() => void) | null ): Subscription { diff --git a/src/internal/Subscriber.ts b/src/internal/Subscriber.ts index ab16054b3e..f5a4555c40 100644 --- a/src/internal/Subscriber.ts +++ b/src/internal/Subscriber.ts @@ -1,6 +1,6 @@ /** @prettier */ import { isFunction } from './util/isFunction'; -import { Observer, PartialObserver, ObservableNotification } from './types'; +import { Observer, ObservableNotification } from './types'; import { isSubscription, Subscription } from './Subscription'; import { config } from './config'; import { reportUnhandledError } from './util/reportUnhandledError'; @@ -126,7 +126,7 @@ export class Subscriber extends Subscription implements Observer { export class SafeSubscriber extends Subscriber { constructor( - observerOrNext?: PartialObserver | ((value: T) => void) | null, + observerOrNext?: Partial> | ((value: T) => void) | null, error?: ((e?: any) => void) | null, complete?: (() => void) | null ) { diff --git a/src/internal/observable/connectable.ts b/src/internal/observable/connectable.ts index d2ccdf9c11..df7aa81a7c 100644 --- a/src/internal/observable/connectable.ts +++ b/src/internal/observable/connectable.ts @@ -6,7 +6,11 @@ import { Subscription } from '../Subscription'; import { Observable } from '../Observable'; import { defer } from './defer'; -export type ConnectableObservableLike = Observable & { +/** + * An observable with a `connect` method that is used to create a subscription + * to an underlying source, connecting it with all consumers via a multicast. + */ +export interface ConnectableObservableLike extends Observable { /** * (Idempotent) Calling this method will connect the underlying source observable to all subscribed consumers * through an underlying {@link Subject}. @@ -14,7 +18,7 @@ export type ConnectableObservableLike = Observable & { * severing notifications to all consumers. */ connect(): Subscription; -}; +} /** * Creates an observable that multicasts once `connect()` is called on it. diff --git a/src/internal/operators/connect.ts b/src/internal/operators/connect.ts index 1379214dcd..7e7357fbdc 100644 --- a/src/internal/operators/connect.ts +++ b/src/internal/operators/connect.ts @@ -6,13 +6,20 @@ import { from } from '../observable/from'; import { operate } from '../util/lift'; import { fromSubscribable } from '../observable/fromSubscribable'; +export interface ConnectConfig { + /** + * A factory function used to create the Subject through which the source + * is multicast. By default this creates a {@link Subject}. + */ + connector: () => SubjectLike; +} + /** - * The default connector function used for `connect`. - * A factory function that will create a {@link Subject}. + * The default configuration for `connect`. */ -function defaultConnector() { - return new Subject(); -} +const DEFAULT_CONFIG: ConnectConfig = { + connector: () => new Subject(), +}; /** * Creates an observable by multicasting the source within a function that @@ -25,16 +32,16 @@ function defaultConnector() { * has returned, if the source is synchronous its internal reference count will jump from * 0 to 1 back to 0 and reset. * - * To use `connect`, you provide a `setup` function via configuration that will give you + * To use `connect`, you provide a `selector` function that will give you * a multicast observable that is not yet connected. You then use that multicast observable * to create a resulting observable that, when subscribed, will set up your multicast. This is * generally, but not always, accomplished with {@link merge}. * - * Note that using a {@link takeUntil} inside of `connect`'s `setup` _might_ mean you were looking + * Note that using a {@link takeUntil} inside of `connect`'s `selector` _might_ mean you were looking * to use the {@link takeWhile} operator instead. * - * When you subscribe to the result of `connect`, the `setup` function will be called. After - * the `setup` function returns, the observable it returns will be subscribed to, _then_ the + * When you subscribe to the result of `connect`, the `selector` function will be called. After + * the `selector` function returns, the observable it returns will be subscribed to, _then_ the * multicast will be connected to the source. * * ### Example @@ -53,14 +60,12 @@ function defaultConnector() { * }); * * source$.pipe( - * connect({ - * // Notice in here we're merging three subscriptions to `shared$`. - * setup: (shared$) => merge( + * // Notice in here we're merging 3 subscriptions to `shared$`. + * connect((shared$) => merge( * shared$.pipe(map(n => `all ${n}`)), * shared$.pipe(filter(n => n % 2 === 0), map(n => `even ${n}`)), * shared$.pipe(filter(n => n % 2 === 1), map(n => `odd ${n}`)), - * ) - * }) + * )) * ) * .subscribe(console.log); * @@ -83,29 +88,21 @@ function defaultConnector() { * "odd 5" * ``` * + * @param selector A function used to set up the multicast. Gives you a multicast observable + * that is not yet connected. With that, you're expected to create and return + * and Observable, that when subscribed to, will utilize the multicast observable. + * After this function is executed -- and its return value subscribed to -- the + * the operator will subscribe to the source, and the connection will be made. * @param param0 The configuration object for `connect`. */ -export function connect({ - connector = defaultConnector, - setup, -}: { - /** - * A factory function used to create the Subject through which the source - * is multicast. By default this creates a {@link Subject}. - */ - connector?: () => SubjectLike; - /** - * A function used to set up the multicast. Gives you a multicast observable - * that is not yet connected. With that, you're expected to create and return - * and Observable, that when subscribed to, will utilize the multicast observable. - * After this function is executed -- and its return value subscribed to -- the - * the operator will subscribe to the source, and the connection will be made. - */ - setup: (shared: Observable) => ObservableInput; -}): OperatorFunction { +export function connect( + selector: (shared: Observable) => ObservableInput, + config: ConnectConfig = DEFAULT_CONFIG +): OperatorFunction { + const { connector } = config; return operate((source, subscriber) => { const subject = connector(); - from(setup(fromSubscribable(subject))).subscribe(subscriber); + from(selector(fromSubscribable(subject))).subscribe(subscriber); subscriber.add(source.subscribe(subject)); }); } diff --git a/src/internal/operators/multicast.ts b/src/internal/operators/multicast.ts index 2c8e3e7234..1f5ec936d0 100644 --- a/src/internal/operators/multicast.ts +++ b/src/internal/operators/multicast.ts @@ -58,9 +58,9 @@ export function multicast(subjectFactory: () => Subject): UnaryFunction>( subjectFactory: () => Subject, @@ -77,9 +77,8 @@ export function multicast( // If a selector function is provided, then we're a "normal" operator that isn't // going to return a ConnectableObservable. We can use `connect` to do what we // need to do. - return connect({ + return connect(selector, { connector: subjectFactory, - setup: selector, }); } diff --git a/src/internal/operators/publish.ts b/src/internal/operators/publish.ts index 2d8414044b..2dfa9ebfb2 100644 --- a/src/internal/operators/publish.ts +++ b/src/internal/operators/publish.ts @@ -28,7 +28,7 @@ export function publish(): UnaryFunction, ConnectableObservable * @param selector A function used to setup multicasting prior to automatic connection. * * @deprecated To be removed in version 8. Use the new {@link connect} operator. - * If you're using `publish(fn)`, it is equivalent to `connect({ setup: fn })`. + * If you're using `publish(fn)`, it is equivalent to `connect(fn)`. */ export function publish>(selector: (shared: Observable) => O): OperatorFunction>; @@ -79,10 +79,5 @@ export function publish>(selector: (shared: Ob * @return A ConnectableObservable that upon connection causes the source Observable to emit items to its Observers. */ export function publish(selector?: OperatorFunction): MonoTypeOperatorFunction | OperatorFunction { - return selector - ? connect({ - connector: () => new Subject(), - setup: selector, - }) - : multicast(new Subject()); + return selector ? connect(selector) : multicast(new Subject()); } diff --git a/src/internal/operators/publishReplay.ts b/src/internal/operators/publishReplay.ts index ada208dd16..6e2aa5c33f 100644 --- a/src/internal/operators/publishReplay.ts +++ b/src/internal/operators/publishReplay.ts @@ -39,7 +39,7 @@ export function publishReplay( * @param timestampProvider The timestamp provider for the underlying {@link ReplaySubject}. * @deprecated To be removed in version 8. Use the new {@link connect} operator. * `source.pipe(publishReplay(size, window, fn, scheduler))` is equivalent to - * `const subject = new ReplaySubject(size, window, scheduler), connect({ connector: () => subject, setup: fn })`. + * `const subject = new ReplaySubject(size, window, scheduler), source.pipe(connect(fn, { connector: () => subject }))`. */ export function publishReplay>( bufferSize: number | undefined, diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index 671bfc12f4..23711b11b1 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -7,7 +7,7 @@ import { Subscription } from '../Subscription'; import { from } from '../observable/from'; import { operate } from '../util/lift'; -interface ShareOptions { +export interface ShareConfig { /** * The factory used to create the subject that will connect the source observable to * multicast consumers. @@ -43,7 +43,7 @@ interface ShareOptions { export function share(): MonoTypeOperatorFunction; -export function share(options: ShareOptions): OperatorFunction; +export function share(options: ShareConfig): MonoTypeOperatorFunction; /** * Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one @@ -87,7 +87,7 @@ export function share(options: ShareOptions): OperatorFunction(options?: ShareOptions): OperatorFunction { +export function share(options?: ShareConfig): OperatorFunction { options = options || {}; const { connector = () => new Subject(), resetOnComplete = true, resetOnError = true, resetOnRefCountZero = true } = options; diff --git a/src/internal/types.ts b/src/internal/types.ts index 851916a740..42b651ad77 100644 --- a/src/internal/types.ts +++ b/src/internal/types.ts @@ -87,7 +87,7 @@ export type SubscribableOrPromise = Subscribable | Subscribable | P /** OBSERVABLE INTERFACES */ export interface Subscribable { - subscribe(observer: Observer): Unsubscribable; + subscribe(observer: Partial>): Unsubscribable; } /** @@ -172,7 +172,7 @@ export interface Observer { complete: () => void; } -export type SubjectLike = Observer & Subscribable; +export interface SubjectLike extends Observer, Subscribable {} /** SCHEDULER INTERFACES */