Skip to content

Commit

Permalink
fix(delay): proper handling of absolute time passed as delay
Browse files Browse the repository at this point in the history
- Makes impl smaller
  • Loading branch information
benlesh committed Sep 14, 2020
1 parent 143ee68 commit 8ae89b1
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 115 deletions.
36 changes: 18 additions & 18 deletions spec/operators/delay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ describe('delay operator', () => {
});

it('should delay by absolute time period', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' --a--b--| ');
const t = 3; // ---|
const expected = '-----a--(b|)';
const subs = ' ^-------! ';
testScheduler.run(({ hot, time, expectObservable, expectSubscriptions }) => {
const e1 = hot(' --a--b-------------c----d--| ');
const t = time(' -------|');
const expected = '-------(ab)--------c----d--|';
const subs = ' ^--------------------------! ';

const absoluteDelay = new Date(testScheduler.now() + t);
const result = e1.pipe(delay(absoluteDelay, testScheduler));
Expand All @@ -42,11 +42,11 @@ describe('delay operator', () => {
});
});

it('should delay by absolute time period after subscription', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
it('should delay by absolute time period after complete', () => {
testScheduler.run(({ hot, time, expectObservable, expectSubscriptions }) => {
const e1 = hot(' ---^--a--b--| ');
const t = 3; // ---|
const expected = ' ------a--(b|)';
const t = time(' ------------|')
const expected = ' ------------(ab|)';
const subs = ' ^--------! ';

const absoluteDelay = new Date(testScheduler.now() + t);
Expand All @@ -72,10 +72,10 @@ describe('delay operator', () => {
});

it('should raise error when source raises error', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
testScheduler.run(({ hot, time, expectObservable, expectSubscriptions }) => {
const e1 = hot(' --a--b--#');
const t = 3; // ---|
const expected = '-----a--#';
const t = time(' -----------|');
const expected = '--------#';
const subs = ' ^-------!';

const absoluteDelay = new Date(testScheduler.now() + t);
Expand All @@ -86,12 +86,12 @@ describe('delay operator', () => {
});
});

it('should raise error when source raises error after subscription', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const e1 = hot(' ---^---a---b---#');
const t = 3; // ---|
const expected = ' -------a---b#';
const e1Sub = ' ^-----------!';
it('should raise error when source raises error after subscription when Date is passed', () => {
testScheduler.run(({ hot, time, expectObservable, expectSubscriptions }) => {
const e1 = hot(' ---^---a---b-------c----#');
const t = time(' ---------|')
const expected = ' ---------(ab)---c----#';
const e1Sub = ' ^--------------------!';

const absoluteDelay = new Date(testScheduler.now() + t);
const result = e1.pipe(delay(absoluteDelay, testScheduler));
Expand Down
169 changes: 72 additions & 97 deletions src/internal/operators/delay.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
import { async } from '../scheduler/async';
/** @prettier */
import { asyncScheduler } from '../scheduler/async';
import { isValidDate } from '../util/isDate';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Observable } from '../Observable';
import {
MonoTypeOperatorFunction,
SchedulerAction,
SchedulerLike,
TeardownLogic
} from '../types';
import { MonoTypeOperatorFunction, SchedulerLike } from '../types';
import { lift } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';

/**
* Delays the emission of items from the source Observable by a given timeout or
Expand Down Expand Up @@ -59,96 +55,75 @@ import { lift } from '../util/lift';
* @return {Observable} An Observable that delays the emissions of the source
* Observable by the specified timeout or Date.
*/
export function delay<T>(delay: number | Date, scheduler: SchedulerLike = async): MonoTypeOperatorFunction<T> {
const delayFor = isValidDate(delay) ? +delay - scheduler.now() : Math.abs(delay);
return (source: Observable<T>) => lift(source, new DelayOperator(delayFor, scheduler));
}

class DelayOperator<T> implements Operator<T, T> {
constructor(private delay: number, private scheduler: SchedulerLike) {}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new DelaySubscriber(subscriber, this.delay, this.scheduler));
}
}

interface DelayState<T> {
source: DelaySubscriber<T>;
destination: Subscriber<T>;
scheduler: SchedulerLike;
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class DelaySubscriber<T> extends Subscriber<T> {
private queue: Array<DelayMessage<T>> = [];
private active: boolean = false;

private static dispatch<T>(this: SchedulerAction<DelayState<T>>, state: DelayState<T>): void {
const source = state.source;
const queue = source.queue;
const scheduler = state.scheduler;
const destination = state.destination;

while (queue.length > 0 && queue[0].time - scheduler.now() <= 0) {
destination.next(queue.shift()!.value);
}

if (queue.length > 0) {
const delay = Math.max(0, queue[0].time - scheduler.now());
this.schedule(state, delay);
} else if (source.isStopped) {
source.destination.complete();
source.active = false;
} else {
this.unsubscribe();
source.active = false;
}
}
export function delay<T>(delay: number | Date, scheduler: SchedulerLike = asyncScheduler): MonoTypeOperatorFunction<T> {
// TODO: Properly handle negative delays and dates in the past.
return (source: Observable<T>) =>
lift(source, function (this: Subscriber<T>, source: Observable<T>) {
const subscriber = this;
const isAbsoluteDelay = isValidDate(delay);
// If the source is complete
let isComplete = false;
// The number of active delays in progress.
let active = 0;
// For absolute time delay, we collect the values in this array and emit
// them when the delay fires.
let absoluteTimeValues: T[] | null = isAbsoluteDelay ? [] : null;

constructor(protected destination: Subscriber<T>, private delay: number, private scheduler: SchedulerLike) {
super(destination);
}
/**
* Used to check to see if we should complete the resulting
* subscription after delays finish or when the source completes.
* We don't want to complete when the source completes if we
* have delays in flight.
*/
const checkComplete = () => isComplete && !active && !absoluteTimeValues?.length && subscriber.complete();

private _schedule(scheduler: SchedulerLike): void {
this.active = true;
const { destination } = this;
// TODO: The cast below seems like an issue with typings for SchedulerLike to me.
destination.add(
scheduler.schedule<DelayState<T>>(DelaySubscriber.dispatch as any, this.delay, {
source: this,
destination,
scheduler,
} as DelayState<T>)
);
}

protected _next(value: T) {
const scheduler = this.scheduler;
const message = new DelayMessage(scheduler.now() + this.delay, value);
this.queue.push(message);
if (this.active === false) {
this._schedule(scheduler);
}
}

protected _error(err: any) {
this.queue.length = 0;
this.destination.error(err);
this.unsubscribe();
}

protected _complete() {
if (this.queue.length === 0) {
this.destination.complete();
}
this.unsubscribe();
}
}
if (isAbsoluteDelay) {
// A date was passed. We only do one delay, so let's get it
// scheduled right away.
active++;
subscriber.add(
scheduler.schedule(() => {
active--;
if (absoluteTimeValues) {
const values = absoluteTimeValues;
absoluteTimeValues = null;
for (const value of values) {
subscriber.next(value);
}
}
checkComplete();
}, +delay - scheduler.now())
);
}

class DelayMessage<T> {
constructor(public readonly time: number, public readonly value: T) {}
// Subscribe to the source
source.subscribe(
new OperatorSubscriber(
subscriber,
(value) => {
if (isAbsoluteDelay) {
// If we're dealing with an absolute time (via Date) delay, then before
// the delay fires, the `absoluteTimeValues` array will be present, and
// we want to add them to that. Otherwise, if it's `null`, that is because
// the delay has already fired.
absoluteTimeValues ? absoluteTimeValues.push(value) : subscriber.next(value);
} else {
active++;
subscriber.add(
scheduler.schedule(() => {
active--;
subscriber.next(value);
checkComplete();
}, delay as number)
);
}
},
undefined,
() => {
isComplete = true;
checkComplete();
}
)
);
});
}

0 comments on commit 8ae89b1

Please sign in to comment.