Skip to content

Commit

Permalink
fix(subscribeOn): allow Infinity as valid delay (#5500)
Browse files Browse the repository at this point in the history
* fix(subscribeOn): allow Infinity as valid delay

Removes use of `isNumeric` that excludes `Infinity` as a valid number
- Moves SubscribeOnObservable to the only location it is used.
- Removes redundant tests of internals

* chore: update test with proper subscription assertion
  • Loading branch information
benlesh authored Jun 16, 2020
1 parent ebdd383 commit cd7d649
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 120 deletions.
56 changes: 0 additions & 56 deletions spec/observables/SubscribeOnObservable-spec.ts

This file was deleted.

8 changes: 8 additions & 0 deletions spec/operators/subscribeOn-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,12 @@ describe('subscribeOn operator', () => {
expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(sub);
});

it('should properly support a delayTime of Infinity', () => {
const e1 = hot('--a--b--|');
const expected = '---------';

expectObservable(e1.pipe(subscribeOn(rxTestScheduler, Infinity))).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe([]);
});
});
52 changes: 0 additions & 52 deletions src/internal/observable/SubscribeOnObservable.ts

This file was deleted.

59 changes: 48 additions & 11 deletions src/internal/operators/subscribeOn.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,49 @@
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import { SubscribeOnObservable } from '../observable/SubscribeOnObservable';
import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic } from '../types';
import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic, SchedulerAction } from '../types';
import { asap as asapScheduler } from '../scheduler/asap';
import { Subscription } from '../Subscription';
import { isScheduler } from '../util/isScheduler';

export interface DispatchArg<T> {
source: Observable<T>;
subscriber: Subscriber<T>;
}

class SubscribeOnObservable<T> extends Observable<T> {
/** @nocollapse */
static dispatch<T>(this: SchedulerAction<T>, arg: DispatchArg<T>): Subscription {
const { source, subscriber } = arg;
return this.add(source.subscribe(subscriber));
}

constructor(
public source: Observable<T>,
private delayTime: number = 0,
private scheduler: SchedulerLike = asapScheduler
) {
super();
if (delayTime < 0) {
this.delayTime = 0;
}
if (!isScheduler(scheduler)) {
this.scheduler = asapScheduler;
}
}

/** @deprecated This is an internal implementation detail, do not use. */
_subscribe(subscriber: Subscriber<T>) {
const delay = this.delayTime;
const source = this.source;
const scheduler = this.scheduler;

return scheduler.schedule<DispatchArg<any>>(SubscribeOnObservable.dispatch as any, delay, {
source,
subscriber,
});
}
}

/**
* Asynchronously subscribes Observers to this Observable on the specified {@link SchedulerLike}.
Expand Down Expand Up @@ -59,9 +100,9 @@ import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic } from '../types
* The reason for this is that Observable `b` emits its values directly and synchronously like before
* but the emissions from `a` are scheduled on the event loop because we are now using the {@link asyncScheduler} for that specific Observable.
*
* @param {SchedulerLike} scheduler - The {@link SchedulerLike} to perform subscription actions on.
* @return {Observable<T>} The source Observable modified so that its subscriptions happen on the specified {@link SchedulerLike}.
* @name subscribeOn
* @param scheduler The {@link SchedulerLike} to perform subscription actions on.
* @param delay A delay to pass to the scheduler to delay subscriptions
* @return The source Observable modified so that its subscriptions happen on the specified {@link SchedulerLike}.
*/
export function subscribeOn<T>(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction<T> {
return function subscribeOnOperatorFunction(source: Observable<T>): Observable<T> {
Expand All @@ -70,12 +111,8 @@ export function subscribeOn<T>(scheduler: SchedulerLike, delay: number = 0): Mon
}

class SubscribeOnOperator<T> implements Operator<T, T> {
constructor(private scheduler: SchedulerLike,
private delay: number) {
}
constructor(private scheduler: SchedulerLike, private delay: number) {}
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return new SubscribeOnObservable<T>(
source, this.delay, this.scheduler
).subscribe(subscriber);
return new SubscribeOnObservable<T>(source, this.delay, this.scheduler).subscribe(subscriber);
}
}
2 changes: 1 addition & 1 deletion src/internal/util/isScheduler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { SchedulerLike } from '../types';

export function isScheduler(value: any): value is SchedulerLike {
return value && typeof (<any>value).schedule === 'function';
return value && typeof value.schedule === 'function';
}

0 comments on commit cd7d649

Please sign in to comment.