From 38a45f8b62133b1225c31ece594978484f95b230 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 16 Sep 2015 20:37:45 -0700 Subject: [PATCH] fix(switchAll/switchLatest): inner subscriptions should now properly unsub - ensures that inner subscriptions are NOT provided a destination, which changes the executiong path of unsubscribe() fixes #302 --- spec/operators/switchAll-spec.js | 17 +++++++++++++++++ spec/operators/switchLatest-spec.js | 17 ++++++++++++++++- src/operators/switchAll.ts | 21 +++++++++++++++++---- src/operators/switchLatest.ts | 26 +++++++++++++++++++------- 4 files changed, 69 insertions(+), 12 deletions(-) diff --git a/spec/operators/switchAll-spec.js b/spec/operators/switchAll-spec.js index 9731cc6c7c..555749c287 100644 --- a/spec/operators/switchAll-spec.js +++ b/spec/operators/switchAll-spec.js @@ -17,6 +17,23 @@ describe('Observable.prototype.switchAll()', function(){ }, null, done); }); + it('should unsub inner observables', function() { + var unsubbed = []; + + Observable.of('a', 'b').map(function(x) { + return Observable.create(function(subscriber) { + subscriber.complete(); + return function() { + unsubbed.push(x); + }; + }); + }) + .mergeAll() + .subscribe(); + + expect(unsubbed).toEqual(['a', 'b']); + }); + it("should switch to each inner Observable", function (done) { var a = Observable.of(1, 2, 3); var b = Observable.of(4, 5, 6); diff --git a/spec/operators/switchLatest-spec.js b/spec/operators/switchLatest-spec.js index 8820e630b8..0502a71684 100644 --- a/spec/operators/switchLatest-spec.js +++ b/spec/operators/switchLatest-spec.js @@ -3,7 +3,7 @@ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; var immediateScheduler = Rx.Scheduler.immediate; -fdescribe('Observable.prototype.switchLatest()', function () { +describe('Observable.prototype.switchLatest()', function () { it("should switch with a selector function", function (done) { var a = Observable.of(1, 2, 3); var expected = ['a1', 'b1', 'c1', 'a2', 'b2', 'c2', 'a3', 'b3', 'c3']; @@ -13,6 +13,21 @@ fdescribe('Observable.prototype.switchLatest()', function () { expect(x).toBe(expected.shift()); }, null, done); }); + + it('should unsub inner observables', function(){ + var unsubbed = []; + + Observable.of('a', 'b').switchLatest(function(x) { + return Observable.create(function(subscriber) { + subscriber.complete(); + return function() { + unsubbed.push(x); + }; + }); + }).subscribe(); + + expect(unsubbed).toEqual(['a', 'b']); + }); it('should switch inner cold observables', function (){ var x = cold( '--a--b--c--d--e--|') diff --git a/src/operators/switchAll.ts b/src/operators/switchAll.ts index f60a965c25..9b84ad6598 100644 --- a/src/operators/switchAll.ts +++ b/src/operators/switchAll.ts @@ -30,7 +30,7 @@ class SwitchSubscriber extends Subscriber { _next(value: any) { this.active++; this.unsubscribeInner(); - this.add(this.innerSubscription = value.subscribe(new InnerSwitchSubscriber(this.destination, this))); + this.add(this.innerSubscription = value.subscribe(new InnerSwitchSubscriber(this))); } _complete() { @@ -49,6 +49,14 @@ class SwitchSubscriber extends Subscriber { } } + notifyNext(value: T) { + this.destination.next(value); + } + + notifyError(err: any) { + this.destination.error(err); + } + notifyComplete() { this.unsubscribeInner(); if(this.hasCompleted && this.active === 0) { @@ -58,13 +66,18 @@ class SwitchSubscriber extends Subscriber { } class InnerSwitchSubscriber extends Subscriber { - constructor(destination: Observer, private parent: SwitchSubscriber) { - super(destination); + constructor(private parent: SwitchSubscriber) { + super(); } _next(value: T) { - super._next(value); + this.parent.notifyNext(value); + } + + _error(err: any) { + this.parent.notifyError(err); } + _complete() { this.parent.notifyComplete(); } diff --git a/src/operators/switchLatest.ts b/src/operators/switchLatest.ts index 79ea6164c6..7a984eb97c 100644 --- a/src/operators/switchLatest.ts +++ b/src/operators/switchLatest.ts @@ -45,7 +45,7 @@ class SwitchLatestSubscriber extends Subscriber { if(innerSubscription) { innerSubscription.unsubscribe(); } - this.add(this.innerSubscription = result.subscribe(new InnerSwitchLatestSubscriber(destination, this, this.resultSelector, index, value))) + this.add(this.innerSubscription = result.subscribe(new InnerSwitchLatestSubscriber(this, this.resultSelector, index, value))) } } @@ -64,34 +64,46 @@ class SwitchLatestSubscriber extends Subscriber { this.destination.complete(); } } + + notifyError(err: any) { + this.destination.error(err); + } + + notifyNext(value: T) { + this.destination.next(value); + } } class InnerSwitchLatestSubscriber extends Subscriber { private index: number = 0; - constructor(destination: Observer, private parent: SwitchLatestSubscriber, + constructor(private parent: SwitchLatestSubscriber, private resultSelector: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2, private outerIndex: number, private outerValue: any) { - super(destination); + super(); } _next(value: T) { - const destination = this.destination; + const parent = this.parent; const index = this.index++; const resultSelector = this.resultSelector; if(resultSelector) { let result = tryCatch(resultSelector)(value, this.outerValue, index, this.outerIndex); if(result === errorObject) { - destination.error(result.e); + parent.notifyError(result.e); } else { - destination.next(result); + parent.notifyNext(result); } } else { - destination.next(value); + parent.notifyNext(value); } } + _error(err: T) { + this.parent.notifyError(err); + } + _complete() { this.parent.notifyComplete(this); }