Skip to content

Commit

Permalink
refactor(SafeSubscriber): smaller implementation
Browse files Browse the repository at this point in the history
- Also removes unnecessary check to see if the observer was a Subscriber. Subscribers are never passed to SafeSubscriber, so it was unnecessary.
  • Loading branch information
benlesh committed Sep 23, 2020
1 parent a0ea0f7 commit 1b07686
Showing 1 changed file with 20 additions and 60 deletions.
80 changes: 20 additions & 60 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import { isFunction } from './util/isFunction';
import { EMPTY_OBSERVER } from './EMPTY_OBSERVER';
import { Observer, PartialObserver } from './types';
import { Subscription, isSubscription } from './Subscription';
import { Subscription } from './Subscription';
import { config } from './config';
import { reportUnhandledError } from './util/reportUnhandledError';
import { noop } from './util/noop';

/**
* Implements the {@link Observer} interface and extends the
Expand Down Expand Up @@ -155,12 +156,13 @@ export class SafeSubscriber<T> extends Subscriber<T> {
if (isFunction(observerOrNext)) {
next = observerOrNext;
} else if (observerOrNext) {
next = observerOrNext.next;
error = observerOrNext.error;
complete = observerOrNext.complete;
({ next, error, complete } = observerOrNext);
if (observerOrNext !== EMPTY_OBSERVER) {
let context: any;
if (config.useDeprecatedNextContext) {
if (this && config.useDeprecatedNextContext) {
// This is a deprecated path that made `this.unsubscribe()` available in
// next handler functions passed to subscribe. This only exists behind a flag
// now, as it is *very* slow.
context = Object.create(observerOrNext);
context.unsubscribe = this.unsubscribe.bind(this);
} else {
Expand All @@ -169,64 +171,14 @@ export class SafeSubscriber<T> extends Subscriber<T> {
next = next?.bind(context);
error = error?.bind(context);
complete = complete?.bind(context);
if (isSubscription(observerOrNext)) {
observerOrNext.add(this.unsubscribe.bind(this));
}
}
}

this._next = next!;
this._error = error!;
this._complete = complete!;
}

next(value: T): void {
if (!this.isStopped && this._next) {
try {
this._next(value);
} catch (err) {
this._throw(err);
}
}
}

error(err: any): void {
if (!this.isStopped) {
if (this._error) {
try {
this._error(err);
} catch (err) {
this._throw(err);
return;
}
this.unsubscribe();
} else {
this._throw(err);
}
}
}

private _throw(err: any) {
this.unsubscribe();
if (config.useDeprecatedSynchronousErrorHandling) {
throw err;
} else {
reportUnhandledError(err);
}
}

complete(): void {
if (!this.isStopped) {
if (this._complete) {
try {
this._complete();
} catch (err) {
this._throw(err);
return;
}
}
this.unsubscribe();
}
this.destination = {
next: next || noop,
error: error || handleError,
complete: complete || noop,
};
}

unsubscribe() {
Expand All @@ -238,3 +190,11 @@ export class SafeSubscriber<T> extends Subscriber<T> {
}
}
}

function handleError(err: any) {
if (config.useDeprecatedSynchronousErrorHandling) {
throw err;
} else {
reportUnhandledError(err);
}
}

0 comments on commit 1b07686

Please sign in to comment.