Skip to content

Commit

Permalink
feat: add raceWith (#5303)
Browse files Browse the repository at this point in the history
  • Loading branch information
cartant authored Apr 29, 2020
1 parent 259e5cd commit ca7f370
Show file tree
Hide file tree
Showing 6 changed files with 283 additions and 11 deletions.
16 changes: 16 additions & 0 deletions spec-dtslint/operators/raceWith-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { raceWith } from 'rxjs/operators';
import { a$, b, b$, c, c$, d$, e$, f$ } from '../helpers';

describe('raceWith', () => {
it('should support N arguments of different types', () => {
const o1 = a$.pipe(raceWith(b$)); // $ExpectType Observable<A | B>
const o2 = a$.pipe(raceWith(b$, c$)); // $ExpectType Observable<A | B | C>
const o3 = a$.pipe(raceWith(b$, c$, d$)); // $ExpectType Observable<A | B | C | D>
const o4 = a$.pipe(raceWith(b$, c$, d$, e$)); // $ExpectType Observable<A | B | C | D | E>
const o5 = a$.pipe(raceWith(b$, c$, d$, e$, f$)); // $ExpectType Observable<A | B | C | D | E | F>
});
});

it('should race observable inputs', () => {
const o = a$.pipe(raceWith(Promise.resolve(b), [c])); // $ExpectType Observable<A | B | C>
});
4 changes: 2 additions & 2 deletions spec/operators/race-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { EMPTY, NEVER, of, race as staticRace, timer, defer, Observable, throwError } from 'rxjs';
import { EMPTY, NEVER, of, timer, defer, Observable, throwError } from 'rxjs';
import { race, mergeMap, map, finalize, startWith } from 'rxjs/operators';

/** @test {race} */
Expand Down Expand Up @@ -168,7 +168,7 @@ describe('race operator', () => {
const e1 = of(true);
const e2 = timer(200).pipe(map(_ => false));

staticRace(e1, e2).subscribe(x => {
e1.pipe(race(e2)).subscribe(x => {
expect(x).to.be.true;
}, done, done);
});
Expand Down
218 changes: 218 additions & 0 deletions spec/operators/raceWith-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { EMPTY, NEVER, of, timer, defer, Observable, throwError } from 'rxjs';
import { raceWith, mergeMap, map, finalize, startWith } from 'rxjs/operators';

/** @test {raceWith} */
describe('raceWith operator', () => {
it('should race cold and cold', () => {
const e1 = cold('---a-----b-----c----|');
const e1subs = '^ !';
const e2 = cold('------x-----y-----z----|');
const e2subs = '^ !';
const expected = '---a-----b-----c----|';

const result = e1.pipe(raceWith(e2));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race hot and hot', () => {
const e1 = hot('---a-----b-----c----|');
const e1subs = '^ !';
const e2 = hot('------x-----y-----z----|');
const e2subs = '^ !';
const expected = '---a-----b-----c----|';

const result = e1.pipe(raceWith(e2));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race hot and cold', () => {
const e1 = cold('---a-----b-----c----|');
const e1subs = '^ !';
const e2 = hot('------x-----y-----z----|');
const e2subs = '^ !';
const expected = '---a-----b-----c----|';

const result = e1.pipe(raceWith(e2));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race 2nd and 1st', () => {
const e1 = cold('------x-----y-----z----|');
const e1subs = '^ !';
const e2 = cold('---a-----b-----c----|');
const e2subs = '^ !';
const expected = '---a-----b-----c----|';

const result = e1.pipe(raceWith(e2));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should race emit and complete', () => {
const e1 = cold('-----|');
const e1subs = '^ !';
const e2 = hot('------x-----y-----z----|');
const e2subs = '^ !';
const expected = '-----|';

const result = e1.pipe(raceWith(e2));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should allow unsubscribing early and explicitly', () => {
const e1 = cold('---a-----b-----c----|');
const e1subs = '^ !';
const e2 = hot('------x-----y-----z----|');
const e2subs = '^ !';
const expected = '---a-----b---';
const unsub = ' !';

const result = e1.pipe(raceWith(e2));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should not break unsubscription chains when unsubscribed explicitly', () => {
const e1 = hot('--a--^--b--c---d-| ');
const e1subs = '^ ! ';
const e2 = hot('---e-^---f--g---h-|');
const e2subs = '^ ! ';
const expected = '---b--c--- ';
const unsub = ' ! ';

const result = e1.pipe(
mergeMap((x: string) => of(x)),
raceWith(e2),
mergeMap((x: string) => of(x))
);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should never emit when given non emitting sources', () => {
const e1 = cold('---|');
const e2 = cold('---|');
const e1subs = '^ !';
const expected = '---|';

const source = e1.pipe(raceWith(e2));

expectObservable(source).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should throw when error occurs mid stream', () => {
const e1 = cold('---a-----#');
const e1subs = '^ !';
const e2 = cold('------x-----y-----z----|');
const e2subs = '^ !';
const expected = '---a-----#';

const result = e1.pipe(raceWith(e2));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should throw when error occurs before a winner is found', () => {
const e1 = cold('---#');
const e1subs = '^ !';
const e2 = cold('------x-----y-----z----|');
const e2subs = '^ !';
const expected = '---#';

const result = e1.pipe(raceWith(e2));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should allow observable emits immediately', (done: MochaDone) => {
const e1 = of(true);
const e2 = timer(200).pipe(map(_ => false));

e1.pipe(raceWith(e2)).subscribe(x => {
expect(x).to.be.true;
}, done, done);
});

it('should ignore latter observables if a former one emits immediately', () => {
const onNext = sinon.spy();
const onSubscribe = sinon.spy();
const e1 = of('a'); // Wins the race
const e2 = defer(onSubscribe); // Should be ignored

e1.pipe(raceWith(e2)).subscribe(onNext);
expect(onNext.calledWithExactly('a')).to.be.true;
expect(onSubscribe.called).to.be.false;
});

it('should ignore latter observables if a former one completes immediately', () => {
const onComplete = sinon.spy();
const onSubscribe = sinon.spy();
const e1 = EMPTY; // Wins the race
const e2 = defer(onSubscribe); // Should be ignored

e1.pipe(raceWith(e2)).subscribe({ complete: onComplete });
expect(onComplete.calledWithExactly()).to.be.true;
expect(onSubscribe.called).to.be.false;
});

it('should ignore latter observables if a former one errors immediately', () => {
const onError = sinon.spy();
const onSubscribe = sinon.spy();
const e1 = throwError('kaboom'); // Wins the race
const e2 = defer(onSubscribe); // Should be ignored

e1.pipe(raceWith(e2)).subscribe({ error: onError });
expect(onError.calledWithExactly('kaboom')).to.be.true;
expect(onSubscribe.called).to.be.false;
});

it('should unsubscribe former observables if a latter one emits immediately', () => {
const onNext = sinon.spy();
const onUnsubscribe = sinon.spy();
const e1 = NEVER.pipe(finalize(onUnsubscribe)); // Should be unsubscribed
const e2 = of('b'); // Wins the race

e1.pipe(raceWith(e2)).subscribe(onNext);
expect(onNext.calledWithExactly('b')).to.be.true;
expect(onUnsubscribe.calledOnce).to.be.true;
});

it('should unsubscribe from immediately emitting observable on unsubscription', () => {
const onNext = sinon.spy();
const onUnsubscribe = sinon.spy();
const e1 = <Observable<never>>NEVER.pipe(startWith('a'), finalize(onUnsubscribe)); // Wins the race
const e2 = NEVER; // Loses the race

const subscription = e1.pipe(raceWith(e2)).subscribe(onNext);
expect(onNext.calledWithExactly('a')).to.be.true;
expect(onUnsubscribe.called).to.be.false;
subscription.unsubscribe();
expect(onUnsubscribe.calledOnce).to.be.true;
});
});
2 changes: 1 addition & 1 deletion src/internal/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export { publish } from './publish';
export { publishBehavior } from './publishBehavior';
export { publishLast } from './publishLast';
export { publishReplay } from './publishReplay';
export { race } from './race';
export { race, raceWith } from './raceWith';
export { reduce } from './reduce';
export { repeat } from './repeat';
export { repeatWhen } from './repeatWhen';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { Observable } from '../Observable';
import { isArray } from '../util/isArray';
import { MonoTypeOperatorFunction, OperatorFunction } from '../types';
import { MonoTypeOperatorFunction, OperatorFunction, ObservableInput, ObservedValueUnionFromArray } from '../types';
import { race as raceStatic } from '../observable/race';

/* tslint:disable:max-line-length */
/** @deprecated Deprecated in favor of static race. */
/** @deprecated Deprecated use {@link raceWith} */
export function race<T>(observables: Array<Observable<T>>): MonoTypeOperatorFunction<T>;
/** @deprecated Deprecated in favor of static race. */
/** @deprecated Deprecated use {@link raceWith} */
export function race<T, R>(observables: Array<Observable<T>>): OperatorFunction<T, R>;
/** @deprecated Deprecated in favor of static race. */
/** @deprecated Deprecated use {@link raceWith} */
export function race<T>(...observables: Array<Observable<T> | Array<Observable<T>>>): MonoTypeOperatorFunction<T>;
/** @deprecated Deprecated in favor of static race. */
/** @deprecated Deprecated use {@link raceWith} */
export function race<T, R>(...observables: Array<Observable<any> | Array<Observable<any>>>): OperatorFunction<T, R>;
/* tslint:enable:max-line-length */

Expand All @@ -19,8 +19,7 @@ export function race<T, R>(...observables: Array<Observable<any> | Array<Observa
* error or complete notification from the combination of this Observable and supplied Observables.
* @param {...Observables} ...observables Sources used to race for which Observable emits first.
* @return {Observable} An Observable that mirrors the output of the first Observable to emit an item.
* @name race
* @deprecated Deprecated in favor of static {@link race}.
* @deprecated Deprecated use {@link raceWith}
*/
export function race<T>(...observables: (Observable<T> | Observable<T>[])[]): MonoTypeOperatorFunction<T> {
return function raceOperatorFunction(source: Observable<T>) {
Expand All @@ -36,3 +35,42 @@ export function race<T>(...observables: (Observable<T> | Observable<T>[])[]): Mo
) as Observable<T>;
};
}

/**
* Creates an Observable that mirrors the first source Observable to emit a next,
* error or complete notification from the combination of the Observable to which
* the operator is applied and supplied Observables.
*
* ## Example
*
* ```ts
* import { interval } from 'rxjs';
* import { mapTo, raceWith } from 'rxjs/operators';
*
* const obs1 = interval(1000).pipe(mapTo('fast one'));
* const obs2 = interval(3000).pipe(mapTo('medium one'));
* const obs3 = interval(5000).pipe(mapTo('slow one'));
*
* obs2.pipe(
* raceWith(obs3, obs1)
* ).subscribe(
* winner => console.log(winner)
* );
*
* // Outputs
* // a series of 'fast one'
* ```
*
* @param otherSources Sources used to race for which Observable emits first.
*/

export function raceWith<T, A extends ObservableInput<any>[]>(
...otherSources: A
): OperatorFunction<T, T | ObservedValueUnionFromArray<A>> {
return function raceWithOperatorFunction(source: Observable<T>) {
return source.lift.call(
raceStatic(source, ...otherSources),
undefined
) as Observable<T | ObservedValueUnionFromArray<A>>;
};
}
2 changes: 1 addition & 1 deletion src/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export { publish } from '../internal/operators/publish';
export { publishBehavior } from '../internal/operators/publishBehavior';
export { publishLast } from '../internal/operators/publishLast';
export { publishReplay } from '../internal/operators/publishReplay';
export { race } from '../internal/operators/race';
export { race, raceWith } from '../internal/operators/raceWith';
export { reduce } from '../internal/operators/reduce';
export { repeat } from '../internal/operators/repeat';
export { repeatWhen } from '../internal/operators/repeatWhen';
Expand Down

0 comments on commit ca7f370

Please sign in to comment.