From 1b89298c050c44d3765c0e32cc7a8140a3d0da57 Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Sat, 19 Mar 2016 10:24:08 -0700 Subject: [PATCH] fix(bufferToggle): accepts closing selector returns promise relates to #1246 --- spec/operators/bufferToggle-spec.ts | 39 ++++++++++++ src/operator/bufferToggle.ts | 96 ++++++++++++++--------------- 2 files changed, 87 insertions(+), 48 deletions(-) diff --git a/spec/operators/bufferToggle-spec.ts b/spec/operators/bufferToggle-spec.ts index d8cf165cc6c..4d74308e3f2 100644 --- a/spec/operators/bufferToggle-spec.ts +++ b/spec/operators/bufferToggle-spec.ts @@ -1,4 +1,5 @@ import * as Rx from '../../dist/cjs/Rx'; +import {DoneSignature} from '../helpers/test-helper'; declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; const Observable = Rx.Observable; @@ -341,4 +342,42 @@ describe('Observable.prototype.bufferToggle', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should accept closing selector returns promise resolves', (done: DoneSignature) => { + const e1 = Observable.concat(Observable.of(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(100).mapTo(4) + ); + const expected = [[1]]; + + e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any) => { resolve(42); })) + .subscribe((x) => { + expect(x).toEqual(expected.shift()); }, + done.fail, + () => { + expect(expected.length).toBe(0); + done(); + }); + }); + + it('should accept closing selector returns promise rejects', (done: DoneSignature) => { + const e1 = Observable.concat(Observable.of(1), + Observable.timer(10).mapTo(2), + Observable.timer(10).mapTo(3), + Observable.timer(100).mapTo(4) + ); + + const expected = 42; + + e1.bufferToggle(Observable.of(10), () => new Promise((resolve: any, reject: any) => { reject(expected); })) + .subscribe((x) => { + done.fail(); + }, (x) => { + expect(x).toBe(expected); + done(); + }, () => { + done.fail(); + }); + }); }); \ No newline at end of file diff --git a/src/operator/bufferToggle.ts b/src/operator/bufferToggle.ts index 55328b736c8..7ea51c32253 100644 --- a/src/operator/bufferToggle.ts +++ b/src/operator/bufferToggle.ts @@ -1,9 +1,11 @@ import {Operator} from '../Operator'; import {Subscriber} from '../Subscriber'; -import {Observable} from '../Observable'; +import {Observable, SubscribableOrPromise} from '../Observable'; import {Subscription} from '../Subscription'; -import {tryCatch} from '../util/tryCatch'; -import {errorObject} from '../util/errorObject'; + +import {subscribeToResult} from '../util/subscribeToResult'; +import {OuterSubscriber} from '../OuterSubscriber'; +import {InnerSubscriber} from '../InnerSubscriber'; /** * Buffers values from the source by opening the buffer via signals from an @@ -22,18 +24,18 @@ import {errorObject} from '../util/errorObject'; * @owner Observable */ export function bufferToggle(openings: Observable, - closingSelector: (value: O) => Observable): Observable { + closingSelector: (value: O) => SubscribableOrPromise | void): Observable { return this.lift(new BufferToggleOperator(openings, closingSelector)); } export interface BufferToggleSignature { - (openings: Observable, closingSelector: (value: O) => Observable): Observable; + (openings: Observable, closingSelector: (value: O) => SubscribableOrPromise | void): Observable; } class BufferToggleOperator implements Operator { constructor(private openings: Observable, - private closingSelector: (value: O) => Observable) { + private closingSelector: (value: O) => SubscribableOrPromise | void) { } call(subscriber: Subscriber): Subscriber { @@ -46,17 +48,17 @@ interface BufferContext { subscription: Subscription; } -class BufferToggleSubscriber extends Subscriber { +class BufferToggleSubscriber extends OuterSubscriber { private contexts: Array> = []; constructor(destination: Subscriber, private openings: Observable, - private closingSelector: (value: O) => Observable) { + private closingSelector: (value: O) => SubscribableOrPromise | void) { super(destination); this.add(this.openings.subscribe(new BufferToggleOpeningsSubscriber(this))); } - protected _next(value: T) { + protected _next(value: T): void { const contexts = this.contexts; const len = contexts.length; for (let i = 0; i < len; i++) { @@ -64,7 +66,7 @@ class BufferToggleSubscriber extends Subscriber { } } - protected _error(err: any) { + protected _error(err: any): void { const contexts = this.contexts; while (contexts.length > 0) { const context = contexts.shift(); @@ -76,7 +78,7 @@ class BufferToggleSubscriber extends Subscriber { super._error(err); } - protected _complete() { + protected _complete(): void { const contexts = this.contexts; while (contexts.length > 0) { const context = contexts.shift(); @@ -89,27 +91,29 @@ class BufferToggleSubscriber extends Subscriber { super._complete(); } - openBuffer(value: O) { - const closingSelector = this.closingSelector; - const contexts = this.contexts; - - let closingNotifier = tryCatch(closingSelector)(value); - if (closingNotifier === errorObject) { - this._error(errorObject.e); - } else { - let context = { - buffer: [], - subscription: new Subscription() - }; - contexts.push(context); - const subscriber = new BufferToggleClosingsSubscriber(this, context); - const subscription = closingNotifier.subscribe(subscriber); - context.subscription.add(subscription); - this.add(subscription); + openBuffer(value: O): void { + try { + const closingSelector = this.closingSelector; + const closingNotifier = closingSelector.call(this, value); + if (closingNotifier) { + this.trySubscribe(closingNotifier); + } + } catch (err) { + this._error(err); } } - closeBuffer(context: BufferContext) { + notifyNext(outerValue: any, innerValue: O, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this.closeBuffer(outerValue); + } + + notifyComplete(innerSub: InnerSubscriber): void { + this.closeBuffer(( innerSub).context); + } + + private closeBuffer(context: BufferContext): void { const contexts = this.contexts; if (contexts === null) { return; @@ -120,34 +124,30 @@ class BufferToggleSubscriber extends Subscriber { this.remove(subscription); subscription.unsubscribe(); } -} -class BufferToggleOpeningsSubscriber extends Subscriber { - constructor(private parent: BufferToggleSubscriber) { - super(null); - } + private trySubscribe(closingNotifier: any): void { + const contexts = this.contexts; - protected _next(value: O) { - this.parent.openBuffer(value); - } + const buffer: Array = []; + const subscription = new Subscription(); + const context = { buffer, subscription }; + contexts.push(context); - protected _error(err: any) { - this.parent.error(err); - } + const innerSubscription = subscribeToResult(this, closingNotifier, context); + ( innerSubscription).context = context; - protected _complete() { - // noop + this.add(innerSubscription); + subscription.add(innerSubscription); } } -class BufferToggleClosingsSubscriber extends Subscriber { - constructor(private parent: BufferToggleSubscriber, - private context: { subscription: any, buffer: T[] }) { +class BufferToggleOpeningsSubscriber extends Subscriber { + constructor(private parent: BufferToggleSubscriber) { super(null); } - protected _next() { - this.parent.closeBuffer(this.context); + protected _next(value: O) { + this.parent.openBuffer(value); } protected _error(err: any) { @@ -155,6 +155,6 @@ class BufferToggleClosingsSubscriber extends Subscriber { } protected _complete() { - this.parent.closeBuffer(this.context); + // noop } }