Skip to content

Commit

Permalink
fix(mergeAll): add source subscription to composite before actually s…
Browse files Browse the repository at this point in the history
…ubscribing

Add subscriptions for source Observables to mergeAll composite subscription
before actually subscribing to any of these Observables, so that if
source Observable emits synchronously and consumer of mergeAll unsubscribes
at that moment (for example `take` operator), subscription to source is
unsubscribed as well and Observable stops emitting.

Closes #2476
  • Loading branch information
mpodlasin committed Mar 19, 2017
1 parent 01e1343 commit 98ca133
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
30 changes: 30 additions & 0 deletions spec/operators/mergeAll-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,4 +415,34 @@ describe('Observable.prototype.mergeAll', () => {
},
() => { done(new Error('should not be called')); });
});

it('should finalize generators when merged if the subscription ends', () => {
const iterator1 = {
finalized: false,
next() {
return {value: 'duck', done: false};
},
return() {
this.finalized = true;
},
[Rx.Symbol.iterator]() {
return this;
}
};

const results = [];

const i1 = Rx.Observable.from(iterator1 as any);
Rx.Observable.merge(i1, i1)
.take(3)
.subscribe(
x => results.push(x),
null,
() => results.push('GOOSE!')
);

// never even get here
expect(results).to.deep.equal(['duck', 'duck', 'duck', 'GOOSE!']);
expect(iterator1.finalized).to.be.true;
});
});
5 changes: 4 additions & 1 deletion src/operator/mergeAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Subscription } from '../Subscription';
import { OuterSubscriber } from '../OuterSubscriber';
import { Subscribable } from '../Observable';
import { subscribeToResult } from '../util/subscribeToResult';
import { InnerSubscriber } from '../InnerSubscriber';

export function mergeAll<T>(this: Observable<T>, concurrent?: number): T;
export function mergeAll<T, R>(this: Observable<T>, concurrent?: number): Subscribable<R>;
Expand Down Expand Up @@ -83,7 +84,9 @@ export class MergeAllSubscriber<T> extends OuterSubscriber<Observable<T>, T> {
protected _next(observable: Observable<T>) {
if (this.active < this.concurrent) {
this.active++;
this.add(subscribeToResult<Observable<T>, T>(this, observable));
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
this.add(innerSubscriber);
subscribeToResult<Observable<T>, T>(this, observable, undefined, undefined, innerSubscriber);
} else {
this.buffer.push(observable);
}
Expand Down
8 changes: 5 additions & 3 deletions src/util/subscribeToResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import { $$observable } from '../symbol/observable';
export function subscribeToResult<T, R>(outerSubscriber: OuterSubscriber<T, R>,
result: any,
outerValue?: T,
outerIndex?: number): Subscription;
outerIndex?: number,
destination?: Subscriber<any>): Subscription;
export function subscribeToResult<T>(outerSubscriber: OuterSubscriber<any, any>,
result: ObservableInput<T>,
outerValue?: T,
outerIndex?: number): Subscription {
let destination: Subscriber<any> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex);
outerIndex?: number,
destination: Subscriber<any> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex)
): Subscription {

if (destination.closed) {
return null;
Expand Down

0 comments on commit 98ca133

Please sign in to comment.