diff --git a/spec/observables/pairs-spec.ts b/spec/observables/pairs-spec.ts new file mode 100644 index 0000000000..9c22172bda --- /dev/null +++ b/spec/observables/pairs-spec.ts @@ -0,0 +1,43 @@ +import {expect} from 'chai'; +import * as Rx from '../../dist/cjs/Rx'; + +declare const {hot, asDiagram, expectObservable, expectSubscriptions}; +declare const rxTestScheduler: Rx.TestScheduler; +const Observable = Rx.Observable; + +describe('Observable.pairs', () => { + asDiagram('pairs({a: 1, b:2})')('should create an observable emits key-value pair', () => { + const e1 = Observable.pairs({a: 1, b: 2}, rxTestScheduler); + const expected = '(ab|)'; + const values = { + a: ['a', 1], + b: ['b', 2] + }; + + expectObservable(e1).toBe(expected, values); + }); + + it('should create an observable without scheduler', (done: MochaDone) => { + let expected = [ + ['a', 1], + ['b', 2], + ['c', 3] + ]; + + Observable.pairs({a: 1, b: 2, c: 3}).subscribe(x => { + expect(x).to.deep.equal(expected.shift()); + }, x => { + done(new Error('should not be called')); + }, () => { + expect(expected).to.be.empty; + done(); + }); + }); + + it('should work with empty object', () => { + const e1 = Observable.pairs({}, rxTestScheduler); + const expected = '|'; + + expectObservable(e1).toBe(expected); + }); +}); \ No newline at end of file diff --git a/src/Rx.ts b/src/Rx.ts index 3339c7865b..0ed29621ae 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -27,6 +27,7 @@ import './add/observable/race'; import './add/observable/never'; import './add/observable/of'; import './add/observable/onErrorResumeNext'; +import './add/observable/pairs'; import './add/observable/range'; import './add/observable/using'; import './add/observable/throw'; diff --git a/src/add/observable/pairs.ts b/src/add/observable/pairs.ts new file mode 100644 index 0000000000..c344306d20 --- /dev/null +++ b/src/add/observable/pairs.ts @@ -0,0 +1,10 @@ +import {Observable} from '../../Observable'; +import {pairs as staticPairs} from '../../observable/pairs'; + +Observable.pairs = staticPairs; + +declare module '../../Observable' { + namespace Observable { + export let pairs: typeof staticPairs; + } +} \ No newline at end of file diff --git a/src/observable/PairsObservable.ts b/src/observable/PairsObservable.ts new file mode 100644 index 0000000000..f2ebf76756 --- /dev/null +++ b/src/observable/PairsObservable.ts @@ -0,0 +1,94 @@ +import {Scheduler} from '../Scheduler'; +import {Observable} from '../Observable'; +import {Subscriber} from '../Subscriber'; +import {TeardownLogic} from '../Subscription'; + +interface PairsContext { + obj: Object; + keys: Array; + length: number; + index: number; + subscriber: Subscriber>; +} + +function dispatch(state: PairsContext) { + const {obj, keys, length, index, subscriber} = state; + + if (index === length) { + subscriber.complete(); + return; + } + + const key = keys[index]; + subscriber.next([key, obj[key]]); + + state.index = index + 1; + + ( this).schedule(state); +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @extends {Ignored} + * @hide true + */ +export class PairsObservable extends Observable> { + private keys: Array; + + /** + * Convert an object into an observable sequence of [key, value] pairs + * using an optional Scheduler to enumerate the object. + * + * @example Converts a javascript object to an Observable + * var obj = { + * foo: 42, + * bar: 56, + * baz: 78 + * }; + * + * var source = Rx.Observable.pairs(obj); + * + * var subscription = source.subscribe( + * function (x) { + * console.log('Next: %s', x); + * }, + * function (err) { + * console.log('Error: %s', err); + * }, + * function () { + * console.log('Completed'); + * }); + * + * @param {Object} obj The object to inspect and turn into an + * Observable sequence. + * @param {Scheduler} [scheduler] An optional Scheduler to run the + * enumeration of the input sequence on. + * @returns {(Observable>)} An observable sequence of + * [key, value] pairs from the object. + */ + static create(obj: Object, scheduler?: Scheduler): Observable> { + return new PairsObservable(obj, scheduler); + } + + constructor(private obj: Object, private scheduler?: Scheduler) { + super(); + this.keys = Object.keys(obj); + } + + protected _subscribe(subscriber: Subscriber>): TeardownLogic { + const {keys, scheduler} = this; + const length = keys.length; + + if (scheduler) { + return scheduler.schedule(dispatch, 0, { + obj: this.obj, keys, length, index: 0, subscriber + }); + } else { + for (let idx = 0; idx < length; idx++) { + const key = keys[idx]; + subscriber.next([key, this.obj[key]]); + } + subscriber.complete(); + } + } +} \ No newline at end of file diff --git a/src/observable/pairs.ts b/src/observable/pairs.ts new file mode 100644 index 0000000000..96e543b876 --- /dev/null +++ b/src/observable/pairs.ts @@ -0,0 +1,3 @@ +import { PairsObservable } from './PairsObservable'; + +export const pairs = PairsObservable.create; \ No newline at end of file