diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index 1f933d8360..53b7c19ef6 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -1,4 +1,5 @@ -import { buffer, mergeMap, take } from 'rxjs/operators'; +/** @prettier */ +import { buffer, mergeMap, take, window, toArray } from 'rxjs/operators'; import { EMPTY, NEVER, throwError, of, Subject } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; @@ -16,22 +17,58 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = hot(' -a-b-c-d-e-f-g-h-i-|'); const b = hot(' -----B-----B-----B-|'); - const expected = '-----x-----y-----z-|'; + const expected = '-----x-----y-----z-(F|)'; const expectedValues = { x: ['a', 'b', 'c'], y: ['d', 'e', 'f'], - z: ['g', 'h', 'i'] + z: ['g', 'h', 'i'], + F: [], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); }); }); + it('should emit a final buffer if the closingNotifier is already complete', () => { + testScheduler.run(({ hot, expectObservable }) => { + const a = hot(' -a-b-c-d-e-f-g-h-i-|'); + const b = hot(' -----B-----B--|'); + const expected = '-----x-----y-------(F|)'; + const expectedValues = { + x: ['a', 'b', 'c'], + y: ['d', 'e', 'f'], + F: ['g', 'h', 'i'], + }; + expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); + }); + }); + + it('should emit all buffered values if the source completes before the closingNotifier does', () => { + testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { + const source = hot('---^---a---b---c---d---e--f----|'); + const sourceSubs = ' ^---------------------------!'; + const closer = hot('---^-------------B----------------'); + const closerSubs = ' ^---------------------------!'; + const expected = ' --------------x-------------(F|)'; + + const result = source.pipe(buffer(closer)); + + const expectedValues = { + x: ['a', 'b', 'c'], + F: ['d', 'e', 'f'], + }; + + expectObservable(result).toBe(expected, expectedValues); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + expectSubscriptions(closer.subscriptions).toBe(closerSubs); + }); + }); + it('should work with empty and empty selector', () => { testScheduler.run(({ expectObservable }) => { const a = EMPTY; const b = EMPTY; - const expected = '|'; - expectObservable(a.pipe(buffer(b))).toBe(expected); + const expected = '(F|)'; + expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] }); }); }); @@ -39,8 +76,8 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = EMPTY; const b = hot('-----a-----'); - const expected = '|'; - expectObservable(a.pipe(buffer(b))).toBe(expected); + const expected = '(F|)'; + expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] }); }); }); @@ -48,8 +85,10 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); const b = EMPTY; - const expected = '|'; - expectObservable(a.pipe(buffer(b))).toBe(expected); + const expected = ' --------------------------------(F|)'; + expectObservable(a.pipe(buffer(b))).toBe(expected, { + F: ['3', '4', '5', '6', '7', '8', '9', '0'], + }); }); }); @@ -66,7 +105,7 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ expectObservable }) => { const a = NEVER; const b = EMPTY; - const expected = '|'; + const expected = '-'; expectObservable(a.pipe(buffer(b))).toBe(expected); }); }); @@ -75,8 +114,8 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ expectObservable }) => { const a = EMPTY; const b = NEVER; - const expected = '|'; - expectObservable(a.pipe(buffer(b))).toBe(expected); + const expected = '(F|)'; + expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] }); }); }); @@ -121,31 +160,32 @@ describe('Observable.prototype.buffer', () => { testScheduler.run(({ hot, expectObservable }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); const b = hot('--------^--a-------b---cd---------e---f---|'); - const expected = ' ---a-------b---cd---------e---f-|'; + const expected = ' ---a-------b---cd---------e---f-(F|)'; const expectedValues = { a: ['3'], b: ['4', '5'], c: ['6'], d: [] as string[], e: ['7', '8', '9'], - f: ['0'] + f: ['0'], + F: [], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); }); }); - it(' work with selector completed', () => { - // Buffshoulder Boundaries onCompletedBoundaries (RxJS 4) + it('should work with selector completed', () => { testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); - const subs = ' ^----------------! '; + const subs = ' ^-------------------------------!'; const b = hot('--------^--a-------b---cd| '); - const expected = ' ---a-------b---cd| '; + const expected = ' ---a-------b---cd---------------(F|)'; const expectedValues = { a: ['3'], b: ['4', '5'], c: ['6'], - d: [] as string[] + d: [] as string[], + F: ['7', '8', '9', '0'], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues); expectSubscriptions(a.subscriptions).toBe(subs); @@ -161,7 +201,7 @@ describe('Observable.prototype.buffer', () => { const expected = ' ---a-------b--- '; const expectedValues = { a: ['3'], - b: ['4', '5'] + b: ['4', '5'], }; expectObservable(a.pipe(buffer(b)), unsub).toBe(expected, expectedValues); expectSubscriptions(a.subscriptions).toBe(subs); @@ -177,13 +217,13 @@ describe('Observable.prototype.buffer', () => { const unsub = ' --------------! '; const expectedValues = { a: ['3'], - b: ['4', '5'] + b: ['4', '5'], }; const result = a.pipe( mergeMap((x: any) => of(x)), buffer(b), - mergeMap((x: any) => of(x)), + mergeMap((x: any) => of(x)) ); expectObservable(result, unsub).toBe(expected, expectedValues); @@ -194,13 +234,13 @@ describe('Observable.prototype.buffer', () => { it('should work with non-empty and selector error', () => { // Buffer Boundaries onErrorSource (RxJS 4) testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { - const a = hot('--1--2--^--3-----#', {'3': 3}, new Error('too bad')); + const a = hot('--1--2--^--3-----#', { '3': 3 }, new Error('too bad')); const subs = ' ^--------!'; const b = hot('--------^--a--b---'); const expected = ' ---a--b--#'; const expectedValues = { a: [3], - b: [] as string[] + b: [] as string[], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues, new Error('too bad')); expectSubscriptions(a.subscriptions).toBe(subs); @@ -227,7 +267,7 @@ describe('Observable.prototype.buffer', () => { const expectedValues = { a: ['3'], b: ['4', '5'], - c: ['6'] + c: ['6'], }; expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues, new Error('too bad')); expectSubscriptions(a.subscriptions).toBe(subs); @@ -244,7 +284,7 @@ describe('Observable.prototype.buffer', () => { const expected = ' ---a-------b--- '; const expectedValues = { a: ['3'], - b: ['4', '5'] + b: ['4', '5'], }; expectObservable(a.pipe(buffer(b)), unsub).toBe(expected, expectedValues); @@ -272,11 +312,9 @@ describe('Observable.prototype.buffer', () => { const results: any[] = []; const subject = new Subject(); - const source = subject.pipe( - buffer(subject) - ).subscribe({ - next: value => results.push(value), - complete: () => results.push('complete') + subject.pipe(buffer(subject)).subscribe({ + next: (value) => results.push(value), + complete: () => results.push('complete'), }); subject.next(1); @@ -284,6 +322,41 @@ describe('Observable.prototype.buffer', () => { subject.next(2); expect(results).to.deep.equal([[1], [2]]); subject.complete(); - expect(results).to.deep.equal([[1], [2], 'complete']); + expect(results).to.deep.equal([[1], [2], [], 'complete']); + }); + + describe('equivalence with the window operator', () => { + const cases = [ + { + source: ' -a-b-c-d-e-f-g-h-i-|', + notifier: ' -----B-----B-----B-|', + }, + { + source: ' -a-b-c-d-e-f-g-h-i-|', + notifier: ' -----B-----B--| ', + }, + { + source: ' -a-b-c-d-e---------|', + notifier: ' -----B-----B-----B-|', + }, + { + source: ' -a-b-c-d-e-f-g-h-i-|', + notifier: ' -------------------|', + }, + ]; + cases.forEach(({ source, notifier }, index) => { + it(`should be equivalent for case ${index}`, () => { + testScheduler.run(({ hot, expectObservable }) => { + const a = hot(source); + const b = hot(notifier); + expectObservable(a.pipe(buffer(b))).toEqual( + a.pipe( + window(b), + mergeMap((w) => w.pipe(toArray())) + ) + ); + }); + }); + }); }); }); diff --git a/spec/operators/window-spec.ts b/spec/operators/window-spec.ts index adeca0240b..923a8cb6aa 100644 --- a/spec/operators/window-spec.ts +++ b/spec/operators/window-spec.ts @@ -106,13 +106,13 @@ describe('window operator', () => { it('should be able to split a never Observable into timely empty windows', () => { const source = hot('^--------'); - const sourceSubs = '^ !'; + const sourceSubs = '^ '; const closings = cold('--x--x--|'); const closingSubs = '^ !'; - const expected = 'a-b--c--|'; + const expected = 'a-b--c---'; const a = cold('--| '); const b = cold( '---| '); - const c = cold( '---|'); + const c = cold( '----'); const expectedValues = { a: a, b: b, c: c }; const result = source.pipe(window(closings)); @@ -234,13 +234,13 @@ describe('window operator', () => { it('should complete the resulting Observable when window closings completes', () => { const source = hot('-1-2-^3-4-5-6-7-8-9-|'); - const subs = '^ ! '; + const subs = '^ !'; const closings = hot('---^---x---x---| '); const closingSubs = '^ ! '; - const expected = 'a---b---c---| '; + const expected = 'a---b---c------|'; const a = cold( '-3-4| '); const b = cold( '-5-6| '); - const c = cold( '-7-8| '); + const c = cold( '-7-8-9-|'); const expectedValues = { a: a, b: b, c: c }; const result = source.pipe(window(closings)); diff --git a/src/internal/operators/buffer.ts b/src/internal/operators/buffer.ts index 56e4015ce0..6f34c6f0d4 100644 --- a/src/internal/operators/buffer.ts +++ b/src/internal/operators/buffer.ts @@ -1,6 +1,7 @@ import { Observable } from '../Observable'; import { OperatorFunction } from '../types'; import { operate } from '../util/lift'; +import { noop } from '../util/noop'; import { OperatorSubscriber } from './OperatorSubscriber'; /** @@ -43,19 +44,37 @@ import { OperatorSubscriber } from './OperatorSubscriber'; */ export function buffer(closingNotifier: Observable): OperatorFunction { return operate((source, subscriber) => { + // The current buffered values. let currentBuffer: T[] = []; // Subscribe to our source. - source.subscribe(new OperatorSubscriber(subscriber, (value) => currentBuffer.push(value))); + source.subscribe( + new OperatorSubscriber( + subscriber, + (value) => currentBuffer.push(value), + // Pass all errors to the consumer. + undefined, + () => { + subscriber.next(currentBuffer); + subscriber.complete(); + } + ) + ); // Subscribe to the closing notifier. closingNotifier.subscribe( - new OperatorSubscriber(subscriber, () => { - // Start a new buffer and emit the previous one. - const b = currentBuffer; - currentBuffer = []; - subscriber.next(b); - }) + new OperatorSubscriber( + subscriber, + () => { + // Start a new buffer and emit the previous one. + const b = currentBuffer; + currentBuffer = []; + subscriber.next(b); + }, + // Pass all errors to the consumer. + undefined, + noop + ) ); return () => { diff --git a/src/internal/operators/window.ts b/src/internal/operators/window.ts index da7d28b14d..8671a2a7cb 100644 --- a/src/internal/operators/window.ts +++ b/src/internal/operators/window.ts @@ -3,6 +3,7 @@ import { OperatorFunction } from '../types'; import { Subject } from '../Subject'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; +import { noop } from '../util/noop'; /** * Branch out the source Observable values as a nested Observable whenever @@ -47,45 +48,46 @@ import { OperatorSubscriber } from './OperatorSubscriber'; */ export function window(windowBoundaries: Observable): OperatorFunction> { return operate((source, subscriber) => { - let windowSubject = new Subject(); + let windowSubject: Subject = new Subject(); subscriber.next(windowSubject.asObservable()); - /** - * Subscribes to one of our two observables in this operator in the same way, - * only allowing for different behaviors with the next handler. - * @param sourceOrNotifier The observable to subscribe to. - * @param next The next handler to use with the subscription - */ - const windowSubscribe = (sourceOrNotifier: Observable, next: (value: any) => void) => - sourceOrNotifier.subscribe( - new OperatorSubscriber( - subscriber, - next, - (err: any) => { - windowSubject.error(err); - subscriber.error(err); - }, - () => { - windowSubject.complete(); - subscriber.complete(); - } - ) - ); + const errorHandler = (err: any) => { + windowSubject.error(err); + subscriber.error(err); + }; // Subscribe to our source - windowSubscribe(source, (value) => windowSubject.next(value)); + source.subscribe( + new OperatorSubscriber( + subscriber, + (value) => windowSubject?.next(value), + errorHandler, + () => { + windowSubject.complete(); + subscriber.complete(); + } + ) + ); + // Subscribe to the window boundaries. - windowSubscribe(windowBoundaries, () => { - windowSubject.complete(); - subscriber.next((windowSubject = new Subject())); - }); + windowBoundaries.subscribe( + new OperatorSubscriber( + subscriber, + () => { + windowSubject.complete(); + subscriber.next((windowSubject = new Subject())); + }, + errorHandler, + noop + ) + ); - // Additional teardown. Note that other teardown and post-subscription logic - // is encapsulated in the act of a Subscriber subscribing to the observable - // during the subscribe call. We can return additional teardown here. return () => { - windowSubject.unsubscribe(); + // Unsubscribing the subject ensures that anyone who has captured + // a reference to this window that tries to use it after it can + // no longer get values from the source will get an ObjectUnsubscribedError. + windowSubject?.unsubscribe(); windowSubject = null!; }; });