From 4c30f7d802b6b178eda31f134a3e3e32bc41fd10 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Tue, 23 Feb 2021 21:24:03 +1000 Subject: [PATCH 1/6] test: add buffer/window equivalence tests --- spec/operators/buffer-spec.ts | 66 ++++++++++++++++++++++++++--------- 1 file changed, 50 insertions(+), 16 deletions(-) diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 1f933d8360..80ac9a6a0f 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -1,4 +1,5 @@ -import { buffer, mergeMap, take } from 'rxjs/operators'; +/** @prettier */ +import { buffer, mergeMap, take, window, toArray } from 'rxjs/operators'; import { EMPTY, NEVER, throwError, of, Subject } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -20,7 +21,7 @@ describe('Observable.prototype.buffer', () => { const expectedValues = { x: ['a', 'b', 'c'], y: ['d', 'e', 'f'], - z: ['g', 'h', 'i'] + z: ['g', 'h', 'i'], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); }); @@ -128,7 +129,7 @@ describe('Observable.prototype.buffer', () => { c: ['6'], d: [] as string[], e: ['7', '8', '9'], - f: ['0'] + f: ['0'], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); }); @@ -145,7 +146,7 @@ describe('Observable.prototype.buffer', () => { a: ['3'], b: ['4', '5'], c: ['6'], - d: [] as string[] + d: [] as string[], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); expectSubscriptions(a.subscriptions).toBe(subs); @@ -161,7 +162,7 @@ describe('Observable.prototype.buffer', () => { const expected = ' ---a-------b--- '; const expectedValues = { a: ['3'], - b: ['4', '5'] + b: ['4', '5'], }; expectObservable(a.pipe(buffer(b)), unsub).toBe(expected, expectedValues); expectSubscriptions(a.subscriptions).toBe(subs); @@ -177,13 +178,13 @@ describe('Observable.prototype.buffer', () => { const unsub = ' --------------! '; const expectedValues = { a: ['3'], - b: ['4', '5'] + b: ['4', '5'], }; const result = a.pipe( mergeMap((x: any) => of(x)), buffer(b), - mergeMap((x: any) => of(x)), + mergeMap((x: any) => of(x)) ); expectObservable(result, unsub).toBe(expected, expectedValues); @@ -194,13 +195,13 @@ describe('Observable.prototype.buffer', () => { it('should work with non-empty and selector error', () => { // Buffer Boundaries onErrorSource (RxJS 4) testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { - const a = hot('--1--2--^--3-----#', {'3': 3}, new Error('too bad')); + const a = hot('--1--2--^--3-----#', { '3': 3 }, new Error('too bad')); const subs = ' ^--------!'; const b = hot('--------^--a--b---'); const expected = ' ---a--b--#'; const expectedValues = { a: [3], - b: [] as string[] + b: [] as string[], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues, new Error('too bad')); expectSubscriptions(a.subscriptions).toBe(subs); @@ -227,7 +228,7 @@ describe('Observable.prototype.buffer', () => { const expectedValues = { a: ['3'], b: ['4', '5'], - c: ['6'] + c: ['6'], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues, new Error('too bad')); expectSubscriptions(a.subscriptions).toBe(subs); @@ -244,7 +245,7 @@ describe('Observable.prototype.buffer', () => { const expected = ' ---a-------b--- '; const expectedValues = { a: ['3'], - b: ['4', '5'] + b: ['4', '5'], }; expectObservable(a.pipe(buffer(b)), unsub).toBe(expected, expectedValues); @@ -272,11 +273,9 @@ describe('Observable.prototype.buffer', () => { const results: any[] = []; const subject = new Subject(); - const source = subject.pipe( - buffer(subject) - ).subscribe({ - next: value => results.push(value), - complete: () => results.push('complete') + const source = subject.pipe(buffer(subject)).subscribe({ + next: (value) => results.push(value), + complete: () => results.push('complete'), }); subject.next(1); @@ -286,4 +285,39 @@ describe('Observable.prototype.buffer', () => { subject.complete(); expect(results).to.deep.equal([[1], [2], 'complete']); }); + + describe('equivalence with the window operator', () => { + const cases = [ + { + source: ' -a-b-c-d-e-f-g-h-i-|', + notifier: ' -----B-----B-----B-|', + }, + { + source: ' -a-b-c-d-e-f-g-h-i-|', + notifier: ' -----B-----B--| ', + }, + { + source: ' -a-b-c-d-e---------|', + notifier: ' -----B-----B-----B-|', + }, + { + source: ' -a-b-c-d-e-f-g-h-i-|', + notifier: ' -------------------|', + }, + ]; + cases.forEach(({ source, notifier }, index) => { + it(`should be equivalent for case ${index}`, () => { + testScheduler.run(({ hot, expectObservable }) => { + const a = hot(source); + const b = hot(notifier); + expectObservable(a.pipe(buffer(b))).toEqual( + a.pipe( + window(b), + mergeMap((w) => w.pipe(toArray())) + ) + ); + }); + }); + }); + }); }); From 5a948a7d4e82ccf3bed7daea9d1910cb2706ef72 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 22 Feb 2021 14:22:00 -0600 Subject: [PATCH 2/6] fix(buffer): closingNotifier completion does not complete resulting observable Resolves an issue where the resulting observable would complete when the closingNotifier completed. Notifier completion should not complete the result, only source completion should do that. BREAKING CHANGE: closingNotifier no longer closes the result of `buffer`. If that is truly a desired behavior, then you should use `takeUntil`. Something like: `source$.pipe(buffer(notifier$), takeUntil(notifier$.pipe(ignoreElements(), endWith(true))))`, where `notifier$` is multicast, although there are many ways to compose this behavior. --- spec/operators/buffer-spec.ts | 8 ++++---- src/internal/operators/buffer.ts | 20 ++++++++++++++------ 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 80ac9a6a0f..243be1b852 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -49,7 +49,7 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); const b = EMPTY; - const expected = '|'; + const expected = ' --------------------------------|'; expectObservable(a.pipe(buffer(b))).toBe(expected); }); }); @@ -67,7 +67,7 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ expectObservable }) => { const a = NEVER; const b = EMPTY; - const expected = '|'; + const expected = '-'; expectObservable(a.pipe(buffer(b))).toBe(expected); }); }); @@ -139,9 +139,9 @@ describe('Observable.prototype.buffer', () => { // Buffshoulder Boundaries onCompletedBoundaries (RxJS 4) testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); - const subs = ' ^----------------! '; + const subs = ' ^-------------------------------!'; const b = hot('--------^--a-------b---cd| '); - const expected = ' ---a-------b---cd| '; + const expected = ' ---a-------b---cd---------------|'; const expectedValues = { a: ['3'], b: ['4', '5'], diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index 56e4015ce0..a9deb9ae81 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -1,6 +1,7 @@ import { Observable } from '../Observable'; import { OperatorFunction } from '../types'; import { operate } from '../util/lift'; +import { noop } from '../util/noop'; import { OperatorSubscriber } from './OperatorSubscriber'; /** @@ -50,12 +51,19 @@ export function buffer(closingNotifier: Observable): OperatorFunction { - // Start a new buffer and emit the previous one. - const b = currentBuffer; - currentBuffer = []; - subscriber.next(b); - }) + new OperatorSubscriber( + subscriber, + () => { + // Start a new buffer and emit the previous one. + const b = currentBuffer; + currentBuffer = []; + subscriber.next(b); + }, + // Pass all errors to the consumer. + undefined, + // Closing notifier should not complete the resulting observable. + noop + ) ); return () => { From a83a9f0b8b4c11305a13155b7f7ccc3208ae8b68 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 22 Feb 2021 21:04:39 -0600 Subject: [PATCH 3/6] test(buffer): add additional test for final buffer emit --- spec/operators/buffer-spec.ts | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 243be1b852..56652ba3ce 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -27,6 +27,27 @@ describe('Observable.prototype.buffer', () => { }); }); + it('should emit all buffered values if the source completes before the closingNotifier does', () => { + testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { + const source = hot('---^---a---b---c---d---e--f----|'); + const sourceSubs = ' ^---------------------------!'; + const closer = hot('---^-------------B----------------'); + const closerSubs = ' ^---------------------------!'; + const expected = ' --------------x-------------(F|)'; + + const result = source.pipe(buffer(closer)); + + const expectedValues = { + x: ['a', 'b', 'c'], + F: ['d', 'e', 'f'], + }; + + expectObservable(result).toBe(expected, expectedValues); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expectSubscriptions(closer.subscriptions).toBe(closerSubs); + }); + }); + it('should work with empty and empty selector', () => { testScheduler.run(({ expectObservable }) => { const a = EMPTY; From 9ab93572d55768a04707faf5d0e6384d8802cb77 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 22 Feb 2021 14:46:54 -0600 Subject: [PATCH 4/6] fix(buffer): Remaining buffer will correctly be emited on source close. BREAKING CHANGE: Final buffered values will now always be emitted. To get the same behavior as the previous release, you can use `endWith` and `skipLast(1)`, like so: `source$.pipe(buffer(notifier$.pipe(endWith(true))), skipLast(1))` Fixes #3990, #6035 --- spec/operators/buffer-spec.ts | 48 ++++++++++++++++++++++---------- src/internal/operators/buffer.ts | 16 +++++++++-- 2 files changed, 47 insertions(+), 17 deletions(-) diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 56652ba3ce..f3d46cefa9 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -17,11 +17,26 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = hot(' -a-b-c-d-e-f-g-h-i-|'); const b = hot(' -----B-----B-----B-|'); - const expected = '-----x-----y-----z-|'; + const expected = '-----x-----y-----z-(F|)'; const expectedValues = { x: ['a', 'b', 'c'], y: ['d', 'e', 'f'], z: ['g', 'h', 'i'], + F: [], + }; + expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); + }); + }); + + it('should not emit a final buffer if the closingNotifier is already complete', () => { + testScheduler.run(({ hot, expectObservable }) => { + const a = hot(' -a-b-c-d-e-f-g-h-i-|'); + const b = hot(' -----B-----B--|'); + const expected = '-----x-----y-------(F|)'; + const expectedValues = { + x: ['a', 'b', 'c'], + y: ['d', 'e', 'f'], + F: ['g', 'h', 'i'], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); }); @@ -52,8 +67,8 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ expectObservable }) => { const a = EMPTY; const b = EMPTY; - const expected = '|'; - expectObservable(a.pipe(buffer(b))).toBe(expected); + const expected = '(F|)'; + expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] }); }); }); @@ -61,8 +76,8 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = EMPTY; const b = hot('-----a-----'); - const expected = '|'; - expectObservable(a.pipe(buffer(b))).toBe(expected); + const expected = '(F|)'; + expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] }); }); }); @@ -70,8 +85,10 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); const b = EMPTY; - const expected = ' --------------------------------|'; - expectObservable(a.pipe(buffer(b))).toBe(expected); + const expected = ' --------------------------------(F|)'; + expectObservable(a.pipe(buffer(b))).toBe(expected, { + F: ['3', '4', '5', '6', '7', '8', '9', '0'], + }); }); }); @@ -97,8 +114,8 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ expectObservable }) => { const a = EMPTY; const b = NEVER; - const expected = '|'; - expectObservable(a.pipe(buffer(b))).toBe(expected); + const expected = '(F|)'; + expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] }); }); }); @@ -143,7 +160,7 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); const b = hot('--------^--a-------b---cd---------e---f---|'); - const expected = ' ---a-------b---cd---------e---f-|'; + const expected = ' ---a-------b---cd---------e---f-(F|)'; const expectedValues = { a: ['3'], b: ['4', '5'], @@ -151,23 +168,24 @@ describe('Observable.prototype.buffer', () => { d: [] as string[], e: ['7', '8', '9'], f: ['0'], + F: [], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); }); }); - it(' work with selector completed', () => { - // Buffshoulder Boundaries onCompletedBoundaries (RxJS 4) + it('should work with selector completed', () => { testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); const subs = ' ^-------------------------------!'; const b = hot('--------^--a-------b---cd| '); - const expected = ' ---a-------b---cd---------------|'; + const expected = ' ---a-------b---cd---------------(F|)'; const expectedValues = { a: ['3'], b: ['4', '5'], c: ['6'], d: [] as string[], + F: ['7', '8', '9', '0'], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); expectSubscriptions(a.subscriptions).toBe(subs); @@ -294,7 +312,7 @@ describe('Observable.prototype.buffer', () => { const results: any[] = []; const subject = new Subject(); - const source = subject.pipe(buffer(subject)).subscribe({ + subject.pipe(buffer(subject)).subscribe({ next: (value) => results.push(value), complete: () => results.push('complete'), }); @@ -304,7 +322,7 @@ describe('Observable.prototype.buffer', () => { subject.next(2); expect(results).to.deep.equal([[1], [2]]); subject.complete(); - expect(results).to.deep.equal([[1], [2], 'complete']); + expect(results).to.deep.equal([[1], [2], [], 'complete']); }); describe('equivalence with the window operator', () => { diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index a9deb9ae81..daf5b62766 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -44,10 +44,23 @@ import { OperatorSubscriber } from './OperatorSubscriber'; */ export function buffer(closingNotifier: Observable): OperatorFunction { return operate((source, subscriber) => { + // The current buffered values. If this is null, it's because the + // closingNotifier has completed before the source. let currentBuffer: T[] = []; // Subscribe to our source. - source.subscribe(new OperatorSubscriber(subscriber, (value) => currentBuffer.push(value))); + source.subscribe( + new OperatorSubscriber( + subscriber, + (value) => currentBuffer.push(value), + // Pass all errors to the consumer. + undefined, + () => { + subscriber.next(currentBuffer); + subscriber.complete(); + } + ) + ); // Subscribe to the closing notifier. closingNotifier.subscribe( @@ -61,7 +74,6 @@ export function buffer(closingNotifier: Observable): OperatorFunction Date: Tue, 23 Feb 2021 11:32:42 -0600 Subject: [PATCH 5/6] fix(window): final window stays open until source complete Resolves an issue where the windowBoundary complete would complete the resulting observable. BREAKING CHANGE: The windowBoundaries observable no longer completes the result. It was only ever meant to notify of the window boundary. To get the same behavior as the old behavior, you would need to add an `endWith` and a `skipLast(1)` like so: `source$.pipe(window(notifier$.pipe(endWith(true))), skipLast(1))`. --- spec/operators/window-spec.ts | 12 +++--- src/internal/operators/window.ts | 64 ++++++++++++++++---------------- 2 files changed, 39 insertions(+), 37 deletions(-) diff --git a/spec/operators/window-spec.ts b/spec/operators/window-spec.ts index adeca0240b..923a8cb6aa 100644 --- a/spec/operators/window-spec.ts +++ b/spec/operators/window-spec.ts @@ -106,13 +106,13 @@ describe('window operator', () => { it('should be able to split a never Observable into timely empty windows', () => { const source = hot('^--------'); - const sourceSubs = '^ !'; + const sourceSubs = '^ '; const closings = cold('--x--x--|'); const closingSubs = '^ !'; - const expected = 'a-b--c--|'; + const expected = 'a-b--c---'; const a = cold('--| '); const b = cold( '---| '); - const c = cold( '---|'); + const c = cold( '----'); const expectedValues = { a: a, b: b, c: c }; const result = source.pipe(window(closings)); @@ -234,13 +234,13 @@ describe('window operator', () => { it('should complete the resulting Observable when window closings completes', () => { const source = hot('-1-2-^3-4-5-6-7-8-9-|'); - const subs = '^ ! '; + const subs = '^ !'; const closings = hot('---^---x---x---| '); const closingSubs = '^ ! '; - const expected = 'a---b---c---| '; + const expected = 'a---b---c------|'; const a = cold( '-3-4| '); const b = cold( '-5-6| '); - const c = cold( '-7-8| '); + const c = cold( '-7-8-9-|'); const expectedValues = { a: a, b: b, c: c }; const result = source.pipe(window(closings)); diff --git a/src/internal/operators/window.ts b/src/internal/operators/window.ts index da7d28b14d..8671a2a7cb 100644 --- a/src/internal/operators/window.ts +++ b/src/internal/operators/window.ts @@ -3,6 +3,7 @@ import { OperatorFunction } from '../types'; import { Subject } from '../Subject'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; +import { noop } from '../util/noop'; /** * Branch out the source Observable values as a nested Observable whenever @@ -47,45 +48,46 @@ import { OperatorSubscriber } from './OperatorSubscriber'; */ export function window(windowBoundaries: Observable): OperatorFunction> { return operate((source, subscriber) => { - let windowSubject = new Subject(); + let windowSubject: Subject = new Subject(); subscriber.next(windowSubject.asObservable()); - /** - * Subscribes to one of our two observables in this operator in the same way, - * only allowing for different behaviors with the next handler. - * @param sourceOrNotifier The observable to subscribe to. - * @param next The next handler to use with the subscription - */ - const windowSubscribe = (sourceOrNotifier: Observable, next: (value: any) => void) => - sourceOrNotifier.subscribe( - new OperatorSubscriber( - subscriber, - next, - (err: any) => { - windowSubject.error(err); - subscriber.error(err); - }, - () => { - windowSubject.complete(); - subscriber.complete(); - } - ) - ); + const errorHandler = (err: any) => { + windowSubject.error(err); + subscriber.error(err); + }; // Subscribe to our source - windowSubscribe(source, (value) => windowSubject.next(value)); + source.subscribe( + new OperatorSubscriber( + subscriber, + (value) => windowSubject?.next(value), + errorHandler, + () => { + windowSubject.complete(); + subscriber.complete(); + } + ) + ); + // Subscribe to the window boundaries. - windowSubscribe(windowBoundaries, () => { - windowSubject.complete(); - subscriber.next((windowSubject = new Subject())); - }); + windowBoundaries.subscribe( + new OperatorSubscriber( + subscriber, + () => { + windowSubject.complete(); + subscriber.next((windowSubject = new Subject())); + }, + errorHandler, + noop + ) + ); - // Additional teardown. Note that other teardown and post-subscription logic - // is encapsulated in the act of a Subscriber subscribing to the observable - // during the subscribe call. We can return additional teardown here. return () => { - windowSubject.unsubscribe(); + // Unsubscribing the subject ensures that anyone who has captured + // a reference to this window that tries to use it after it can + // no longer get values from the source will get an ObjectUnsubscribedError. + windowSubject?.unsubscribe(); windowSubject = null!; }; }); From b0109b00f61088594e68244e9a97fbde5ee4e61f Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 24 Feb 2021 11:50:40 -0600 Subject: [PATCH 6/6] chore: fix test description and comment --- spec/operators/buffer-spec.ts | 2 +- src/internal/operators/buffer.ts | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index f3d46cefa9..53b7c19ef6 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -28,7 +28,7 @@ describe('Observable.prototype.buffer', () => { }); }); - it('should not emit a final buffer if the closingNotifier is already complete', () => { + it('should emit a final buffer if the closingNotifier is already complete', () => { testScheduler.run(({ hot, expectObservable }) => { const a = hot(' -a-b-c-d-e-f-g-h-i-|'); const b = hot(' -----B-----B--|'); diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index daf5b62766..6f34c6f0d4 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -44,8 +44,7 @@ import { OperatorSubscriber } from './OperatorSubscriber'; */ export function buffer(closingNotifier: Observable): OperatorFunction { return operate((source, subscriber) => { - // The current buffered values. If this is null, it's because the - // closingNotifier has completed before the source. + // The current buffered values. let currentBuffer: T[] = []; // Subscribe to our source.