Skip to content

Commit

Permalink
fix(operator): Fix take to complete when the source is re-entrant.
Browse files Browse the repository at this point in the history
The TakeSubscriber compares the instance variable `count` to determine if it should complete or not.

If the source Observable is re-entrant, this check fails (because `count` is updated in each `next`), and take never completes.
  • Loading branch information
trxcllnt committed Sep 22, 2016
1 parent 646b38d commit 86615cb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
16 changes: 16 additions & 0 deletions spec/operators/take-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

const Subject = Rx.Subject;
const Observable = Rx.Observable;

/** @test {take} */
Expand Down Expand Up @@ -135,4 +136,19 @@ describe('Observable.prototype.take', () => {

source.subscribe();
});

it('should complete when the source is reentrant', () => {
let completed = false;
const source = new Subject();
source.take(5).subscribe({
next() {
source.next();
},
complete() {
completed = true;
}
});
source.next();
expect(completed).to.be.true;
});
});
5 changes: 3 additions & 2 deletions src/operator/take.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ class TakeSubscriber<T> extends Subscriber<T> {

protected _next(value: T): void {
const total = this.total;
if (++this.count <= total) {
const count = ++this.count;
if (count <= total) {
this.destination.next(value);
if (this.count === total) {
if (count === total) {
this.destination.complete();
this.unsubscribe();
}
Expand Down

0 comments on commit 86615cb

Please sign in to comment.