Skip to content

Commit

Permalink
fix(buffer): subscribe to source and closingNotifier in proper order (#…
Browse files Browse the repository at this point in the history
…2195)

In buffer operator subscribe to source observable first, so that when
closingNotifier emits value, all source values emited before land in buffer

Closes #1610
BREAKING CHANGE:
When source and closingNotifier fire at the same time, it is expected
that value emitted by source will first land in buffer and then
closingNotifier will close it. Because of reversed subscription order,
closingNotifier emitted first, so source was not able to put value in
buffer before it was closed. Now source is subscribed before closingNotifier,
so if they fire at the same time, source value is put into buffer and then
closingNotifer closes it.
  • Loading branch information
mpodlasin authored and benlesh committed Jun 14, 2017
1 parent c04eb85 commit 41e33f5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
13 changes: 13 additions & 0 deletions spec/operators/buffer-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,19 @@ describe('Observable.prototype.buffer', () => {
expectSubscriptions(b.subscriptions).toBe(bsubs);
});

it('should work with filtered source as closingNotifier', () => {
const values = {0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8};

const source = hot('-0-1-2-3-4-5-6-7-8-|', values);
const expected = '-a---b---c---d---e-|';

const expectedValues = {a: [0], b: [1, 2], c: [3, 4], d: [5, 6], e: [7, 8]};
const filteredSource = source.filter(x => x % 2 === 0);

const result = source.buffer(filteredSource);
expectObservable(result).toBe(expected, expectedValues);
});

it('should emit last buffer if source completes', () => {
const a = hot('-a-b-c-d-e-f-g-h-i-|');
const b = hot('-----B-----B--------');
Expand Down
13 changes: 10 additions & 3 deletions src/operator/buffer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { TeardownLogic } from '../Subscription';
import { Observable } from '../Observable';

import { OuterSubscriber } from '../OuterSubscriber';
Expand Down Expand Up @@ -47,8 +48,11 @@ class BufferOperator<T> implements Operator<T, T[]> {
constructor(private closingNotifier: Observable<any>) {
}

call(subscriber: Subscriber<T[]>, source: any): any {
return source.subscribe(new BufferSubscriber(subscriber, this.closingNotifier));
call(subscriber: Subscriber<T[]>, source: any): TeardownLogic {
const bufferSubscriber = new BufferSubscriber(subscriber);
const subscription = source.subscribe(bufferSubscriber);
bufferSubscriber.subscribeToClosingNotifier(this.closingNotifier);
return subscription;
}
}

Expand All @@ -60,8 +64,11 @@ class BufferOperator<T> implements Operator<T, T[]> {
class BufferSubscriber<T> extends OuterSubscriber<T, any> {
private buffer: T[] = [];

constructor(destination: Subscriber<T[]>, closingNotifier: Observable<any>) {
constructor(destination: Subscriber<T[]>) {
super(destination);
}

subscribeToClosingNotifier(closingNotifier: Observable<any>) {
this.add(subscribeToResult(this, closingNotifier));
}

Expand Down

0 comments on commit 41e33f5

Please sign in to comment.