From 1631224b3b74e60170088ea1b0c49ddcf9f5c836 Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Tue, 22 Mar 2016 14:01:52 -0700 Subject: [PATCH] fix(observable): ensure the subscriber chain is complete before calling this._subscribe --- spec/Observable-spec.ts | 141 +++++++++++++++++++++++++++++++++++++++- spec/Subject-spec.ts | 5 +- spec/Subscriber-spec.ts | 53 +++++++++++++++ src/Observable.ts | 25 ++++--- src/Subscriber.ts | 4 ++ 5 files changed, 215 insertions(+), 13 deletions(-) diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 5bde7635ae..34e1cc007b 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -251,6 +251,76 @@ describe('Observable', () => { expect(messageErrorValue).toBe('boo!'); }); + it('should ignore next messages after unsubscription', () => { + let times = 0; + + new Observable((observer: Rx.Observer) => { + observer.next(0); + observer.next(0); + observer.next(0); + observer.next(0); + }) + .do(() => times += 1) + .subscribe( + function() { + if (times === 2) { + this.unsubscribe(); + } + } + ); + + expect(times).toBe(2); + }); + + it('should ignore error messages after unsubscription', () => { + let times = 0; + let errorCalled = false; + + new Observable((observer: Rx.Observer) => { + observer.next(0); + observer.next(0); + observer.next(0); + observer.error(0); + }) + .do(() => times += 1) + .subscribe( + function() { + if (times === 2) { + this.unsubscribe(); + } + }, + function() { errorCalled = true; } + ); + + expect(times).toBe(2); + expect(errorCalled).toBe(false); + }); + + it('should ignore complete messages after unsubscription', () => { + let times = 0; + let completeCalled = false; + + new Observable((observer: Rx.Observer) => { + observer.next(0); + observer.next(0); + observer.next(0); + observer.complete(); + }) + .do(() => times += 1) + .subscribe( + function() { + if (times === 2) { + this.unsubscribe(); + } + }, + null, + function() { completeCalled = true; } + ); + + expect(times).toBe(2); + expect(completeCalled).toBe(false); + }); + describe('when called with an anonymous observer', () => { it('should accept an anonymous observer with just a next function and call the next function in the context' + ' of the anonymous observer', (done: DoneSignature) => { @@ -299,7 +369,7 @@ describe('Observable', () => { }).not.toThrow(); }); - it('should not run unsubscription logic when an error is thrown sending messages synchronously to an' + + it('should run unsubscription logic when an error is thrown sending messages synchronously to an' + ' anonymous observer', () => { let messageError = false; let messageErrorValue = false; @@ -333,6 +403,75 @@ describe('Observable', () => { expect(messageError).toBe(true); expect(messageErrorValue).toBe('boo!'); }); + + it('should ignore next messages after unsubscription', () => { + let times = 0; + + new Observable((observer: Rx.Observer) => { + observer.next(0); + observer.next(0); + observer.next(0); + observer.next(0); + }) + .do(() => times += 1) + .subscribe({ + next() { + if (times === 2) { + this.unsubscribe(); + } + } + }); + + expect(times).toBe(2); + }); + + it('should ignore error messages after unsubscription', () => { + let times = 0; + let errorCalled = false; + + new Observable((observer: Rx.Observer) => { + observer.next(0); + observer.next(0); + observer.next(0); + observer.error(0); + }) + .do(() => times += 1) + .subscribe({ + next() { + if (times === 2) { + this.unsubscribe(); + } + }, + error() { errorCalled = true; } + }); + + expect(times).toBe(2); + expect(errorCalled).toBe(false); + }); + + it('should ignore complete messages after unsubscription', () => { + let times = 0; + let completeCalled = false; + + new Observable((observer: Rx.Observer) => { + observer.next(0); + observer.next(0); + observer.next(0); + observer.complete(); + }) + .do(() => times += 1) + .subscribe({ + next() { + if (times === 2) { + this.unsubscribe(); + } + }, + complete() { completeCalled = true; } + }); + + expect(times).toBe(2); + expect(completeCalled).toBe(false); + }); }); }); }); diff --git a/spec/Subject-spec.ts b/spec/Subject-spec.ts index 937a924c66..d4e4bff135 100644 --- a/spec/Subject-spec.ts +++ b/spec/Subject-spec.ts @@ -356,11 +356,12 @@ describe('Subject', () => { subject.subscribe(function (x) { expect(x).toBe(expected.shift()); - }, null, done); + }); subject.next('foo'); subject.complete(); - subject.next('bar'); + expect(() => subject.next('bar')).toThrow(new Rx.ObjectUnsubscribedError()); + done(); }); it('should clean out unsubscribed subscribers', (done: DoneSignature) => { diff --git a/spec/Subscriber-spec.ts b/spec/Subscriber-spec.ts index bccd33462a..b1163683bf 100644 --- a/spec/Subscriber-spec.ts +++ b/spec/Subscriber-spec.ts @@ -32,4 +32,57 @@ describe('Subscriber', () => { expect(completeSpy).not.toHaveBeenCalled(); }); }); + + it('should ignore next messages after unsubscription', () => { + let times = 0; + + const sub = new Subscriber({ + next() { times += 1; } + }); + + sub.next(); + sub.next(); + sub.unsubscribe(); + sub.next(); + + expect(times).toBe(2); + }); + + it('should ignore error messages after unsubscription', () => { + let times = 0; + let errorCalled = false; + + const sub = new Subscriber({ + next() { times += 1; }, + error() { errorCalled = true; } + }); + + sub.next(); + sub.next(); + sub.unsubscribe(); + sub.next(); + sub.error(); + + expect(times).toBe(2); + expect(errorCalled).toBe(false); + }); + + it('should ignore complete messages after unsubscription', () => { + let times = 0; + let completeCalled = false; + + const sub = new Subscriber({ + next() { times += 1; }, + complete() { completeCalled = true; } + }); + + sub.next(); + sub.next(); + sub.unsubscribe(); + sub.next(); + sub.complete(); + + expect(times).toBe(2); + expect(completeCalled).toBe(false); + }); }); diff --git a/src/Observable.ts b/src/Observable.ts index afc469625b..bbe67adf69 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -87,22 +87,27 @@ export class Observable implements Subscribable { complete?: () => void): Subscription { const { operator } = this; - const subscriber = toSubscriber(observerOrNext, error, complete); + const target = toSubscriber(observerOrNext, error, complete); + const transformer = operator && operator.call(target) || target; - if (operator) { - subscriber.add(this._subscribe(operator.call(subscriber))); - } else { - subscriber.add(this._subscribe(subscriber)); + if (transformer !== target) { + target.add(transformer); } - if (subscriber.syncErrorThrowable) { - subscriber.syncErrorThrowable = false; - if (subscriber.syncErrorThrown) { - throw subscriber.syncErrorValue; + const subscription = this._subscribe(transformer); + + if (subscription !== target) { + target.add(subscription); + } + + if (target.syncErrorThrowable) { + target.syncErrorThrowable = false; + if (target.syncErrorThrown) { + throw target.syncErrorValue; } } - return subscriber; + return target; } /** diff --git a/src/Subscriber.ts b/src/Subscriber.ts index 76b6068d26..51c95fc8ae 100644 --- a/src/Subscriber.ts +++ b/src/Subscriber.ts @@ -118,6 +118,10 @@ class SafeSubscriber extends Subscriber { next = (> observerOrNext).next; error = (> observerOrNext).error; complete = (> observerOrNext).complete; + if (isFunction(context.unsubscribe)) { + this.add(<() => void> context.unsubscribe.bind(context)); + } + context.unsubscribe = this.unsubscribe.bind(this); } this._context = context;