Skip to content

Commit

Permalink
fix(delay): emit complete notification as soon as possible
Browse files Browse the repository at this point in the history
BC, closes #4249
  • Loading branch information
martinsik committed Dec 31, 2018
1 parent bd0b6ca commit 63b8797
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 13 deletions.
30 changes: 20 additions & 10 deletions spec/operators/delay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ declare const rxTestScheduler: TestScheduler;
/** @test {delay} */
describe('delay operator', () => {
asDiagram('delay(20)')('should delay by specified timeframe', () => {
const e1 = hot('---a--b--| ');
const t = time( '--| ');
const expected = '-----a--b--|';
const subs = '^ ! ';
const e1 = hot('---a--b--|');
const t = time( '--| ');
const expected = '-----a--b|';
const subs = '^ !';

const result = e1.pipe(delay(t, rxTestScheduler));

Expand All @@ -25,7 +25,7 @@ describe('delay operator', () => {
it('should delay by absolute time period', () => {
const e1 = hot('--a--b--| ');
const t = time( '---| ');
const expected = '-----a--b--|';
const expected = '-----a--(b|)';
const subs = '^ ! ';

const absoluteDelay = new Date(rxTestScheduler.now() + t);
Expand All @@ -38,7 +38,7 @@ describe('delay operator', () => {
it('should delay by absolute time period after subscription', () => {
const e1 = hot('---^--a--b--| ');
const t = time( '---| ');
const expected = '------a--b--|';
const expected = '------a--(b|)';
const subs = '^ ! ';

const absoluteDelay = new Date(rxTestScheduler.now() + t);
Expand Down Expand Up @@ -86,10 +86,10 @@ describe('delay operator', () => {
expectSubscriptions(e1.subscriptions).toBe(e1Sub);
});

it('should delay when source does not emits', () => {
it('should not delay when source does not emits', () => {
const e1 = hot('----| ');
const t = time( '---|');
const expected = '-------|';
const expected = '----|';
const subs = '^ ! ';

const result = e1.pipe(delay(t, rxTestScheduler));
Expand All @@ -98,10 +98,20 @@ describe('delay operator', () => {
expectSubscriptions(e1.subscriptions).toBe(subs);
});

it('should delay when source is empty', () => {
it('should not delay when source is empty', () => {
const e1 = cold('|');
const t = time('---|');
const expected = '---|';
const expected = '|';

const result = e1.pipe(delay(t, rxTestScheduler));

expectObservable(result).toBe(expected);
});

it('should delay complete when a value is scheduled', () => {
const e1 = cold('-a-|');
const t = time('---|');
const expected = '----(a|)';

const result = e1.pipe(delay(t, rxTestScheduler));

Expand Down
2 changes: 1 addition & 1 deletion spec/operators/mergeScan-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ describe('mergeScan', () => {
it('should emit accumulator if inner completes without value after source completes', () => {
const e1 = hot('--a--^--b--c--d--e--f--g--|');
const e1subs = '^ !';
const expected = '-----------------------(x|)';
const expected = '---------------------(x|)';

const source = e1.pipe(
mergeScan((acc, x) => EMPTY.delay(50, rxTestScheduler), ['1'])
Expand Down
2 changes: 1 addition & 1 deletion spec/schedulers/TestScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ describe('TestScheduler', () => {
const output = cold('-a|').pipe(
delay(1000 * 10)
);
const expected = ' - 10s a|';
const expected = ' - 10s (a|)';
expectObservable(output).toBe(expected);
});
});
Expand Down
7 changes: 6 additions & 1 deletion src/internal/operators/delay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class DelaySubscriber<T> extends Subscriber<T> {
if (queue.length > 0) {
const delay = Math.max(0, queue[0].time - scheduler.now());
this.schedule(state, delay);
} else if (source.isStopped) {
source.destination.complete();
source.active = false;
} else {
this.unsubscribe();
source.active = false;
Expand Down Expand Up @@ -143,7 +146,9 @@ class DelaySubscriber<T> extends Subscriber<T> {
}

protected _complete() {
this.scheduleNotification(Notification.createComplete());
if (this.queue.length === 0) {
this.destination.complete();
}
this.unsubscribe();
}
}
Expand Down

0 comments on commit 63b8797

Please sign in to comment.