diff --git a/src/index.js b/src/index.js index d5ed6a17..bb87b3f2 100644 --- a/src/index.js +++ b/src/index.js @@ -42,8 +42,8 @@ Kefir.fromEvents = require('./primary/from-events'); // (Function) -> Stream Kefir.stream = require('./primary/stream'); - - +// (ES Observable) -> Stream +Kefir.from = require('./primary/from'); // Create a property // ----------------------------------------------------------------------------- diff --git a/src/primary/from.js b/src/primary/from.js new file mode 100644 index 00000000..67b878bf --- /dev/null +++ b/src/primary/from.js @@ -0,0 +1,26 @@ +const stream = require('./stream'); +const symbol = require('../utils/symbol')('observable'); + +module.exports = function fromObservable(_observable) { + const observable = _observable[symbol] ? _observable[symbol]() : _observable; + return stream(function(emitter) { + const unsub = observable.subscribe({ + error(error) { + emitter.error(error); + emitter.end(); + }, + next(value) { + emitter.emit(value); + }, + complete() { + emitter.end(); + } + }) + + if (unsub.unsubscribe) { + return function () { unsub.unsubscribe(); }; + } else { + return unsub; + } + }).setName('from'); +} diff --git a/test/specs/from.coffee b/test/specs/from.coffee new file mode 100644 index 00000000..06535c1c --- /dev/null +++ b/test/specs/from.coffee @@ -0,0 +1,30 @@ +Observable = require('zen-observable') +Rx = require('@reactivex/rxjs') +{activate, deactivate, Kefir} = require('../test-helpers.coffee') + +describe 'from', -> + it 'turns an ES7 observable into a stream', -> + expect(Kefir.from(Observable.of(1, 2))).toBeStream() + + it 'emits events from observable to stream', (done) -> + stream = Kefir.from(Observable.of(1, 2)) + values = [] + stream.onValue (value) -> values.push(value) + stream.onEnd -> + expect(values).toEqual([1, 2]) + done() + + it 'ends stream after an error', (done) -> + observable = new Observable((observer) -> + observer.next(1) + observer.error() + ) + Kefir.from(observable).onEnd -> done() + + it 'turns an RxJS observable into a Kefir stream', (done) -> + stream = Kefir.from(Rx.Observable.of('hello world')) + values = [] + stream.onValue (value) -> values.push(value) + stream.onEnd -> + expect(values).toEqual(['hello world']) + done()