Skip to content

Commit

Permalink
fix(buffer): Remaining buffer will correctly be emited on source close.
Browse files Browse the repository at this point in the history
BREAKING CHANGE: Final buffered values will now always be emitted. To get the same behavior as the previous release, you can use `endWith` and `skipLast(1)`, like so: `source$.pipe(buffer(notifier$.pipe(endWith(true))), skipLast(1))`

Fixes #3990, #6035
  • Loading branch information
benlesh committed Feb 24, 2021
1 parent 296273f commit 0c667d5
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 17 deletions.
48 changes: 33 additions & 15 deletions spec/operators/buffer-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,26 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = hot(' -a-b-c-d-e-f-g-h-i-|');
const b = hot(' -----B-----B-----B-|');
const expected = '-----x-----y-----z-|';
const expected = '-----x-----y-----z-(F|)';
const expectedValues = {
x: ['a', 'b', 'c'],
y: ['d', 'e', 'f'],
z: ['g', 'h', 'i'],
F: [],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
});
});

it('should not emit a final buffer if the closingNotifier is already complete', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = hot(' -a-b-c-d-e-f-g-h-i-|');
const b = hot(' -----B-----B--|');
const expected = '-----x-----y-------(F|)';
const expectedValues = {
x: ['a', 'b', 'c'],
y: ['d', 'e', 'f'],
F: ['g', 'h', 'i'],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
});
Expand Down Expand Up @@ -52,26 +67,28 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ expectObservable }) => {
const a = EMPTY;
const b = EMPTY;
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(F|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] });
});
});

it('should work with empty and non-empty selector', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = EMPTY;
const b = hot('-----a-----');
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(F|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] });
});
});

it('should work with non-empty and empty selector', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
const b = EMPTY;
const expected = ' --------------------------------|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = ' --------------------------------(F|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, {
F: ['3', '4', '5', '6', '7', '8', '9', '0'],
});
});
});

Expand All @@ -97,8 +114,8 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ expectObservable }) => {
const a = EMPTY;
const b = NEVER;
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(F|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] });
});
});

Expand Down Expand Up @@ -143,31 +160,32 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
const b = hot('--------^--a-------b---cd---------e---f---|');
const expected = ' ---a-------b---cd---------e---f-|';
const expected = ' ---a-------b---cd---------e---f-(F|)';
const expectedValues = {
a: ['3'],
b: ['4', '5'],
c: ['6'],
d: [] as string[],
e: ['7', '8', '9'],
f: ['0'],
F: [],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
});
});

it(' work with selector completed', () => {
// Buffshoulder Boundaries onCompletedBoundaries (RxJS 4)
it('should work with selector completed', () => {
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
const subs = ' ^-------------------------------!';
const b = hot('--------^--a-------b---cd| ');
const expected = ' ---a-------b---cd---------------|';
const expected = ' ---a-------b---cd---------------(F|)';
const expectedValues = {
a: ['3'],
b: ['4', '5'],
c: ['6'],
d: [] as string[],
F: ['7', '8', '9', '0'],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
expectSubscriptions(a.subscriptions).toBe(subs);
Expand Down Expand Up @@ -294,7 +312,7 @@ describe('Observable.prototype.buffer', () => {
const results: any[] = [];
const subject = new Subject<number>();

const source = subject.pipe(buffer(subject)).subscribe({
subject.pipe(buffer(subject)).subscribe({
next: (value) => results.push(value),
complete: () => results.push('complete'),
});
Expand All @@ -304,7 +322,7 @@ describe('Observable.prototype.buffer', () => {
subject.next(2);
expect(results).to.deep.equal([[1], [2]]);
subject.complete();
expect(results).to.deep.equal([[1], [2], 'complete']);
expect(results).to.deep.equal([[1], [2], [], 'complete']);
});

describe('equivalence with the window operator', () => {
Expand Down
16 changes: 14 additions & 2 deletions src/internal/operators/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,23 @@ import { OperatorSubscriber } from './OperatorSubscriber';
*/
export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T, T[]> {
return operate((source, subscriber) => {
// The current buffered values. If this is null, it's because the
// closingNotifier has completed before the source.
let currentBuffer: T[] = [];

// Subscribe to our source.
source.subscribe(new OperatorSubscriber(subscriber, (value) => currentBuffer.push(value)));
source.subscribe(
new OperatorSubscriber(
subscriber,
(value) => currentBuffer.push(value),
// Pass all errors to the consumer.
undefined,
() => {
subscriber.next(currentBuffer);
subscriber.complete();
}
)
);

// Subscribe to the closing notifier.
closingNotifier.subscribe(
Expand All @@ -61,7 +74,6 @@ export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T,
},
// Pass all errors to the consumer.
undefined,
// Closing notifier should not complete the resulting observable.
noop
)
);
Expand Down

0 comments on commit 0c667d5

Please sign in to comment.