diff --git a/spec/operators/buffer-spec.ts b/spec/operators/buffer-spec.ts index e8c75e1131..376748b3e6 100644 --- a/spec/operators/buffer-spec.ts +++ b/spec/operators/buffer-spec.ts @@ -225,6 +225,19 @@ describe('Observable.prototype.buffer', () => { expectSubscriptions(b.subscriptions).toBe(bsubs); }); + it('should work with filtered source as closingNotifier', () => { + const values = {0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8}; + + const source = hot('-0-1-2-3-4-5-6-7-8-|', values); + const expected = '-a---b---c---d---e-|'; + + const expectedValues = {a: [0], b: [1, 2], c: [3, 4], d: [5, 6], e: [7, 8]}; + const filteredSource = source.filter(x => x % 2 === 0); + + const result = source.buffer(filteredSource); + expectObservable(result).toBe(expected, expectedValues); + }); + it('should emit last buffer if source completes', () => { const a = hot('-a-b-c-d-e-f-g-h-i-|'); const b = hot('-----B-----B--------'); diff --git a/src/operator/buffer.ts b/src/operator/buffer.ts index 73807fa5a5..e690836c53 100644 --- a/src/operator/buffer.ts +++ b/src/operator/buffer.ts @@ -1,5 +1,6 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; +import { TeardownLogic } from '../Subscription'; import { Observable } from '../Observable'; import { OuterSubscriber } from '../OuterSubscriber'; @@ -47,8 +48,11 @@ class BufferOperator implements Operator { constructor(private closingNotifier: Observable) { } - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new BufferSubscriber(subscriber, this.closingNotifier)); + call(subscriber: Subscriber, source: any): TeardownLogic { + const bufferSubscriber = new BufferSubscriber(subscriber); + const subscription = source.subscribe(bufferSubscriber); + bufferSubscriber.subscribeToClosingNotifier(this.closingNotifier); + return subscription; } } @@ -60,8 +64,11 @@ class BufferOperator implements Operator { class BufferSubscriber extends OuterSubscriber { private buffer: T[] = []; - constructor(destination: Subscriber, closingNotifier: Observable) { + constructor(destination: Subscriber) { super(destination); + } + + subscribeToClosingNotifier(closingNotifier: Observable) { this.add(subscribeToResult(this, closingNotifier)); }