From 4c16aa6651060b2f5d7ff2465698a2ad48456472 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 22 Sep 2015 13:52:30 -0700 Subject: [PATCH] feat(mergeAll): now supports promises, iterables and lowercase-o observables --- src/operators/mergeAll-support.ts | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/src/operators/mergeAll-support.ts b/src/operators/mergeAll-support.ts index 71d2111f4e..bd3c728c59 100644 --- a/src/operators/mergeAll-support.ts +++ b/src/operators/mergeAll-support.ts @@ -3,6 +3,8 @@ import Operator from '../Operator'; import Subscriber from '../Subscriber'; import Observer from '../Observer'; import Subscription from '../Subscription'; +import OuterSubscriber from '../OuterSubscriber'; +import subscribeToResult from '../util/subscribeToResult'; export class MergeAllOperator implements Operator { constructor(private concurrent: number) { @@ -14,10 +16,11 @@ export class MergeAllOperator implements Operator { } } -export class MergeAllSubscriber extends Subscriber { +export class MergeAllSubscriber extends OuterSubscriber { private hasCompleted: boolean = false; private buffer: Observable[] = []; private active: number = 0; + constructor(destination: Observer, private concurrent:number) { super(destination); } @@ -28,7 +31,7 @@ export class MergeAllSubscriber extends Subscriber { this.destination.next(observable.value); } else { this.active++; - this.add(observable.subscribe(new MergeAllInnerSubscriber(this.destination, this))) + this.add(subscribeToResult(this, observable)); } } else { this.buffer.push(observable); @@ -52,14 +55,4 @@ export class MergeAllSubscriber extends Subscriber { this.destination.complete(); } } -} - -export class MergeAllInnerSubscriber extends Subscriber { - constructor(destination: Observer, private parent: MergeAllSubscriber) { - super(destination); - } - - _complete() { - this.parent.notifyComplete(this); - } } \ No newline at end of file