Skip to content

Commit

Permalink
Enable consuming ES7 observables as streams
Browse files Browse the repository at this point in the history
  • Loading branch information
lautis committed Sep 26, 2015
1 parent 068bf15 commit 0a038ea
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ Kefir.fromEvents = require('./primary/from-events');
// (Function) -> Stream
Kefir.stream = require('./primary/stream');



// (ES Observable) -> Stream
Kefir.from = require('./primary/from');

// Create a property
// -----------------------------------------------------------------------------
Expand Down
26 changes: 26 additions & 0 deletions src/primary/from.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
const stream = require('./stream');
const symbol = require('../utils/symbol')('observable');

module.exports = function fromObservable(_observable) {
const observable = _observable[symbol] ? _observable[symbol]() : _observable;
return stream(function(emitter) {
const unsub = observable.subscribe({
error(error) {
emitter.error(error);
emitter.end();
},
next(value) {
emitter.emit(value);
},
complete() {
emitter.end();
}
})

if (unsub.unsubscribe) {
return function () { unsub.unsubscribe(); };
} else {
return unsub;
}
}).setName('from');
}
30 changes: 30 additions & 0 deletions test/specs/from.coffee
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
Observable = require('zen-observable')
Rx = require('@reactivex/rxjs')
{activate, deactivate, Kefir} = require('../test-helpers.coffee')

describe 'from', ->
it 'turns an ES7 observable into a stream', ->
expect(Kefir.from(Observable.of(1, 2))).toBeStream()

it 'emits events from observable to stream', (done) ->
stream = Kefir.from(Observable.of(1, 2))
values = []
stream.onValue (value) -> values.push(value)
stream.onEnd ->
expect(values).toEqual([1, 2])
done()

it 'ends stream after an error', (done) ->
observable = new Observable((observer) ->
observer.next(1)
observer.error()
)
Kefir.from(observable).onEnd -> done()

it 'turns an RxJS observable into a Kefir stream', (done) ->
stream = Kefir.from(Rx.Observable.of('hello world'))
values = []
stream.onValue (value) -> values.push(value)
stream.onEnd ->
expect(values).toEqual(['hello world'])
done()

0 comments on commit 0a038ea

Please sign in to comment.