Skip to content

Commit

Permalink
fix(race): unsubscribe raced observables with immediate scheduler (#2158
Browse files Browse the repository at this point in the history
)

Fixed the bug where racing an Observable that emits immediately,
with default scheduler, leaves other raced Observables unsubscribed
forever. Now immediate emit makes the race ignore following Observables.
  • Loading branch information
Kimmo Kiiski authored and benlesh committed Dec 12, 2016
1 parent b5a3413 commit 7dd533b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
36 changes: 36 additions & 0 deletions spec/operators/race-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {expect} from 'chai';
import * as sinon from 'sinon';
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, expectObservable, expectSubscriptions};

Expand Down Expand Up @@ -171,4 +172,39 @@ describe('Observable.prototype.race', () => {
expect(x).to.be.true;
}, done, done);
});

it('should ignore latter observables if a former one emits immediately', () => {
const onNext = sinon.spy();
const onSubscribe = sinon.spy();
const e1 = Observable.of('a'); // Wins the race
const e2 = Observable.defer(onSubscribe); // Should be ignored

e1.race(e2).subscribe(onNext);
expect(onNext.calledWithExactly('a')).to.be.true;
expect(onSubscribe.called).to.be.false;
});

it('should unsubscribe former observables if a latter one emits immediately', () => {
const onNext = sinon.spy();
const onUnsubscribe = sinon.spy();
const e1 = Observable.never<string>().finally(onUnsubscribe); // Should be unsubscribed
const e2 = Observable.of('b'); // Wins the race

e1.race(e2).subscribe(onNext);
expect(onNext.calledWithExactly('b')).to.be.true;
expect(onUnsubscribe.calledOnce).to.be.true;
});

it('should unsubscribe from immediately emitting observable on unsubscription', () => {
const onNext = sinon.spy();
const onUnsubscribe = sinon.spy();
const e1 = Observable.never<string>().startWith('a').finally(onUnsubscribe); // Wins the race
const e2 = Observable.never<string>(); // Loses the race

const subscription = e1.race(e2).subscribe(onNext);
expect(onNext.calledWithExactly('a')).to.be.true;
expect(onUnsubscribe.called).to.be.false;
subscription.unsubscribe();
expect(onUnsubscribe.calledOnce).to.be.true;
});
});
4 changes: 2 additions & 2 deletions src/operator/race.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
if (len === 0) {
this.destination.complete();
} else {
for (let i = 0; i < len; i++) {
for (let i = 0; i < len && !this.hasFirst; i++) {
let observable = observables[i];
let subscription = subscribeToResult(this, observable, observable, i);

if (this.subscriptions) {
this.subscriptions.push(subscription);
this.add(subscription);
}
this.add(subscription);
}
this.observables = null;
}
Expand Down

0 comments on commit 7dd533b

Please sign in to comment.