Skip to content

Commit

Permalink
fix(timeout): Cancels scheduled timeout, if no longer needed
Browse files Browse the repository at this point in the history
* fix(timeout): Cancels scheduled timeout, if no longer needed

fixes #2134

* fix(timeoutWith): Cancels scheduled timeout, if no longer needed

* build(npm-scripts): update debug_mocha npm script for node 6

* fix(VirtualAction): Block rescheduled VirtualActions from executing their scheduled work.

VirtualActions are immutable so they can be inspected by the TestScheduler. In order to mirror

rescheduled stateful Actions, rescheduled VirtualActions shouldn't execute if they've been

rescheduled before execution.

* fix(timeout): Update timeout and timeoutWith to recycle their scheduled timeout actions.

The timeout and timeoutWith operators should dispose their scheduled timeout actions on

unsubscription. Also, given the new scheduling architecture, they can recycle their scheduled

actions so just one action is allocated per subscription.

* test(timeout): Add types to timeout and timeoutWith specs

* Fix merge conflicts

* Fix timeoutWith to work with new Subscriber leak fix.

* fix(timeout-spec): fix merge conflicts

* fix(Subscription): fold ChildSubscription logic into Subscriber to prevent operators from leaking ChildSubscriptions. (#2360)

The addition of ChildSubscription to fix #2244 accidentally introduced a different memory leak. Most operators that add and remove inner Subscriptions store the inner Subscriber instance, not the value returned by Subscription#add. When they try to remove the inner Subscription manually, nothing is removed, because the ChildSubscription wrapper instance is the one added to the subscriptions list.

Fixes #2355

* chore(publish): 5.1.1

* Ignore coverage

It's 5.5mb that people installing this don't need :)

* feat(AjaxObservable) : support 'PATCH' request type

Add support of the 'PATCH' request type based on the already existing 'PUT' request.

* fix(subscribeToResult): accept array-like as result

Accept array-like as a result to subscribe, so that
Observable.from and operators using subscribeToResult
have identical behaviour.

* chore(ajax.patch): Adds test for ajax.patch

* fix(forkJoin): add type signature for single observable with selector

Add type signature for using forkJoin with single observable as first parameter
and selector function as second parameter, so that typings would not prevent
usage which is permitted and properly handled by operator.

Closes #2347

* feat(webSocket): Add binaryType to config object

Add binaryType to config object, so that it is possible
to set that parameter on underlying socket before any
data emits happen.

Closes #2353

* fix(merge): return Observable when called with single lowerCaseO

Return Observable when merge is called with single lower case observable,
so that merge would always return Observable instance.

* fix(bindNodeCallback): emit undefined when callback has no success arguments

Emit undefined insteady of empty array by resulting Observable,
when callback function is called without success parameters.

Closes #2254

* chore(danger): update dangerfile to validate commit message

* chore(*): correctly scope disabled `max-line-length` tslint rule

The max line length is set to 150 in 'tslint.json'. In specific regions, it is
desirable to allow longer lines, so these regions should be wrapped in comments
like the following:

```js
// Max line length enforced here.

/* tslint:disable:max-line-length */
// Max line length NOT enforced here.
/* tslint:enable:max-line-length */   <-- CORRECT

// Max line length enforced here.
```

In many cases, the re-enabling comment incorrectly included `disable` instead of
`enable` (as shown below), which essentially keeps the `max-line-length`
**disabled** for the rest of the file:

```js
// Max line length enforced here.

/* tslint:disable:max-line-length */
// Max line length NOT enforced here.
/* tslint:disable:max-line-length */   <-- INCORRECT

// Max line length NOT enforced here.
```

This commit fixes these comments, so the `max-line-length` rule is properly
enforced in regions that don't need longer lines.

* fix(bindCallback): emit undefined when callback is without arguments

In resulting Observable emit undefined when callback is called without
parameters, instead of emitting empty array.

* fix(mergeAll): introduce variant support <T, R> for mergeMap

- closes #2372

* feat(windowTime): maxWindowSize parameter in windowTime operator

Adds new parameter in windowTime operator to control how much values given
window can emit.

Closes #1301

* docs(ObservableInput): add ObservableInput and SubscribableOrPromise descriptions

Add ObservableInput and SubscribableOrPromise interface descriptions,
as well as link these interfaces in type descriptions of operators,
so that users always know what kind of parameters they can pass to
used methods.

* fix(timeoutWith): update timeoutWith to work with new Subscriber leak fix changes
  • Loading branch information
jayphelps authored and benlesh committed Mar 27, 2017
1 parent 3ced24f commit 3e9d529
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 82 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
"prepublish": "shx rm -rf ./typings && typings install && npm run build_all",
"publish_docs": "./publish_docs.sh",
"test_mocha": "mocha --opts spec/support/default.opts spec-js",
"debug_mocha": "node-debug _mocha --opts spec/support/debug.opts spec-js",
"debug_mocha": "node --inspect --debug-brk ./node_modules/.bin/_mocha --opts spec/support/debug.opts spec-js",
"test_browser": "npm-run-all build_spec_browser && opn spec/support/mocha-browser-runner.html",
"test": "npm-run-all clean_spec build_spec test_mocha clean_spec",
"tests2png": "npm run build_spec && mkdirp tmp/docs/img && mkdirp spec-js/support && shx cp spec/support/*.opts spec-js/support/ && mocha --opts spec/support/tests2png.opts spec-js",
Expand Down
28 changes: 26 additions & 2 deletions spec/operators/timeout-spec.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import * as Rx from '../../dist/cjs/Rx';
import { expect } from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports

