Skip to content

Commit

Permalink
fix(bufferToggle): fix disposal of subscriptions when errors occur
Browse files Browse the repository at this point in the history
Fix bufferToggle operator to appropriately unsubscribe whenever an error happens, either on the
source or on the opening or closing observables.
  • Loading branch information
staltz authored and benlesh committed Dec 2, 2015
1 parent fe1ba5d commit a20325c
Showing 1 changed file with 22 additions and 14 deletions.
36 changes: 22 additions & 14 deletions src/operator/bufferToggle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
}

_error(err: any) {
const contexts = this.contexts;
while (contexts.length > 0) {
const context = contexts.shift();
context.subscription.unsubscribe();
context.buffer = null;
context.subscription = null;
}
this.contexts = null;
this.destination.error(err);
}
Expand All @@ -64,7 +71,9 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
this.destination.next(context.buffer);
context.subscription.unsubscribe();
context.buffer = null;
context.subscription = null;
}
this.contexts = null;
this.destination.complete();
}

Expand All @@ -74,16 +83,14 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {

let closingNotifier = tryCatch(closingSelector)(value);
if (closingNotifier === errorObject) {
const err = closingNotifier.e;
this.contexts = null;
this.destination.error(err);
this._error(closingNotifier.e);
} else {
let context = {
buffer: [],
subscription: new Subscription()
};
contexts.push(context);
const subscriber = new BufferClosingNotifierSubscriber(this, context);
const subscriber = new BufferToggleClosingsSubscriber(this, context);
const subscription = closingNotifier._subscribe(subscriber);
this.add(context.subscription.add(subscription));
}
Expand All @@ -102,39 +109,40 @@ class BufferToggleSubscriber<T, O> extends Subscriber<T> {
}
}

class BufferClosingNotifierSubscriber<T> extends Subscriber<T> {
constructor(private parent: BufferToggleSubscriber<any, T>, private context: { subscription: any, buffer: T[] }) {
class BufferToggleOpeningsSubscriber<T> extends Subscriber<T> {
constructor(private parent: BufferToggleSubscriber<any, T>) {
super(null);
}

_next() {
this.parent.closeBuffer(this.context);
_next(value: T) {
this.parent.openBuffer(value);
}

_error(err) {
this.parent.error(err);
}

_complete() {
this.parent.closeBuffer(this.context);
// noop
}
}

class BufferToggleOpeningsSubscriber<T> extends Subscriber<T> {
constructor(private parent: BufferToggleSubscriber<any, T>) {
class BufferToggleClosingsSubscriber<T> extends Subscriber<T> {
constructor(private parent: BufferToggleSubscriber<any, T>,
private context: { subscription: any, buffer: T[] }) {
super(null);
}

_next(value: T) {
this.parent.openBuffer(value);
_next() {
this.parent.closeBuffer(this.context);
}

_error(err) {
this.parent.error(err);
}

_complete() {
// noop
this.parent.closeBuffer(this.context);
}
}

Expand Down

0 comments on commit a20325c

Please sign in to comment.