Skip to content

Commit

Permalink
refactor(Subscriber): Massively untangle Subscriber and SafeSubscriber
Browse files Browse the repository at this point in the history
1. `Subscriber` is just a `Subscriber` now. It doesn't do anything magical to wrap the destination it is handed in a `SafeSubscriber`.
2. `Subscriber.create` returns `SafeSubscriber`.
3. Simplifies logic in `Subscriber` significantly.
4. Adds comments outlining code to be removed when the deprecated next context code is removed.
5. Completely removes `toSubscriber` and related complexity.

BREAKING CHANGE: `new Subscriber` no longer takes 0-3 arguments. To create a `Subscriber` with 0-3 arguments, use `Subscriber.create`. However, please note that there is little to no reason that you should be creating `Subscriber` references directly, and `Subscriber.create` and `new Subscriber` are both deprecated.
  • Loading branch information
benlesh committed Sep 23, 2020
1 parent 1222d5a commit 07902ca
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 167 deletions.
36 changes: 13 additions & 23 deletions spec/Subscriber-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import { asInteropSubscriber } from './helpers/interop-helper';
import { getRegisteredTeardowns } from './helpers/subscription';

/** @test {Subscriber} */
describe('Subscriber', () => {
describe('SafeSubscriber', () => {
it('should ignore next messages after unsubscription', () => {
let times = 0;

const sub = new Subscriber({
const sub = new SafeSubscriber({
next() { times += 1; }
});

Expand All @@ -21,23 +21,11 @@ describe('Subscriber', () => {
expect(times).to.equal(2);
});

it('should wrap unsafe observers in a safe subscriber', () => {
const observer = {
next(x: any) { /* noop */ },
error(err: any) { /* noop */ },
complete() { /* noop */ }
};

const subscriber = new Subscriber(observer);
expect((subscriber as any).destination).not.to.equal(observer);
expect((subscriber as any).destination).to.be.an.instanceof(SafeSubscriber);
});

it('should ignore error messages after unsubscription', () => {
let times = 0;
let errorCalled = false;

const sub = new Subscriber({
const sub = new SafeSubscriber({
next() { times += 1; },
error() { errorCalled = true; }
});
Expand All @@ -56,7 +44,7 @@ describe('Subscriber', () => {
let times = 0;
let completeCalled = false;

const sub = new Subscriber({
const sub = new SafeSubscriber({
next() { times += 1; },
complete() { completeCalled = true; }
});
Expand All @@ -76,8 +64,8 @@ describe('Subscriber', () => {
next: function () { /*noop*/ }
};

const sub1 = new Subscriber(observer);
const sub2 = new Subscriber(observer);
const sub1 = new SafeSubscriber(observer);
const sub2 = new SafeSubscriber(observer);

sub2.complete();

Expand All @@ -94,7 +82,7 @@ describe('Subscriber', () => {
}
};

const sub1 = new Subscriber(observer);
const sub1 = new SafeSubscriber(observer);
sub1.complete();

expect(argument).to.have.lengthOf(0);
Expand All @@ -105,7 +93,7 @@ describe('Subscriber', () => {
let subscriberUnsubscribed = false;
let subscriptionUnsubscribed = false;

const subscriber = new Subscriber<void>();
const subscriber = new SafeSubscriber<void>();
subscriber.add(() => subscriberUnsubscribed = true);

const source = new Observable<void>(() => () => observableUnsubscribed = true);
Expand All @@ -120,7 +108,7 @@ describe('Subscriber', () => {

it('should have idempotent unsubscription', () => {
let count = 0;
const subscriber = new Subscriber();
const subscriber = new SafeSubscriber();
subscriber.add(() => ++count);
expect(count).to.equal(0);

Expand All @@ -133,7 +121,7 @@ describe('Subscriber', () => {

it('should close, unsubscribe, and unregister all teardowns after complete', () => {
let isUnsubscribed = false;
const subscriber = new Subscriber();
const subscriber = new SafeSubscriber();
subscriber.add(() => isUnsubscribed = true);
subscriber.complete();
expect(isUnsubscribed).to.be.true;
Expand All @@ -143,7 +131,7 @@ describe('Subscriber', () => {

it('should close, unsubscribe, and unregister all teardowns after error', () => {
let isTornDown = false;
const subscriber = new Subscriber({
const subscriber = new SafeSubscriber({
error: () => {
// Mischief managed!
// Adding this handler here to prevent the call to error from
Expand All @@ -156,7 +144,9 @@ describe('Subscriber', () => {
expect(subscriber.closed).to.be.true;
expect(getRegisteredTeardowns(subscriber).length).to.equal(0);
});
});

describe('Subscriber', () => {
it('should teardown and unregister all teardowns after complete', () => {
let isTornDown = false;
const subscriber = new Subscriber();
Expand Down
3 changes: 2 additions & 1 deletion spec/observables/generate-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { expect } from 'chai';
import { expectObservable } from '../helpers/marble-testing';
import { generate, Subscriber } from 'rxjs';
import { take } from 'rxjs/operators';
import { SafeSubscriber } from 'rxjs/internal/Subscriber';

declare const rxTestScheduler: TestScheduler;

Expand Down Expand Up @@ -55,7 +56,7 @@ describe('generate', () => {
it('should stop producing when unsubscribed', () => {
const source = generate(1, x => x < 4, x => x + 1);
let count = 0;
const subscriber = new Subscriber<number>(
const subscriber = new SafeSubscriber<number>(
x => {
count++;
if (x == 2) {
Expand Down
5 changes: 3 additions & 2 deletions spec/operators/repeatWhen-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { repeatWhen, map, mergeMap, takeUntil, takeWhile, take } from 'rxjs/operators';
import { of, EMPTY, Observable, Subscriber } from 'rxjs';
import { SafeSubscriber } from 'rxjs/internal/Subscriber';

/** @test {repeatWhen} */
describe('repeatWhen operator', () => {
Expand Down Expand Up @@ -92,7 +93,7 @@ describe('repeatWhen operator', () => {
Observable.prototype.subscribe = function (...args: any[]): any {
let [subscriber] = args;
if (!(subscriber instanceof Subscriber)) {
subscriber = new Subscriber<any>(...args);
subscriber = new SafeSubscriber(...args);
}
subscriber.error = function (err: any): void {
errors.push(err);
Expand All @@ -119,7 +120,7 @@ describe('repeatWhen operator', () => {
Observable.prototype.subscribe = function (...args: any[]): any {
let [subscriber] = args;
if (!(subscriber instanceof Subscriber)) {
subscriber = new Subscriber<any>(...args);
subscriber = new SafeSubscriber(...args);
}
subscriber.error = function (err: any): void {
errors.push(err);
Expand Down
9 changes: 5 additions & 4 deletions spec/util/canReportError-spec.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import { expect } from 'chai';
import { noop, Subscriber } from 'rxjs';
import { canReportError } from 'rxjs/internal/Observable';
import { SafeSubscriber } from 'rxjs/internal/Subscriber';

describe('canReportError', () => {
it('should report errors to an observer if possible', () => {
const subscriber = new Subscriber<{}>(noop, noop);
const subscriber = new SafeSubscriber(noop, noop);
expect(canReportError(subscriber)).to.be.true;
});

it('should not report errors to a stopped observer', () => {
const subscriber = new Subscriber<{}>(noop, noop);
const subscriber = new SafeSubscriber(noop, noop);
subscriber.error(new Error('kaboom'));
expect(canReportError(subscriber)).to.be.false;
});

it('should not report errors an observer with a stopped destination', () => {
const destination = new Subscriber<{}>(noop, noop);
const subscriber = new Subscriber<{}>(destination);
const destination = new SafeSubscriber(noop, noop);
const subscriber = new Subscriber(destination);
destination.error(new Error('kaboom'));
expect(canReportError(subscriber)).to.be.false;
});
Expand Down
28 changes: 0 additions & 28 deletions spec/util/toSubscriber-spec.ts

This file was deleted.

17 changes: 12 additions & 5 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
* @prettier
*/
import { Operator } from './Operator';
import { Subscriber } from './Subscriber';
import { Subscription } from './Subscription';
import { TeardownLogic, OperatorFunction, PartialObserver, Subscribable } from './types';
import { toSubscriber } from './util/toSubscriber';
import { SafeSubscriber, Subscriber } from './Subscriber';
import { isSubscription, Subscription } from './Subscription';
import { TeardownLogic, OperatorFunction, PartialObserver, Subscribable, Observer } from './types';
import { observable as Symbol_observable } from './symbol/observable';
import { pipeFromArray } from './util/pipe';
import { config } from './config';
Expand Down Expand Up @@ -208,7 +207,7 @@ export class Observable<T> implements Subscribable<T> {
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription {
const subscriber = toSubscriber(observerOrNext, error, complete);
const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);

// If we have an operator, it's the result of a lift, and we let the lift
// mechanism do the subscription for us in the operator call. Otherwise,
Expand Down Expand Up @@ -501,3 +500,11 @@ export function canReportError(subscriber: Subscriber<any>): boolean {
}
return true;
}

function isObserver<T>(value: any): value is Observer<T> {
return value && typeof value.next === 'function' && typeof value.error === 'function' && typeof value.complete === 'function';
}

function isSubscriber<T>(value: any): value is Subscriber<T> {
return (value && value instanceof Subscriber) || (isObserver(value) && isSubscription(value));
}
71 changes: 20 additions & 51 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { isFunction } from './util/isFunction';
import { EMPTY_OBSERVER } from './EMPTY_OBSERVER';
import { Observer, PartialObserver } from './types';
import { Subscription } from './Subscription';
import { isSubscription, Subscription } from './Subscription';
import { config } from './config';
import { reportUnhandledError } from './util/reportUnhandledError';
import { noop } from './util/noop';
Expand All @@ -21,58 +21,37 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
/**
* A static factory for a Subscriber, given a (potentially partial) definition
* of an Observer.
* @param {function(x: ?T): void} [next] The `next` callback of an Observer.
* @param {function(e: ?any): void} [error] The `error` callback of an
* @param next The `next` callback of an Observer.
* @param error The `error` callback of an
* Observer.
* @param {function(): void} [complete] The `complete` callback of an
* @param complete The `complete` callback of an
* Observer.
* @return {Subscriber<T>} A Subscriber wrapping the (partially defined)
* @return A Subscriber wrapping the (partially defined)
* Observer represented by the given arguments.
* @nocollapse
* @deprecated Do not use. Will be removed in v8. There is no replacement for this method, and there is no reason to be creating instances of `Subscriber` directly. If you have a specific use case, please file an issue.
*/
static create<T>(next?: (x?: T) => void, error?: (e?: any) => void, complete?: () => void): Subscriber<T> {
return new Subscriber(next, error, complete);
return new SafeSubscriber(next, error, complete);
}

protected isStopped: boolean = false;
protected destination: Observer<any> | Subscriber<any>; // this `any` is the escape hatch to erase extra type param (e.g. R)
protected destination: Subscriber<any> | Observer<any>; // this `any` is the escape hatch to erase extra type param (e.g. R)

/**
* @param {Observer|function(value: T): void} [destinationOrNext] A partially
* defined Observer or a `next` callback function.
* @param {function(e: ?any): void} [error] The `error` callback of an
* Observer.
* @param {function(): void} [complete] The `complete` callback of an
* Observer.
* @deprecated Do not use directly. There is no reason to directly create an instance of Subscriber. This type is exported for typings reasons.
*/
constructor(
destinationOrNext?: PartialObserver<any> | ((value: T) => void) | null,
error?: ((e?: any) => void) | null,
complete?: (() => void) | null
) {
constructor(destination?: Subscriber<any> | Observer<any>) {
super();

switch (arguments.length) {
case 0:
this.destination = EMPTY_OBSERVER;
break;
case 1:
if (!destinationOrNext) {
this.destination = EMPTY_OBSERVER;
break;
}
if (typeof destinationOrNext === 'object') {
if (destinationOrNext instanceof Subscriber) {
this.destination = destinationOrNext;
destinationOrNext.add(this);
} else {
this.destination = new SafeSubscriber<T>(this, <PartialObserver<any>>destinationOrNext);
}
break;
}
default:
this.destination = new SafeSubscriber<T>(this, <(value: T) => void>destinationOrNext, error, complete);
break;
if (destination) {
this.destination = destination;
// Automatically chain subscriptions together here.
// if destination is a Subscription, then it is a Subscriber.
if (isSubscription(destination)) {
destination.add(this);
}
} else {
this.destination = EMPTY_OBSERVER;
}
}

Expand Down Expand Up @@ -145,7 +124,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> {
*/
export class SafeSubscriber<T> extends Subscriber<T> {
constructor(
private _parentSubscriber: Subscriber<T>,
observerOrNext?: PartialObserver<T> | ((value: T) => void) | null,
error?: ((e?: any) => void) | null,
complete?: (() => void) | null
Expand All @@ -164,7 +142,7 @@ export class SafeSubscriber<T> extends Subscriber<T> {
// next handler functions passed to subscribe. This only exists behind a flag
// now, as it is *very* slow.
context = Object.create(observerOrNext);
context.unsubscribe = this.unsubscribe.bind(this);
context.unsubscribe = () => this.unsubscribe();
} else {
context = observerOrNext;
}
Expand All @@ -180,15 +158,6 @@ export class SafeSubscriber<T> extends Subscriber<T> {
complete: complete || noop,
};
}

unsubscribe() {
if (!this.closed) {
const { _parentSubscriber } = this;
this._parentSubscriber = null!;
_parentSubscriber.unsubscribe();
super.unsubscribe();
}
}
}

function handleError(err: any) {
Expand Down
Loading

0 comments on commit 07902ca

Please sign in to comment.