Skip to content

Commit

Permalink
feat(timestamp): add higher-order lettable version of timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Jul 13, 2017
1 parent 82480cf commit a780bf2
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ export {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError';
export {TimeoutError} from './util/TimeoutError';
export {UnsubscriptionError} from './util/UnsubscriptionError';
export {TimeInterval} from './operator/timeInterval';
export {Timestamp} from './operator/timestamp';
export {Timestamp} from './operators/timestamp';
export {TestScheduler} from './testing/TestScheduler';
export {VirtualTimeScheduler} from './scheduler/VirtualTimeScheduler';
export {AjaxRequest, AjaxResponse, AjaxError, AjaxTimeoutError} from './observable/dom/AjaxObservable';
Expand Down
33 changes: 3 additions & 30 deletions src/operator/timestamp.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,14 @@
import { Operator } from '../Operator';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { IScheduler } from '../Scheduler';
import { async } from '../scheduler/async';

import { timestamp as higherOrder } from '../operators';
import { Timestamp } from '../operators/timestamp';
/**
* @param scheduler
* @return {Observable<Timestamp<any>>|WebSocketSubject<T>|Observable<T>}
* @method timestamp
* @owner Observable
*/
export function timestamp<T>(this: Observable<T>, scheduler: IScheduler = async): Observable<Timestamp<T>> {
return this.lift(new TimestampOperator(scheduler));
}

export class Timestamp<T> {
constructor(public value: T, public timestamp: number) {
}
};

class TimestampOperator<T> implements Operator<T, Timestamp<T>> {
constructor(private scheduler: IScheduler) {
}

call(observer: Subscriber<Timestamp<T>>, source: any): any {
return source.subscribe(new TimestampSubscriber(observer, this.scheduler));
}
}

class TimestampSubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<Timestamp<T>>, private scheduler: IScheduler) {
super(destination);
}

protected _next(value: T): void {
const now = this.scheduler.now();

this.destination.next(new Timestamp(value, now));
}
return higherOrder(scheduler)(this);
}
1 change: 1 addition & 0 deletions src/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export { switchAll } from './switchAll';
export { switchMap } from './switchMap';
export { takeLast } from './takeLast';
export { tap } from './tap';
export { timestamp } from './timestamp';
export { toArray } from './toArray';
export { window } from './window';
export { windowCount } from './windowCount';
Expand Down
21 changes: 21 additions & 0 deletions src/operators/timestamp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

import { IScheduler } from '../Scheduler';
import { async } from '../scheduler/async';
import { OperatorFunction } from '../interfaces';
import { map } from './map';

/**
* @param scheduler
* @return {Observable<Timestamp<any>>|WebSocketSubject<T>|Observable<T>}
* @method timestamp
* @owner Observable
*/
export function timestamp<T>(scheduler: IScheduler = async): OperatorFunction<T, Timestamp<T>> {
return map((value: T) => new Timestamp(value, scheduler.now()));
// return (source: Observable<T>) => source.lift(new TimestampOperator(scheduler));
}

export class Timestamp<T> {
constructor(public value: T, public timestamp: number) {
}
};

0 comments on commit a780bf2

Please sign in to comment.