declare const { asDiagram };
declare const rxTestScheduler: Rx.TestScheduler;
declare const hot: typeof marbleTestingSignature.hot;
declare const cold: typeof marbleTestingSignature.cold;
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;

declare const rxTestScheduler: Rx.TestScheduler;
const Observable = Rx.Observable;

/** @test {timeout} */
Expand Down Expand Up @@ -121,4 +121,28 @@ describe('Observable.prototype.timeout', () => {
expectObservable(result).toBe(expected, values, defaultTimeoutError);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => {
const e1 = hot('--a--b--c---d--e--|');
const e1subs = '^ ! ';
const expected = '--a--b--c-- ';
const unsub = ' ! ';

const result = e1
.lift({
call: (timeoutSubscriber, source) => {
const { action } = <any> timeoutSubscriber; // get a ref to the action here
timeoutSubscriber.add(() => { // because it'll be null by the
if (!action.closed) { // time we get into this function.
throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled');
}
});
return source.subscribe(timeoutSubscriber);
}
})
.timeout(50, rxTestScheduler);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
29 changes: 28 additions & 1 deletion spec/operators/timeoutWith-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import * as Rx from '../../dist/cjs/Rx';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports

declare const { asDiagram };
declare const rxTestScheduler: Rx.TestScheduler;
declare const hot: typeof marbleTestingSignature.hot;
declare const cold: typeof marbleTestingSignature.cold;
declare const expectObservable: typeof marbleTestingSignature.expectObservable;
declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions;

declare const rxTestScheduler: Rx.TestScheduler;
const Observable = Rx.Observable;

/** @test {timeoutWith} */
Expand Down Expand Up @@ -266,4 +266,31 @@ describe('Observable.prototype.timeoutWith', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => {
const e1 = hot('---a---b-----c----|');
const e1subs = '^ ! ';
const e2 = cold( '-x---y| ');
const e2subs = ' ^ ! ';
const expected = '---a---b----x-- ';
const unsub = ' ! ';

const result = e1
.lift({
call: (timeoutSubscriber, source) => {
const { action } = <any> timeoutSubscriber; // get a ref to the action here
timeoutSubscriber.add(() => { // because it'll be null by the
if (!action.closed) { // time we get into this function.
throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled');
}
});
return source.subscribe(timeoutSubscriber);
}
})
.timeoutWith(40, e2, rxTestScheduler);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});
13 changes: 12 additions & 1 deletion spec/schedulers/VirtualTimeScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,15 @@ describe('VirtualTimeScheduler', () => {
v.flush();
expect(count).to.equal(3);
});
});

it('should not execute virtual actions that have been rescheduled before flush', () => {
const v = new VirtualTimeScheduler();
let messages = [];
let action: VirtualAction<string> = <VirtualAction<string>> v.schedule(function(state: string) {
messages.push(state);
}, 10, 'first message');
action = <VirtualAction<string>> action.schedule('second message' , 10);
v.flush();
expect(messages).to.deep.equal(['second message']);
});
});
56 changes: 23 additions & 33 deletions src/operator/timeout.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Action } from '../scheduler/Action';
import { async } from '../scheduler/async';
import { isDate } from '../util/isDate';
import { Operator } from '../Operator';
Expand Down Expand Up @@ -42,15 +43,8 @@ class TimeoutOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class TimeoutSubscriber<T> extends Subscriber<T> {
private index: number = 0;
private _previousIndex: number = 0;
get previousIndex(): number {
return this._previousIndex;
}
private _hasCompleted: boolean = false;
get hasCompleted(): boolean {
return this._hasCompleted;
}

private action: Action<TimeoutSubscriber<T>> = null;

constructor(destination: Subscriber<T>,
private absoluteTimeout: boolean,
Expand All @@ -61,40 +55,36 @@ class TimeoutSubscriber<T> extends Subscriber<T> {
this.scheduleTimeout();
}

private static dispatchTimeout(state: any): void {
const source = state.subscriber;
const currentIndex = state.index;
if (!source.hasCompleted && source.previousIndex === currentIndex) {
source.notifyTimeout();
}
private static dispatchTimeout<T>(subscriber: TimeoutSubscriber<T>): void {
subscriber.error(subscriber.errorInstance);
}

private scheduleTimeout(): void {
let currentIndex = this.index;
this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex });
this.index++;
this._previousIndex = currentIndex;
const { action } = this;
if (action) {
// Recycle the action if we've already scheduled one. All the production
// Scheduler Actions mutate their state/delay time and return themeselves.
// VirtualActions are immutable, so they create and return a clone. In this
// case, we need to set the action reference to the most recent VirtualAction,
// to ensure that's the one we clone from next time.
this.action = (<Action<TimeoutSubscriber<T>>> action.schedule(this, this.waitFor));
} else {
this.add(this.action = (<Action<TimeoutSubscriber<T>>> this.scheduler.schedule(
TimeoutSubscriber.dispatchTimeout, this.waitFor, this
)));
}
}

protected _next(value: T): void {
this.destination.next(value);

if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
super._next(value);
}

protected _error(err: any): void {
this.destination.error(err);
this._hasCompleted = true;
}

protected _complete(): void {
this.destination.complete();
this._hasCompleted = true;
}

notifyTimeout(): void {
this.error(this.errorInstance);
protected _unsubscribe() {
this.action = null;
this.scheduler = null;
this.errorInstance = null;
}
}
72 changes: 29 additions & 43 deletions src/operator/timeoutWith.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Action } from '../scheduler/Action';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { IScheduler } from '../Scheduler';
import { async } from '../scheduler/async';
import { Subscription, TeardownLogic } from '../Subscription';
import { TeardownLogic } from '../Subscription';
import { Observable, ObservableInput } from '../Observable';
import { isDate } from '../util/isDate';
import { OuterSubscriber } from '../OuterSubscriber';
Expand Down Expand Up @@ -49,65 +50,50 @@ class TimeoutWithOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> {
private timeoutSubscription: Subscription = undefined;
private index: number = 0;
private _previousIndex: number = 0;
get previousIndex(): number {
return this._previousIndex;
}
private _hasCompleted: boolean = false;
get hasCompleted(): boolean {
return this._hasCompleted;
}

constructor(public destination: Subscriber<T>,
private action: Action<TimeoutWithSubscriber<T, R>> = null;

constructor(destination: Subscriber<T>,
private absoluteTimeout: boolean,
private waitFor: number,
private withObservable: ObservableInput<any>,
private scheduler: IScheduler) {
super();
destination.add(this);
super(destination);
this.scheduleTimeout();
}

private static dispatchTimeout(state: any): void {
const source = state.subscriber;
const currentIndex = state.index;
if (!source.hasCompleted && source.previousIndex === currentIndex) {
source.handleTimeout();
}
private static dispatchTimeout<T, R>(subscriber: TimeoutWithSubscriber<T, R>): void {
const { withObservable } = subscriber;
(<any> subscriber)._unsubscribeAndRecycle();
subscriber.add(subscribeToResult(subscriber, withObservable));
}

private scheduleTimeout(): void {
let currentIndex = this.index;
const timeoutState = { subscriber: this, index: currentIndex };
this.scheduler.schedule(TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState);
this.index++;
this._previousIndex = currentIndex;
const { action } = this;
if (action) {
// Recycle the action if we've already scheduled one. All the production
// Scheduler Actions mutate their state/delay time and return themeselves.
// VirtualActions are immutable, so they create and return a clone. In this
// case, we need to set the action reference to the most recent VirtualAction,
// to ensure that's the one we clone from next time.
this.action = (<Action<TimeoutWithSubscriber<T, R>>> action.schedule(this, this.waitFor));
} else {
this.add(this.action = (<Action<TimeoutWithSubscriber<T, R>>> this.scheduler.schedule(
TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this
)));
}
}

protected _next(value: T) {
this.destination.next(value);
protected _next(value: T): void {
if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
super._next(value);
}

protected _error(err: any) {
this.destination.error(err);
this._hasCompleted = true;
}

protected _complete() {
this.destination.complete();
this._hasCompleted = true;
}

handleTimeout(): void {
if (!this.closed) {
const withObservable = this.withObservable;
this.unsubscribe();
this.destination.add(this.timeoutSubscription = subscribeToResult(this, withObservable));
}
protected _unsubscribe() {
this.action = null;
this.scheduler = null;
this.withObservable = null;
}
}
10 changes: 9 additions & 1 deletion src/scheduler/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ export class VirtualTimeScheduler extends AsyncScheduler {
*/
export class VirtualAction<T> extends AsyncAction<T> {

protected active: boolean = true;

constructor(protected scheduler: VirtualTimeScheduler,
protected work: (this: VirtualAction<T>, state?: T) => void,
protected index: number = scheduler.index += 1) {
Expand All @@ -57,7 +59,7 @@ export class VirtualAction<T> extends AsyncAction<T> {
if (!this.id) {
return super.schedule(state, delay);
}

this.active = false;
// If an action is rescheduled, we save allocations by mutating its state,
// pushing it to the end of the scheduler queue, and recycling the action.
// But since the VirtualTimeScheduler is used for testing, VirtualActions
Expand All @@ -79,6 +81,12 @@ export class VirtualAction<T> extends AsyncAction<T> {
return undefined;
}

protected _execute(state: T, delay: number): any {
if (this.active === true) {
return super._execute(state, delay);
}
}

public static sortActions<T>(a: VirtualAction<T>, b: VirtualAction<T>) {
if (a.delay === b.delay) {
if (a.index === b.index) {
Expand Down

0 comments on commit 3e9d529

Please sign in to comment.