diff --git a/spec-dtslint/operators/raceWith-spec.ts b/spec-dtslint/operators/raceWith-spec.ts new file mode 100644 index 0000000000..4920c5383b --- /dev/null +++ b/spec-dtslint/operators/raceWith-spec.ts @@ -0,0 +1,16 @@ +import { raceWith } from 'rxjs/operators'; +import { a$, b, b$, c, c$, d$, e$, f$ } from '../helpers'; + +describe('raceWith', () => { + it('should support N arguments of different types', () => { + const o1 = a$.pipe(raceWith(b$)); // $ExpectType Observable + const o2 = a$.pipe(raceWith(b$, c$)); // $ExpectType Observable + const o3 = a$.pipe(raceWith(b$, c$, d$)); // $ExpectType Observable + const o4 = a$.pipe(raceWith(b$, c$, d$, e$)); // $ExpectType Observable + const o5 = a$.pipe(raceWith(b$, c$, d$, e$, f$)); // $ExpectType Observable + }); +}); + +it('should race observable inputs', () => { + const o = a$.pipe(raceWith(Promise.resolve(b), [c])); // $ExpectType Observable +}); diff --git a/spec/operators/race-spec.ts b/spec/operators/race-spec.ts index bcbe0779ec..31eca2ecf8 100644 --- a/spec/operators/race-spec.ts +++ b/spec/operators/race-spec.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; import * as sinon from 'sinon'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { EMPTY, NEVER, of, race as staticRace, timer, defer, Observable, throwError } from 'rxjs'; +import { EMPTY, NEVER, of, timer, defer, Observable, throwError } from 'rxjs'; import { race, mergeMap, map, finalize, startWith } from 'rxjs/operators'; /** @test {race} */ @@ -168,7 +168,7 @@ describe('race operator', () => { const e1 = of(true); const e2 = timer(200).pipe(map(_ => false)); - staticRace(e1, e2).subscribe(x => { + e1.pipe(race(e2)).subscribe(x => { expect(x).to.be.true; }, done, done); }); diff --git a/spec/operators/raceWith-spec.ts b/spec/operators/raceWith-spec.ts new file mode 100644 index 0000000000..880592c4f3 --- /dev/null +++ b/spec/operators/raceWith-spec.ts @@ -0,0 +1,218 @@ +import { expect } from 'chai'; +import * as sinon from 'sinon'; +import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; +import { EMPTY, NEVER, of, timer, defer, Observable, throwError } from 'rxjs'; +import { raceWith, mergeMap, map, finalize, startWith } from 'rxjs/operators'; + +/** @test {raceWith} */ +describe('raceWith operator', () => { + it('should race cold and cold', () => { + const e1 = cold('---a-----b-----c----|'); + const e1subs = '^ !'; + const e2 = cold('------x-----y-----z----|'); + const e2subs = '^ !'; + const expected = '---a-----b-----c----|'; + + const result = e1.pipe(raceWith(e2)); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should race hot and hot', () => { + const e1 = hot('---a-----b-----c----|'); + const e1subs = '^ !'; + const e2 = hot('------x-----y-----z----|'); + const e2subs = '^ !'; + const expected = '---a-----b-----c----|'; + + const result = e1.pipe(raceWith(e2)); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should race hot and cold', () => { + const e1 = cold('---a-----b-----c----|'); + const e1subs = '^ !'; + const e2 = hot('------x-----y-----z----|'); + const e2subs = '^ !'; + const expected = '---a-----b-----c----|'; + + const result = e1.pipe(raceWith(e2)); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should race 2nd and 1st', () => { + const e1 = cold('------x-----y-----z----|'); + const e1subs = '^ !'; + const e2 = cold('---a-----b-----c----|'); + const e2subs = '^ !'; + const expected = '---a-----b-----c----|'; + + const result = e1.pipe(raceWith(e2)); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should race emit and complete', () => { + const e1 = cold('-----|'); + const e1subs = '^ !'; + const e2 = hot('------x-----y-----z----|'); + const e2subs = '^ !'; + const expected = '-----|'; + + const result = e1.pipe(raceWith(e2)); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should allow unsubscribing early and explicitly', () => { + const e1 = cold('---a-----b-----c----|'); + const e1subs = '^ !'; + const e2 = hot('------x-----y-----z----|'); + const e2subs = '^ !'; + const expected = '---a-----b---'; + const unsub = ' !'; + + const result = e1.pipe(raceWith(e2)); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should not break unsubscription chains when unsubscribed explicitly', () => { + const e1 = hot('--a--^--b--c---d-| '); + const e1subs = '^ ! '; + const e2 = hot('---e-^---f--g---h-|'); + const e2subs = '^ ! '; + const expected = '---b--c--- '; + const unsub = ' ! '; + + const result = e1.pipe( + mergeMap((x: string) => of(x)), + raceWith(e2), + mergeMap((x: string) => of(x)) + ); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should never emit when given non emitting sources', () => { + const e1 = cold('---|'); + const e2 = cold('---|'); + const e1subs = '^ !'; + const expected = '---|'; + + const source = e1.pipe(raceWith(e2)); + + expectObservable(source).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should throw when error occurs mid stream', () => { + const e1 = cold('---a-----#'); + const e1subs = '^ !'; + const e2 = cold('------x-----y-----z----|'); + const e2subs = '^ !'; + const expected = '---a-----#'; + + const result = e1.pipe(raceWith(e2)); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should throw when error occurs before a winner is found', () => { + const e1 = cold('---#'); + const e1subs = '^ !'; + const e2 = cold('------x-----y-----z----|'); + const e2subs = '^ !'; + const expected = '---#'; + + const result = e1.pipe(raceWith(e2)); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should allow observable emits immediately', (done: MochaDone) => { + const e1 = of(true); + const e2 = timer(200).pipe(map(_ => false)); + + e1.pipe(raceWith(e2)).subscribe(x => { + expect(x).to.be.true; + }, done, done); + }); + + it('should ignore latter observables if a former one emits immediately', () => { + const onNext = sinon.spy(); + const onSubscribe = sinon.spy(); + const e1 = of('a'); // Wins the race + const e2 = defer(onSubscribe); // Should be ignored + + e1.pipe(raceWith(e2)).subscribe(onNext); + expect(onNext.calledWithExactly('a')).to.be.true; + expect(onSubscribe.called).to.be.false; + }); + + it('should ignore latter observables if a former one completes immediately', () => { + const onComplete = sinon.spy(); + const onSubscribe = sinon.spy(); + const e1 = EMPTY; // Wins the race + const e2 = defer(onSubscribe); // Should be ignored + + e1.pipe(raceWith(e2)).subscribe({ complete: onComplete }); + expect(onComplete.calledWithExactly()).to.be.true; + expect(onSubscribe.called).to.be.false; + }); + + it('should ignore latter observables if a former one errors immediately', () => { + const onError = sinon.spy(); + const onSubscribe = sinon.spy(); + const e1 = throwError('kaboom'); // Wins the race + const e2 = defer(onSubscribe); // Should be ignored + + e1.pipe(raceWith(e2)).subscribe({ error: onError }); + expect(onError.calledWithExactly('kaboom')).to.be.true; + expect(onSubscribe.called).to.be.false; + }); + + it('should unsubscribe former observables if a latter one emits immediately', () => { + const onNext = sinon.spy(); + const onUnsubscribe = sinon.spy(); + const e1 = NEVER.pipe(finalize(onUnsubscribe)); // Should be unsubscribed + const e2 = of('b'); // Wins the race + + e1.pipe(raceWith(e2)).subscribe(onNext); + expect(onNext.calledWithExactly('b')).to.be.true; + expect(onUnsubscribe.calledOnce).to.be.true; + }); + + it('should unsubscribe from immediately emitting observable on unsubscription', () => { + const onNext = sinon.spy(); + const onUnsubscribe = sinon.spy(); + const e1 = >NEVER.pipe(startWith('a'), finalize(onUnsubscribe)); // Wins the race + const e2 = NEVER; // Loses the race + + const subscription = e1.pipe(raceWith(e2)).subscribe(onNext); + expect(onNext.calledWithExactly('a')).to.be.true; + expect(onUnsubscribe.called).to.be.false; + subscription.unsubscribe(); + expect(onUnsubscribe.calledOnce).to.be.true; + }); +}); diff --git a/src/internal/operators/index.ts b/src/internal/operators/index.ts index 016cbb1b08..8d3b66be45 100644 --- a/src/internal/operators/index.ts +++ b/src/internal/operators/index.ts @@ -57,7 +57,7 @@ export { publish } from './publish'; export { publishBehavior } from './publishBehavior'; export { publishLast } from './publishLast'; export { publishReplay } from './publishReplay'; -export { race } from './race'; +export { race, raceWith } from './raceWith'; export { reduce } from './reduce'; export { repeat } from './repeat'; export { repeatWhen } from './repeatWhen'; diff --git a/src/internal/operators/race.ts b/src/internal/operators/raceWith.ts similarity index 51% rename from src/internal/operators/race.ts rename to src/internal/operators/raceWith.ts index 7efda22d4b..e6bd07a328 100644 --- a/src/internal/operators/race.ts +++ b/src/internal/operators/raceWith.ts @@ -1,16 +1,16 @@ import { Observable } from '../Observable'; import { isArray } from '../util/isArray'; -import { MonoTypeOperatorFunction, OperatorFunction } from '../types'; +import { MonoTypeOperatorFunction, OperatorFunction, ObservableInput, ObservedValueUnionFromArray } from '../types'; import { race as raceStatic } from '../observable/race'; /* tslint:disable:max-line-length */ -/** @deprecated Deprecated in favor of static race. */ +/** @deprecated Deprecated use {@link raceWith} */ export function race(observables: Array>): MonoTypeOperatorFunction; -/** @deprecated Deprecated in favor of static race. */ +/** @deprecated Deprecated use {@link raceWith} */ export function race(observables: Array>): OperatorFunction; -/** @deprecated Deprecated in favor of static race. */ +/** @deprecated Deprecated use {@link raceWith} */ export function race(...observables: Array | Array>>): MonoTypeOperatorFunction; -/** @deprecated Deprecated in favor of static race. */ +/** @deprecated Deprecated use {@link raceWith} */ export function race(...observables: Array | Array>>): OperatorFunction; /* tslint:enable:max-line-length */ @@ -19,8 +19,7 @@ export function race(...observables: Array | Array(...observables: (Observable | Observable[])[]): MonoTypeOperatorFunction { return function raceOperatorFunction(source: Observable) { @@ -36,3 +35,42 @@ export function race(...observables: (Observable | Observable[])[]): Mo ) as Observable; }; } + +/** + * Creates an Observable that mirrors the first source Observable to emit a next, + * error or complete notification from the combination of the Observable to which + * the operator is applied and supplied Observables. + * + * ## Example + * + * ```ts + * import { interval } from 'rxjs'; + * import { mapTo, raceWith } from 'rxjs/operators'; + * + * const obs1 = interval(1000).pipe(mapTo('fast one')); + * const obs2 = interval(3000).pipe(mapTo('medium one')); + * const obs3 = interval(5000).pipe(mapTo('slow one')); + * + * obs2.pipe( + * raceWith(obs3, obs1) + * ).subscribe( + * winner => console.log(winner) + * ); + * + * // Outputs + * // a series of 'fast one' + * ``` + * + * @param otherSources Sources used to race for which Observable emits first. + */ + +export function raceWith[]>( + ...otherSources: A +): OperatorFunction> { + return function raceWithOperatorFunction(source: Observable) { + return source.lift.call( + raceStatic(source, ...otherSources), + undefined + ) as Observable>; + }; +} diff --git a/src/operators/index.ts b/src/operators/index.ts index b4c007bc7b..0114b463a8 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -61,7 +61,7 @@ export { publish } from '../internal/operators/publish'; export { publishBehavior } from '../internal/operators/publishBehavior'; export { publishLast } from '../internal/operators/publishLast'; export { publishReplay } from '../internal/operators/publishReplay'; -export { race } from '../internal/operators/race'; +export { race, raceWith } from '../internal/operators/raceWith'; export { reduce } from '../internal/operators/reduce'; export { repeat } from '../internal/operators/repeat'; export { repeatWhen } from '../internal/operators/repeatWhen';