Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(callback): Add Observable.fromCallback #729

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions spec/observables/from-callback-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.fromCallback', function () {
it('should emit one value from a callback', function (done) {
function callback (datum, cb) {
cb(datum);
}
var cbToObs = Observable.fromCallback(callback);

cbToObs(42)
.subscribe(function (x) {
expect(x).toBe(42);
}, function () {
done.fail('should not be called');
},
done);
});

it('should emit one value chosen by a selector', function (done) {
function callback (datum, cb) {
cb(null, datum);
}
var cbToObs = Observable.fromCallback(callback, null, function (err, datum) { return datum; });

cbToObs(42)
.subscribe(function (x) {
expect(x).toBe(42);
}, function () {
done.fail('should not be called');
},
done);
});

it('should override `this` in the callback', function (done) {
function callback (cb) {
cb(this.value);
}
var cbToObs = Observable.fromCallback(callback, {value: 42});

cbToObs()
.subscribe(function (x) {
expect(x).toBe(42);
}, function () {
done.fail('should not be called');
},
done);
});

it('should emit an error when the selector throws', function (done) {
function callback (cb) {
cb(42);
}
var cbToObs = Observable.fromCallback(callback, null, function (err) { throw new Error('Yikes!'); });

cbToObs()
.subscribe(function () {
// Considered a failure if we don't go directly to err handler
done.fail('should not be called');
},
function (err) {
expect(err.message).toBe('Yikes!');
done();
},
function () {
// Considered a failure if we don't go directly to err handler
done.fail('should not be called');
}
);
});

it('should not emit, throw or complete if immediately unsubscribed', function (done) {
var nextSpy = jasmine.createSpy('next');
var throwSpy = jasmine.createSpy('throw');
var completeSpy = jasmine.createSpy('complete');
var timeout;
function callback (datum, cb) {
// Need to cb async in order for the unsub to trigger
timeout = setTimeout(function () {
cb(datum);
});
}
var subscription = Observable.fromCallback(callback)(42)
.subscribe(nextSpy, throwSpy, completeSpy);
subscription.unsubscribe();

setTimeout(function () {
expect(nextSpy).not.toHaveBeenCalled();
expect(throwSpy).not.toHaveBeenCalled();
expect(completeSpy).not.toHaveBeenCalled();

clearTimeout(timeout);
done();
});
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess for now this async test is ok. When we have Subscriber start() feature, we can rework this test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a few comments to make things clearer

});
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ export class Observable<T> implements CoreOperators<T> {
removeHandler: (handler: Function) => void,
selector?: (...args: Array<any>) => T) => Observable<T>;
static fromPromise: <T>(promise: Promise<T>, scheduler?: Scheduler) => Observable<T>;
static fromCallback: <T>(callbackFunc: Function, ctx?: Object, selector?: Function, scheduler?: Scheduler) => Function;
static interval: (interval: number, scheduler?: Scheduler) => Observable<number>;
static merge: <T>(...observables: Array<Observable<any> | Scheduler | number>) => Observable<T>;
static never: <T>() => Observable<T>;
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ Observable.fromEventPattern = FromEventPatternObservable.create;
import {PromiseObservable} from './observables/PromiseObservable';
Observable.fromPromise = PromiseObservable.create;

import {CallbackObservable} from './observables/CallbackObservable';
Observable.fromCallback = CallbackObservable.create;

import {IntervalObservable} from './observables/IntervalObservable';
Observable.interval = IntervalObservable.create;

Expand Down
3 changes: 3 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ Observable.fromEventPattern = FromEventPatternObservable.create;
import {PromiseObservable} from './observables/PromiseObservable';
Observable.fromPromise = PromiseObservable.create;

import {CallbackObservable} from './observables/CallbackObservable';
Observable.fromCallback = CallbackObservable.create;

import {IntervalObservable} from './observables/IntervalObservable';
Observable.interval = IntervalObservable.create;

Expand Down
110 changes: 110 additions & 0 deletions src/observables/CallbackObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {Subscription} from '../Subscription';
import {immediate} from '../schedulers/immediate';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

export class CallbackObservable<T> extends Observable<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about align class name to observable, such as Observable.fromEvent has implementation named FromEventObservable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to mimic PromiseObservable with regards to that sort of thing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, just noticed that too.


_isScalar: boolean = false;
value: T | T[];

static create<T>(callbackFunc: Function,
ctx: Object = undefined,
selector: Function = undefined,
scheduler: Scheduler = immediate): Function {
return (...args): Observable<T> => {
return new CallbackObservable(callbackFunc, ctx, selector, args, scheduler);
};
}

constructor(private callbackFunc: Function,
private ctx,
private selector,
private args: any[],
public scheduler: Scheduler = immediate) {
super();
}

_subscribe(subscriber: Subscriber<T | T[]>) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is really big with lots of nested if statements. Can you refactor?

const callbackFunc = this.callbackFunc;
const ctx = this.ctx;
const selector = this.selector;
const args = this.args;
const scheduler = this.scheduler;

let handler;

if (scheduler === immediate) {
if (this._isScalar) {
subscriber.next(this.value);
subscriber.complete();
} else {
handler = (...innerArgs) => {
let results;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can move let results down to where it's being used on line 52


this._isScalar = true;
this.value = innerArgs;

if (selector) {
results = tryCatch(selector).apply(ctx, innerArgs);
if (results === errorObject) { return subscriber.error(results.e); }
subscriber.next(results);
} else {
if (innerArgs.length <= 1) {
subscriber.next(innerArgs[0]);
} else {
subscriber.next(innerArgs);
}
}
subscriber.complete();
};
}
} else {
const subscription = new Subscription();
if (this._isScalar) {
const value = this.value;
subscription.add(scheduler.schedule(dispatchNext, 0, { value, subscriber }));
} else {
handler = (...innerArgs) => {
let results;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as before.


this._isScalar = true;

if (selector) {
results = tryCatch(selector).apply(ctx, innerArgs);
if (results === errorObject) {
return subscription.add(scheduler.schedule(dispatchError, 0, { err: results.e, subscriber }));
}
this.value = results;
} else {
if (innerArgs.length <= 1) {
this.value = innerArgs[0];
} else {
this.value = innerArgs;
}
}
const value = this.value;
subscription.add(scheduler.schedule(dispatchNext, 0, { value, subscriber }));
};
return subscription;
}
}

if (handler) {
args.push(handler);
callbackFunc.apply(ctx, args);
}
}
}

function dispatchNext({ value, subscriber }) {
subscriber.next(value);
subscriber.complete();
}

function dispatchError({ err, subscriber }) {
subscriber.error(err);
}