Skip to content

Commit

Permalink
fix(windowToggle): fix window closing and unsubscription semantics
Browse files Browse the repository at this point in the history
Fix windowToggle in order to unsubscribe inner windows whenever the outer
Observable is unsubscribed. Also allow windows to be closed when the
closingNotifier Observables complete without having emitted a 'next'
value previously. Also only open a new window if the closingSelector
hasn't thrown an error.
  • Loading branch information
Andre Medeiros authored and benlesh committed Oct 16, 2015
1 parent 19cc264 commit 0cb21e6
Showing 1 changed file with 31 additions and 25 deletions.
56 changes: 31 additions & 25 deletions src/operators/windowToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ class WindowToggleOperator<T, R, O> implements Operator<T, R> {
}
}

interface WindowContext<T> {
window: Subject<T>;
subscription: Subscription<T>;
}

class WindowToggleSubscriber<T, O> extends Subscriber<T> {
private windows: Subject<T>[] = [];
private contexts: Array<WindowContext<T>> = [];
private closingNotification: Subscription<any>;

constructor(destination: Subscriber<T>,
Expand All @@ -39,55 +44,56 @@ class WindowToggleSubscriber<T, O> extends Subscriber<T> {
}

_next(value: T) {
const windows = this.windows;
const len = windows.length;
const contexts = this.contexts;
const len = contexts.length;
for (let i = 0; i < len; i++) {
windows[i].next(value);
contexts[i].window.next(value);
}
}

_error(err: any) {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().error(err);
const contexts = this.contexts;
while (contexts.length > 0) {
contexts.shift().window.error(err);
}
this.destination.error(err);
}

_complete() {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().complete();
const contexts = this.contexts;
while (contexts.length > 0) {
const context = contexts.shift();
context.window.complete();
context.subscription.unsubscribe();
}
this.destination.complete();
}

openWindow(value: O) {
const window = new Subject<T>();
this.windows.push(window);
this.destination.next(window);
let windowContext = {
window,
subscription: new Subscription()
};

const closingSelector = this.closingSelector;
let closingNotifier = tryCatch(closingSelector)(value);
if (closingNotifier === errorObject) {
this.error(closingNotifier.e);
} else {
const subscriber = new WindowClosingNotifierSubscriber<T, O>(this, windowContext);
let context = {
window: new Subject<T>(),
subscription: new Subscription()
};
this.contexts.push(context);
this.destination.next(context.window);
const subscriber = new WindowClosingNotifierSubscriber<T, O>(this, context);
const subscription = closingNotifier._subscribe(subscriber);
this.add(windowContext.subscription.add(subscription));
this.add(context.subscription.add(subscription));
}
}

closeWindow(windowContext: { subscription: Subscription<T>, window: Subject<T> }) {
const { window, subscription } = windowContext;
const windows = this.windows;
windows.splice(windows.indexOf(window), 1);
closeWindow(context: WindowContext<T>) {
const { window, subscription } = context;
const contexts = this.contexts;
contexts.splice(contexts.indexOf(context), 1);
window.complete();
this.remove(subscription);
subscription.unsubscribe();
}
}

Expand All @@ -106,7 +112,7 @@ class WindowClosingNotifierSubscriber<T, O> extends Subscriber<T> {
}

_complete() {
// noop
this.parent.closeWindow(this.windowContext);
}
}

Expand Down

0 comments on commit 0cb21e6

Please sign in to comment.