Skip to content

Commit

Permalink
refactor(Observable): Update property and method types
Browse files Browse the repository at this point in the history
- Removes `_isScalar` as it was unused
- makes `lift` `protected`. This is an internal implementation detail.
- makes `source` `protected`, this is an internal implementation detail.
- Refactors operators to use new utility functions that do the lift or provide a reasonable error if the observable someone is trying to use with the operator does not have a lift method. Adds documentation.

BREAKING CHANGE: `lift` no longer exposed. It was _NEVER_ documented that end users of the library should be creating operators using `lift`. Lift has a [variety of issues](ReactiveX#5431) and was always an internal implementation detail of rxjs that might have been used by a few power users in the early days when it had the most value. The value of `lift`, originally, was that subclassed `Observable`s would compose through all operators that implemented lift. The reality is that feature is not widely known, used, or supported, and it was never documented as it was very experimental when it was first added. Until the end of v7, `lift` will remain on Observable. Standard JavaScript users will notice no difference. However, TypeScript users might see complaints about `lift` not being a member of observable. To workaround this issue there are two things you can do: 1. Rewrite your operators as [outlined in the documentation](https://rxjs.dev/guide/operators), such that they return `new Observable`. or 2. cast your observable as `any` and access `lift` that way. Method 1 is recommended if you do not want things to break when we move to version 8.
  • Loading branch information
benlesh committed Jul 4, 2020
1 parent f1d5972 commit 2a9f89c
Show file tree
Hide file tree
Showing 84 changed files with 264 additions and 142 deletions.
6 changes: 3 additions & 3 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -698,7 +698,7 @@ describe('Observable.lift', () => {
}
};

NEVER.lift(myOperator)
(NEVER as any).lift(myOperator)
.subscribe()
.unsubscribe();

Expand Down Expand Up @@ -883,8 +883,8 @@ describe('Observable.lift', () => {
class LogObservable<T> extends Observable<T> {
lift<R>(operator: Operator<T, R>): Observable<R> {
const observable = new LogObservable<R>();
(<any>observable).source = this;
(<any>observable).operator = new LogOperator(operator);
observable.source = this;
observable.operator = new LogOperator(operator);
return observable;
}
}
Expand Down
2 changes: 1 addition & 1 deletion spec/helpers/interop-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export function asInteropObservable<T>(observable: Observable<T>): Observable<T>
return new Proxy(observable, {
get(target: Observable<T>, key: string | number | symbol) {
if (key === 'lift') {
const { lift } = target;
const { lift } = target as any;
return interopLift(lift);
}
if (key === 'subscribe') {
Expand Down
37 changes: 11 additions & 26 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import { Subscription } from './Subscription';
import { TeardownLogic, OperatorFunction, PartialObserver, Subscribable } from './types';
import { canReportError } from './util/canReportError';
import { toSubscriber } from './util/toSubscriber';
import { iif } from './observable/iif';
import { throwError } from './observable/throwError';
import { observable as Symbol_observable } from './symbol/observable';
import { pipeFromArray } from './util/pipe';
import { config } from './config';
Expand All @@ -20,14 +18,11 @@ import { config } from './config';
* @class Observable<T>
*/
export class Observable<T> implements Subscribable<T> {
/** Internal implementation detail, do not use directly. */
public _isScalar: boolean = false;

/** @deprecated This is an internal implementation detail, do not use. */
source: Observable<any> | undefined;
protected source: Observable<any> | undefined;

/** @deprecated This is an internal implementation detail, do not use. */
operator: Operator<any, T> | undefined;
protected operator: Operator<any, T> | undefined;

/**
* @constructor
Expand Down Expand Up @@ -59,13 +54,16 @@ export class Observable<T> implements Subscribable<T> {
};

/**
* Creates a new Observable, with this Observable as the source, and the passed
* Creates a new Observable, with this Observable instance as the source, and the passed
* operator defined as the new observable's operator.
* @method lift
* @param {Operator} operator the operator defining the operation to take on the observable
* @return {Observable} a new observable with the Operator applied
* @param operator the operator defining the operation to take on the observable
* @return a new observable with the Operator applied
* @deprecated This is an internal implementation detail, do not use directly. If you have implemented an operator
* using `lift`, it is recommended that you create an operator by simply returning `new Observable()` directly.
* See "Creating new operators from scratch" section here: https://rxjs.dev/guide/operators
*/
lift<R>(operator?: Operator<T, R>): Observable<R> {
protected lift<R>(operator?: Operator<T, R>): Observable<R> {
const observable = new Observable<R>();
observable.source = this;
observable.operator = operator;
Expand Down Expand Up @@ -236,7 +234,7 @@ export class Observable<T> implements Subscribable<T> {
}

/** @deprecated This is an internal implementation detail, do not use. */
_trySubscribe(sink: Subscriber<T>): TeardownLogic {
protected _trySubscribe(sink: Subscriber<T>): TeardownLogic {
try {
return this._subscribe(sink);
} catch (err) {
Expand Down Expand Up @@ -336,24 +334,11 @@ export class Observable<T> implements Subscribable<T> {
}

/** @internal This is an internal implementation detail, do not use. */
_subscribe(subscriber: Subscriber<any>): TeardownLogic {
protected _subscribe(subscriber: Subscriber<any>): TeardownLogic {
const { source } = this;
return source && source.subscribe(subscriber);
}

// `if` and `throw` are special snow flakes, the compiler sees them as reserved words. Deprecated in
// favor of iif and throwError functions.
/**
* @nocollapse
* @deprecated In favor of iif creation function: import { iif } from 'rxjs';
*/
static if: typeof iif;
/**
* @nocollapse
* @deprecated In favor of throwError creation function: import { throwError } from 'rxjs';
*/
static throw: typeof throwError;

/**
* An interop point defined by the es7-observable spec https://github.com/zenparsing/es-observable
* @method Symbol.observable
Expand Down
3 changes: 3 additions & 0 deletions src/internal/Operator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { Subscriber } from './Subscriber';
import { TeardownLogic } from './types';

/***
* @deprecated Internal implementation detail, do not use.
*/
export interface Operator<T, R> {
call(subscriber: Subscriber<R>, source: any): TeardownLogic;
}
3 changes: 2 additions & 1 deletion src/internal/observable/combineLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Operator } from '../Operator';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { fromArray } from './fromArray';
import { lift } from '../util/lift';

const NONE = {};

Expand Down Expand Up @@ -233,7 +234,7 @@ export function combineLatest<O extends ObservableInput<any>, R>(
observables = observables[0] as any;
}

return fromArray(observables, scheduler).lift(new CombineLatestOperator<ObservedValueOf<O>, R>(resultSelector));
return lift(fromArray(observables, scheduler), new CombineLatestOperator<ObservedValueOf<O>, R>(resultSelector));
}

export class CombineLatestOperator<T, R> implements Operator<T, R> {
Expand Down
3 changes: 2 additions & 1 deletion src/internal/observable/race.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { TeardownLogic, ObservableInput, ObservedValueUnionFromArray } from '../
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { lift } from '../util/lift';

export function race<A extends ObservableInput<any>[]>(observables: A): Observable<ObservedValueUnionFromArray<A>>;
export function race<A extends ObservableInput<any>[]>(...observables: A): Observable<ObservedValueUnionFromArray<A>>;
Expand Down Expand Up @@ -65,7 +66,7 @@ export function race<T>(...observables: (ObservableInput<T> | ObservableInput<T>
}
}

return fromArray(observables, undefined).lift(new RaceOperator<T>());
return lift(fromArray(observables, undefined), new RaceOperator<T>());
}

export class RaceOperator<T> implements Operator<T, T> {
Expand Down
3 changes: 2 additions & 1 deletion src/internal/observable/zip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { iterator as Symbol_iterator } from '../../internal/symbol/iterator';
import { lift } from '../util/lift';

/* tslint:disable:max-line-length */
/** @deprecated resultSelector is no longer supported, pipe to map instead */
Expand Down Expand Up @@ -85,7 +86,7 @@ export function zip<O extends ObservableInput<any>, R>(
if (typeof last === 'function') {
resultSelector = observables.pop() as typeof resultSelector;
}
return fromArray(observables, undefined).lift(new ZipOperator(resultSelector));
return lift(fromArray(observables, undefined), new ZipOperator(resultSelector));
}

export class ZipOperator<T, R> implements Operator<T, R> {
Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/audit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { MonoTypeOperatorFunction, SubscribableOrPromise, TeardownLogic } from '

import { OuterSubscriber } from '../OuterSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { lift } from '../util/lift';

/**
* Ignores source values for a duration determined by another Observable, then
Expand Down Expand Up @@ -54,7 +55,7 @@ import { subscribeToResult } from '../util/subscribeToResult';
*/
export function audit<T>(durationSelector: (value: T) => SubscribableOrPromise<any>): MonoTypeOperatorFunction<T> {
return function auditOperatorFunction(source: Observable<T>) {
return source.lift(new AuditOperator(durationSelector));
return lift(source, new AuditOperator(durationSelector));
};
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { OperatorFunction } from '../types';
import { lift } from '../util/lift';

/**
* Buffers the source Observable values until `closingNotifier` emits.
Expand Down Expand Up @@ -47,7 +48,7 @@ import { OperatorFunction } from '../types';
*/
export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T, T[]> {
return function bufferOperatorFunction(source: Observable<T>) {
return source.lift(new BufferOperator<T>(closingNotifier));
return lift(source, new BufferOperator<T>(closingNotifier));
};
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/bufferCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { OperatorFunction, TeardownLogic } from '../types';
import { lift } from '../util/lift';

/**
* Buffers the source Observable values until the size hits the maximum
Expand Down Expand Up @@ -59,7 +60,7 @@ import { OperatorFunction, TeardownLogic } from '../types';
*/
export function bufferCount<T>(bufferSize: number, startBufferEvery: number | null = null): OperatorFunction<T, T[]> {
return function bufferCountOperatorFunction(source: Observable<T>) {
return source.lift(new BufferCountOperator<T>(bufferSize, startBufferEvery));
return lift(source, new BufferCountOperator<T>(bufferSize, startBufferEvery));
};
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/bufferTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Subscriber } from '../Subscriber';
import { Subscription } from '../Subscription';
import { isScheduler } from '../util/isScheduler';
import { OperatorFunction, SchedulerAction, SchedulerLike } from '../types';
import { lift } from '../util/lift';

/* tslint:disable:max-line-length */
export function bufferTime<T>(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>;
Expand Down Expand Up @@ -88,7 +89,7 @@ export function bufferTime<T>(bufferTimeSpan: number): OperatorFunction<T, T[]>
}

return function bufferTimeOperatorFunction(source: Observable<T>) {
return source.lift(new BufferTimeOperator<T>(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler));
return lift(source, new BufferTimeOperator<T>(bufferTimeSpan, bufferCreationInterval, maxBufferSize, scheduler));
};
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/bufferToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { subscribeToResult } from '../util/subscribeToResult';
import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { OperatorFunction, SubscribableOrPromise } from '../types';
import { lift } from '../util/lift';

/**
* Buffers the source Observable values starting from an emission from
Expand Down Expand Up @@ -57,7 +58,7 @@ export function bufferToggle<T, O>(
closingSelector: (value: O) => SubscribableOrPromise<any>
): OperatorFunction<T, T[]> {
return function bufferToggleOperatorFunction(source: Observable<T>) {
return source.lift(new BufferToggleOperator<T, O>(openings, closingSelector));
return lift(source, new BufferToggleOperator<T, O>(openings, closingSelector));
};
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/bufferWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { OperatorFunction } from '../types';
import { lift } from '../util/lift';

/**
* Buffers the source Observable values, using a factory function of closing
Expand Down Expand Up @@ -50,7 +51,7 @@ import { OperatorFunction } from '../types';
*/
export function bufferWhen<T>(closingSelector: () => Observable<any>): OperatorFunction<T, T[]> {
return function (source: Observable<T>) {
return source.lift(new BufferWhenOperator(closingSelector));
return lift(source, new BufferWhenOperator(closingSelector));
};
}

Expand Down
6 changes: 4 additions & 2 deletions src/internal/operators/catchError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { OuterSubscriber } from '../OuterSubscriber';
import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { ObservableInput, OperatorFunction, ObservedValueOf } from '../types';
import { lift } from '../util/lift';

/* tslint:disable:max-line-length */
export function catchError<T, O extends ObservableInput<any>>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>;
Expand Down Expand Up @@ -108,8 +109,9 @@ export function catchError<T, O extends ObservableInput<any>>(
): OperatorFunction<T, T | ObservedValueOf<O>> {
return function catchErrorOperatorFunction(source: Observable<T>): Observable<T | ObservedValueOf<O>> {
const operator = new CatchOperator(selector);
const caught = source.lift(operator);
return (operator.caught = caught as Observable<T>);
const caught = lift(source, operator);
operator.caught = caught;
return caught;
};
}

Expand Down
3 changes: 2 additions & 1 deletion src/internal/operators/combineAll.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { CombineLatestOperator } from '../observable/combineLatest';
import { Observable } from '../Observable';
import { OperatorFunction, ObservableInput } from '../types';
import { lift } from '../util/lift';

export function combineAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;
export function combineAll<T>(): OperatorFunction<any, T[]>;
Expand Down Expand Up @@ -53,5 +54,5 @@ export function combineAll<R>(project: (...values: Array<any>) => R): OperatorFu
* @name combineAll
*/
export function combineAll<T, R>(project?: (...values: Array<any>) => R): OperatorFunction<T, R> {
return (source: Observable<T>) => source.lift(new CombineLatestOperator(project));
return (source: Observable<T>) => lift(source, new CombineLatestOperator(project));
}
6 changes: 4 additions & 2 deletions src/internal/operators/combineLatestWith.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { CombineLatestOperator } from '../observable/combineLatest';
import { from } from '../observable/from';
import { Observable } from '../Observable';
import { ObservableInput, OperatorFunction, ObservedValueTupleFromArray, Cons } from '../types';
import { lift, stankyLift } from '../util/lift';

/* tslint:disable:max-line-length */
/** @deprecated use {@link combineLatestWith} */
Expand Down Expand Up @@ -53,10 +54,11 @@ export function combineLatest<T, R>(...observables: Array<ObservableInput<any> |
observables = (<any>observables[0]).slice();
}

return (source: Observable<T>) => source.lift.call(
return (source: Observable<T>) => stankyLift(
source,
from([source, ...observables]),
new CombineLatestOperator(project)
) as Observable<R>;
);
}

/**
Expand Down
7 changes: 4 additions & 3 deletions src/internal/operators/concat.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { concat as concatStatic } from '../observable/concat';
import { Observable } from '../Observable';
import { ObservableInput, OperatorFunction, MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { stankyLift } from '../util/lift';

/* tslint:disable:max-line-length */
/** @deprecated remove in v8. Use {@link concatWith} */
Expand All @@ -25,8 +26,8 @@ export function concat<T, R>(...observables: Array<ObservableInput<any> | Schedu
* @deprecated remove in v8. Use {@link concatWith}
*/
export function concat<T, R>(...observables: Array<ObservableInput<any> | SchedulerLike | undefined>): OperatorFunction<T, R> {
return (source: Observable<T>) => source.lift.call(
return (source: Observable<T>) => stankyLift(
source,
concatStatic(source, ...(observables as any[])),
undefined
) as Observable<R>;
);
}
9 changes: 5 additions & 4 deletions src/internal/operators/concatWith.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { concat as concatStatic } from '../observable/concat';
import { Observable } from '../Observable';
import { ObservableInput, OperatorFunction, ObservedValueUnionFromArray } from '../types';
import { stankyLift } from '../util/lift';

export function concatWith<T>(): OperatorFunction<T, T>;
export function concatWith<T, A extends ObservableInput<any>[]>(...otherSources: A): OperatorFunction<T, ObservedValueUnionFromArray<A> | T>;
Expand Down Expand Up @@ -44,8 +45,8 @@ export function concatWith<T, A extends ObservableInput<any>[]>(...otherSources:
* @param otherSources Other observable sources to subscribe to, in sequence, after the original source is complete.
*/
export function concatWith<T, A extends ObservableInput<any>[]>(...otherSources: A): OperatorFunction<T, ObservedValueUnionFromArray<A> | T> {
return (source: Observable<T>) => source.lift.call(
concatStatic(source, ...otherSources),
undefined
) as Observable<ObservedValueUnionFromArray<A> | T>;
return (source: Observable<T>) => stankyLift(
source,
concatStatic(source, ...otherSources)
);
}
3 changes: 2 additions & 1 deletion src/internal/operators/count.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Observable } from '../Observable';
import { Operator } from '../Operator';
import { Observer, OperatorFunction } from '../types';
import { Subscriber } from '../Subscriber';
import { lift } from '../util/lift';
/**
* Counts the number of emissions on the source and emits that number when the
* source completes.
Expand Down Expand Up @@ -62,7 +63,7 @@ import { Subscriber } from '../Subscriber';
*/

export function count<T>(predicate?: (value: T, index: number, source: Observable<T>) => boolean): OperatorFunction<T, number> {
return (source: Observable<T>) => source.lift(new CountOperator(predicate, source));
return (source: Observable<T>) => lift(source, new CountOperator(predicate, source));
}

class CountOperator<T> implements Operator<T, number> {
Expand Down
Loading

0 comments on commit 2a9f89c

Please sign in to comment.