Skip to content

Commit

Permalink
fix(delayWhen): Emit source value if duration selector completes sync…
Browse files Browse the repository at this point in the history
…hronously (#3664)

* fix(delayWhen): Emit source value if duration selector completes synchronously

This fixes an issue where delayWhen would not re-emit a source emission if the duration selector
completed synchronously.

fixes #3663

* docs(delayWhen): Deprecate completion of notifier triggering source emission

This deprecates the behavior that the completion of the notifier observable will cause the source
emission to be emitted on the output observable.
  • Loading branch information
Airblader authored and benlesh committed May 31, 2018
1 parent d7bfc9d commit 2c43af7
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 9 deletions.
24 changes: 23 additions & 1 deletion spec/operators/delayWhen-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { of } from 'rxjs';
import { of, EMPTY } from 'rxjs';
import { delayWhen } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
Expand Down Expand Up @@ -106,6 +106,28 @@ describe('delayWhen operator', () => {
expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
expectSubscriptions(selector.subscriptions).toBe(selectorSubs);
});

it('should emit if the selector completes synchronously', () => {
const e1 = hot('a--|');
const expected = 'a--|';
const subs = '^ !';

const result = e1.pipe(delayWhen((x: any) => EMPTY));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should emit if the source completes synchronously and the selector completes synchronously', () => {
const e1 = hot('(a|)');
const expected = '(a|)';
const subs = '(^!)';

const result = e1.pipe(delayWhen((x: any) => EMPTY));

expectObservable(result).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should not emit if selector never emits', () => {
Expand Down
17 changes: 9 additions & 8 deletions src/internal/operators/delayWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ import { InnerSubscriber } from '../InnerSubscriber';
import { subscribeToResult } from '../util/subscribeToResult';
import { MonoTypeOperatorFunction, TeardownLogic } from '../types';

/* tslint:disable:max-line-length */
/** @deprecated In future versions, empty notifiers will no longer re-emit the source value on the output observable. */
export function delayWhen<T>(delayDurationSelector: (value: T) => Observable<never>, subscriptionDelay?: Observable<any>): MonoTypeOperatorFunction<T>;
export function delayWhen<T>(delayDurationSelector: (value: T) => Observable<any>, subscriptionDelay?: Observable<any>): MonoTypeOperatorFunction<T>;
/* tslint:disable:max-line-length */

/**
* Delays the emission of items from the source Observable by a given time span
* determined by the emissions of another Observable.
Expand All @@ -22,6 +28,8 @@ import { MonoTypeOperatorFunction, TeardownLogic } from '../types';
* argument, and should return an Observable, called the "duration" Observable.
* The source value is emitted on the output Observable only when the duration
* Observable emits a value or completes.
* The completion of the notifier triggering the emission of the source value
* is deprecated behavior and will be removed in future versions.
*
* Optionally, `delayWhen` takes a second argument, `subscriptionDelay`, which
* is an Observable. When `subscriptionDelay` emits its first value or
Expand Down Expand Up @@ -79,7 +87,6 @@ class DelayWhenOperator<T> implements Operator<T, T> {
class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
private completed: boolean = false;
private delayNotifierSubscriptions: Array<Subscription> = [];
private values: Array<T> = [];

constructor(destination: Subscriber<T>,
private delayDurationSelector: (value: T) => Observable<any>) {
Expand Down Expand Up @@ -126,15 +133,11 @@ class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
subscription.unsubscribe();

const subscriptionIdx = this.delayNotifierSubscriptions.indexOf(subscription);
let value: T = null;

if (subscriptionIdx !== -1) {
value = this.values[subscriptionIdx];
this.delayNotifierSubscriptions.splice(subscriptionIdx, 1);
this.values.splice(subscriptionIdx, 1);
}

return value;
return subscription.outerValue;
}

private tryDelay(delayNotifier: Observable<any>, value: T): void {
Expand All @@ -144,8 +147,6 @@ class DelayWhenSubscriber<T, R> extends OuterSubscriber<T, R> {
this.add(notifierSubscription);
this.delayNotifierSubscriptions.push(notifierSubscription);
}

this.values.push(value);
}

private tryComplete(): void {
Expand Down

0 comments on commit 2c43af7

Please sign in to comment.