Skip to content

Commit

Permalink
refactor(observeOn): let observeOn uses notification instead of custo…
Browse files Browse the repository at this point in the history
…m implementation
  • Loading branch information
kwonoj committed Sep 1, 2015
1 parent 03853ac commit d86f276
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions src/operators/observeOn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Observer from '../Observer';
import Scheduler from '../Scheduler';
import Observable from '../Observable';
import Subscriber from '../Subscriber';
import Notification from '../Notification';

export default function observeOn<T>(scheduler: Scheduler, delay: number = 0): Observable<T> {
return this.lift(new ObserveOnOperator(scheduler, delay));
Expand All @@ -25,8 +26,8 @@ export class ObserveOnOperator<T, R> implements Operator<T, R> {

export class ObserveOnSubscriber<T> extends Subscriber<T> {

static dispatch({ type, value, destination }) {
destination[type](value);
static dispatch({ notification, destination }) {
notification.observe(destination);
}

delay: number;
Expand All @@ -40,33 +41,30 @@ export class ObserveOnSubscriber<T> extends Subscriber<T> {

_next(x) {
this.add(this.scheduler.schedule(this.delay,
new ScheduledNotification("next", x, this.destination),
new ObserveOnMessage(Notification.createNext(x), this.destination),
ObserveOnSubscriber.dispatch)
);
}

_error(e) {
this.add(this.scheduler.schedule(this.delay,
new ScheduledNotification("error", e, this.destination),
new ObserveOnMessage(Notification.createError(e), this.destination),
ObserveOnSubscriber.dispatch));
}

_complete() {
this.add(this.scheduler.schedule(this.delay,
new ScheduledNotification("complete", void 0, this.destination),
new ObserveOnMessage(Notification.createComplete(), this.destination),
ObserveOnSubscriber.dispatch));
}
}

class ScheduledNotification {

type: string;
value: any;
class ObserveOnMessage {
notification: Notification<any>;
destination: Observer<any>;

constructor(type: string, value: any, destination: Observer<any>) {
this.type = type;
this.value = value;

constructor(notification: Notification<any>, destination:Observer<any>) {
this.notification = notification;
this.destination = destination;
}
}

0 comments on commit d86f276

Please sign in to comment.