Skip to content

Commit

Permalink
fix(buffer): Remaining buffer will be emited on source close if `clos…
Browse files Browse the repository at this point in the history
…ingNotifier` active

Gives the author control over the emission of the final buffer. If the `closingNotifier` completes before the source does, no more buffers will be emitted. If the `closingNotifier` is still active when the source completes, then whatever is in the buffer at the time will be emitted from the resulting observable before it completes.

BREAKING CHANGE: Final buffered values will now be emitted from the resulting observable if the `closingNotifier` is still active. The simplest workaround if you want the original behavior (where you possibly miss values), is to add a `skipLast(1)` at the end. Otherwise, you can try to complete the `closingNotifier` prior to the completion of the source.

Fixes ReactiveX#3990, ReactiveX#6035
  • Loading branch information
benlesh committed Feb 22, 2021
1 parent 745b92b commit e835f92
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 17 deletions.
35 changes: 25 additions & 10 deletions spec/operators/buffer-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,25 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = hot(' -a-b-c-d-e-f-g-h-i-|');
const b = hot(' -----B-----B-----B-|');
const expected = '-----x-----y-----z-|';
const expected = '-----x-----y-----z-(F|)';
const expectedValues = {
x: ['a', 'b', 'c'],
y: ['d', 'e', 'f'],
z: ['g', 'h', 'i'],
F: [],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
});
});

it('should not emit a final buffer if the closingNotifier is already complete', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = hot(' -a-b-c-d-e-f-g-h-i-|');
const b = hot(' -----B-----B--|');
const expected = '-----x-----y-------|';
const expectedValues = {
x: ['a', 'b', 'c'],
y: ['d', 'e', 'f'],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
});
Expand All @@ -31,17 +45,17 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ expectObservable }) => {
const a = EMPTY;
const b = EMPTY;
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(F|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] });
});
});

it('should work with empty and non-empty selector', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = EMPTY;
const b = hot('-----a-----');
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(F|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] });
});
});

Expand Down Expand Up @@ -76,8 +90,8 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ expectObservable }) => {
const a = EMPTY;
const b = NEVER;
const expected = '|';
expectObservable(a.pipe(buffer(b))).toBe(expected);
const expected = '(F|)';
expectObservable(a.pipe(buffer(b))).toBe(expected, { F: [] });
});
});

Expand Down Expand Up @@ -122,14 +136,15 @@ describe('Observable.prototype.buffer', () => {
testScheduler.run(({ hot, expectObservable }) => {
const a = hot('--1--2--^--3--4--5---6----7--8--9---0---|');
const b = hot('--------^--a-------b---cd---------e---f---|');
const expected = ' ---a-------b---cd---------e---f-|';
const expected = ' ---a-------b---cd---------e---f-(F|)';
const expectedValues = {
a: ['3'],
b: ['4', '5'],
c: ['6'],
d: [] as string[],
e: ['7', '8', '9'],
f: ['0'],
F: [],
};
expectObservable(a.pipe(buffer(b))).toBe(expected, expectedValues);
});
Expand Down Expand Up @@ -273,7 +288,7 @@ describe('Observable.prototype.buffer', () => {
const results: any[] = [];
const subject = new Subject<number>();

const source = subject.pipe(buffer(subject)).subscribe({
subject.pipe(buffer(subject)).subscribe({
next: (value) => results.push(value),
complete: () => results.push('complete'),
});
Expand All @@ -283,6 +298,6 @@ describe('Observable.prototype.buffer', () => {
subject.next(2);
expect(results).to.deep.equal([[1], [2]]);
subject.complete();
expect(results).to.deep.equal([[1], [2], 'complete']);
expect(results).to.deep.equal([[1], [2], [], 'complete']);
});
});
44 changes: 37 additions & 7 deletions src/internal/operators/buffer.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Observable } from '../Observable';
import { OperatorFunction } from '../types';
import { operate } from '../util/lift';
import { noop } from '../util/noop';
import { OperatorSubscriber } from './OperatorSubscriber';

/**
Expand Down Expand Up @@ -44,31 +43,62 @@ import { OperatorSubscriber } from './OperatorSubscriber';
*/
export function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T, T[]> {
return operate((source, subscriber) => {
let currentBuffer: T[] = [];
// The current buffered values. If this is null, it's because the
// closingNotifier has completed before the source.
let currentBuffer: T[] | null = [];

// Subscribe to our source.
source.subscribe(new OperatorSubscriber(subscriber, (value) => currentBuffer.push(value)));
source.subscribe(
new OperatorSubscriber(
subscriber,
(value) => {
// If we don't have a currentBuffer, it's because the closingNotifier
// has completed, signifying that the author no longer wants any
// more buffers emitted.
if (currentBuffer) {
currentBuffer.push(value);
}
},
// Pass all errors to the consumer.
undefined,
() => {
// If we don't have a currentBuffer, it's because the
// closingNotifier has already completed, telling us the
// author did not want to emit any more buffers because they
// were never going to close another one.
if (currentBuffer) {
subscriber.next(currentBuffer);
}
subscriber.complete();
}
)
);

// Subscribe to the closing notifier.
closingNotifier.subscribe(
new OperatorSubscriber(
subscriber,
() => {
// Start a new buffer and emit the previous one.
const b = currentBuffer;
const b = currentBuffer!;
currentBuffer = [];
subscriber.next(b);
},
// Pass all errors to the consumer.
undefined,
// Closing notifier should not complete the resulting observable.
noop
() => {
// Null out the current buffer to signal to the rest
// of the code that the closingNotifier has completed.
// This is a cue that the author no longer wants to
// emit any more buffers.
currentBuffer = null;
}
)
);

return () => {
// Ensure buffered values are released on teardown.
currentBuffer = null!;
currentBuffer = null;
};
});
}

0 comments on commit e835f92

Please sign in to comment.