Skip to content

Commit

Permalink
fix(observable): ensure the subscriber chain is complete before calli…
Browse files Browse the repository at this point in the history
…ng this._subscribe
  • Loading branch information
trxcllnt authored and benlesh committed Mar 25, 2016
1 parent e66b2d8 commit 1631224
Show file tree
Hide file tree
Showing 5 changed files with 215 additions and 13 deletions.
141 changes: 140 additions & 1 deletion spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,76 @@ describe('Observable', () => {
expect(messageErrorValue).toBe('boo!');
});

it('should ignore next messages after unsubscription', () => {
let times = 0;

new Observable((observer: Rx.Observer<number>) => {
observer.next(0);
observer.next(0);
observer.next(0);
observer.next(0);
})
.do(() => times += 1)
.subscribe(
function() {
if (times === 2) {
this.unsubscribe();
}
}
);

expect(times).toBe(2);
});

it('should ignore error messages after unsubscription', () => {
let times = 0;
let errorCalled = false;

new Observable((observer: Rx.Observer<number>) => {
observer.next(0);
observer.next(0);
observer.next(0);
observer.error(0);
})
.do(() => times += 1)
.subscribe(
function() {
if (times === 2) {
this.unsubscribe();
}
},
function() { errorCalled = true; }
);

expect(times).toBe(2);
expect(errorCalled).toBe(false);
});

it('should ignore complete messages after unsubscription', () => {
let times = 0;
let completeCalled = false;

new Observable((observer: Rx.Observer<number>) => {
observer.next(0);
observer.next(0);
observer.next(0);
observer.complete();
})
.do(() => times += 1)
.subscribe(
function() {
if (times === 2) {
this.unsubscribe();
}
},
null,
function() { completeCalled = true; }
);

expect(times).toBe(2);
expect(completeCalled).toBe(false);
});

describe('when called with an anonymous observer', () => {
it('should accept an anonymous observer with just a next function and call the next function in the context' +
' of the anonymous observer', (done: DoneSignature) => {
Expand Down Expand Up @@ -299,7 +369,7 @@ describe('Observable', () => {
}).not.toThrow();
});

it('should not run unsubscription logic when an error is thrown sending messages synchronously to an' +
it('should run unsubscription logic when an error is thrown sending messages synchronously to an' +
' anonymous observer', () => {
let messageError = false;
let messageErrorValue = false;
Expand Down Expand Up @@ -333,6 +403,75 @@ describe('Observable', () => {
expect(messageError).toBe(true);
expect(messageErrorValue).toBe('boo!');
});

it('should ignore next messages after unsubscription', () => {
let times = 0;

new Observable((observer: Rx.Observer<number>) => {
observer.next(0);
observer.next(0);
observer.next(0);
observer.next(0);
})
.do(() => times += 1)
.subscribe({
next() {
if (times === 2) {
this.unsubscribe();
}
}
});

expect(times).toBe(2);
});

it('should ignore error messages after unsubscription', () => {
let times = 0;
let errorCalled = false;

new Observable((observer: Rx.Observer<number>) => {
observer.next(0);
observer.next(0);
observer.next(0);
observer.error(0);
})
.do(() => times += 1)
.subscribe({
next() {
if (times === 2) {
this.unsubscribe();
}
},
error() { errorCalled = true; }
});

expect(times).toBe(2);
expect(errorCalled).toBe(false);
});

it('should ignore complete messages after unsubscription', () => {
let times = 0;
let completeCalled = false;

new Observable((observer: Rx.Observer<number>) => {
observer.next(0);
observer.next(0);
observer.next(0);
observer.complete();
})
.do(() => times += 1)
.subscribe({
next() {
if (times === 2) {
this.unsubscribe();
}
},
complete() { completeCalled = true; }
});

expect(times).toBe(2);
expect(completeCalled).toBe(false);
});
});
});
});
Expand Down
5 changes: 3 additions & 2 deletions spec/Subject-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,12 @@ describe('Subject', () => {

subject.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, done);
});

subject.next('foo');
subject.complete();
subject.next('bar');
expect(() => subject.next('bar')).toThrow(new Rx.ObjectUnsubscribedError());
done();
});

it('should clean out unsubscribed subscribers', (done: DoneSignature) => {
Expand Down
53 changes: 53 additions & 0 deletions spec/Subscriber-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,57 @@ describe('Subscriber', () => {
expect(completeSpy).not.toHaveBeenCalled();
});
});

it('should ignore next messages after unsubscription', () => {
let times = 0;

const sub = new Subscriber({
next() { times += 1; }
});

sub.next();
sub.next();
sub.unsubscribe();
sub.next();

expect(times).toBe(2);
});

it('should ignore error messages after unsubscription', () => {
let times = 0;
let errorCalled = false;

const sub = new Subscriber({
next() { times += 1; },
error() { errorCalled = true; }
});

sub.next();
sub.next();
sub.unsubscribe();
sub.next();
sub.error();

expect(times).toBe(2);
expect(errorCalled).toBe(false);
});

it('should ignore complete messages after unsubscription', () => {
let times = 0;
let completeCalled = false;

const sub = new Subscriber({
next() { times += 1; },
complete() { completeCalled = true; }
});

sub.next();
sub.next();
sub.unsubscribe();
sub.next();
sub.complete();

expect(times).toBe(2);
expect(completeCalled).toBe(false);
});
});
25 changes: 15 additions & 10 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,27 @@ export class Observable<T> implements Subscribable<T> {
complete?: () => void): Subscription {

const { operator } = this;
const subscriber = toSubscriber(observerOrNext, error, complete);
const target = toSubscriber(observerOrNext, error, complete);
const transformer = operator && operator.call(target) || target;

if (operator) {
subscriber.add(this._subscribe(operator.call(subscriber)));
} else {
subscriber.add(this._subscribe(subscriber));
if (transformer !== target) {
target.add(transformer);
}

if (subscriber.syncErrorThrowable) {
subscriber.syncErrorThrowable = false;
if (subscriber.syncErrorThrown) {
throw subscriber.syncErrorValue;
const subscription = this._subscribe(transformer);

if (subscription !== target) {
target.add(subscription);
}

if (target.syncErrorThrowable) {
target.syncErrorThrowable = false;
if (target.syncErrorThrown) {
throw target.syncErrorValue;
}
}

return subscriber;
return target;
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ class SafeSubscriber<T> extends Subscriber<T> {
next = (<PartialObserver<T>> observerOrNext).next;
error = (<PartialObserver<T>> observerOrNext).error;
complete = (<PartialObserver<T>> observerOrNext).complete;
if (isFunction(context.unsubscribe)) {
this.add(<() => void> context.unsubscribe.bind(context));
}
context.unsubscribe = this.unsubscribe.bind(this);
}

this._context = context;
Expand Down

0 comments on commit 1631224

Please sign in to comment.