Skip to content

Commit

Permalink
fix(catchError): inner synchronous observables will properly terminate (
Browse files Browse the repository at this point in the history
#5655)

* fix(catchError): inner synchronous observables will properly terminate when composed with take, first, et al

* refactor: remove cruft ready property
  • Loading branch information
benlesh authored Aug 19, 2020
1 parent a468f88 commit d3fd2fb
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
2 changes: 1 addition & 1 deletion spec/operators/catchError-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ describe('catchError operator', () => {
});

// TODO: fix firehose unsubscription
it.skip('should stop listening to a synchronous observable when unsubscribed', () => {
it('should stop listening to a synchronous observable when unsubscribed', () => {
const sideEffects: number[] = [];
const synchronousObservable = new Observable(subscriber => {
// This will check to see if the subscriber was closed on each loop
Expand Down
26 changes: 20 additions & 6 deletions src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,8 @@ export function catchError<T, O extends ObservableInput<any>>(
}
};

innerSub = source.subscribe({
next: (value) => subscriber.next(value),
error: (err) => {
innerSub = source.subscribe(
new CatchErrorSubscriber(subscriber, (err) => {
handleError(err);
if (handledResult) {
if (innerSub) {
Expand All @@ -138,9 +137,8 @@ export function catchError<T, O extends ObservableInput<any>>(
syncUnsub = true;
}
}
},
complete: () => subscriber.complete(),
});
})
);

if (syncUnsub) {
innerSub.unsubscribe();
Expand All @@ -153,3 +151,19 @@ export function catchError<T, O extends ObservableInput<any>>(
return subscription;
});
}

/**
* This must exist to ensure that the `closed` state of the inner subscriber is set at
* the proper time to ensure operators like `take` can stop the inner subscription if
* it is a synchronous firehose.
*/
class CatchErrorSubscriber<T, C> extends Subscriber<T> {
constructor(destination: Subscriber<T | C>, private onError: (err: any) => void) {
super(destination);
}

_error(err: any) {
this.onError(err);
this.unsubscribe();
}
}

0 comments on commit d3fd2fb

Please sign in to comment.