diff --git a/package.json b/package.json index 51905914..3df432fe 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ }, "license": "MIT", "devDependencies": { + "@reactivex/rxjs": "^5.0.0-alpha.1", "babel-core": "5.8.23", "babel-loader": "5.3.2", "baconjs": "0.7.71", @@ -57,6 +58,7 @@ "transducers-js": "0.4.174", "transducers.js": "0.3.2", "webpack": "1.12.1", - "webpack-dev-server": "1.10.1" + "webpack-dev-server": "1.10.1", + "zen-observable": "^0.1.3" } } diff --git a/src/observable.js b/src/observable.js index 5170465c..7fcb8c54 100644 --- a/src/observable.js +++ b/src/observable.js @@ -101,6 +101,22 @@ extend(Observable.prototype, { return this._on(ANY, fn); }, + subscribe(observer) { + let stream = this.takeErrors(1); + let fn = function(event) { + if (event.type === "value" && observer.next) { + observer.next(event.value); + } else if (event.type == "error" && observer.error) { + observer.error(event.value); + } else if (event.type === "end" && observer.complete) { + observer.complete(event.value); + } + } + + stream.onAny(fn); + return () => stream.offAny(fn); + }, + offValue(fn) { return this._off(VALUE, fn); }, @@ -166,6 +182,7 @@ extend(Observable.prototype, { }); +Observable.prototype[require('./utils/symbol')('observable')] = function() { return this; } // extend() can't handle `toString` in IE8 Observable.prototype.toString = function() { diff --git a/src/utils/symbol.js b/src/utils/symbol.js new file mode 100644 index 00000000..d93988b4 --- /dev/null +++ b/src/utils/symbol.js @@ -0,0 +1,9 @@ +module.exports = function(key) { + if (typeof Symbol !== 'undefined' && Symbol[key]) { + return Symbol[key]; + } else if (typeof Symbol !== 'undefined' && typeof Symbol.for === 'function') { + return Symbol.for(key); + } else { + return '@@' + key; + } +} diff --git a/test/specs/es-observable.coffee b/test/specs/es-observable.coffee new file mode 100644 index 00000000..b9eab419 --- /dev/null +++ b/test/specs/es-observable.coffee @@ -0,0 +1,24 @@ +Observable = require('zen-observable') +{stream, prop, send, Kefir} = require('../test-helpers.coffee') + +describe '[Symbol.observable]', -> + it 'outputs a compatible Observable', (done) -> + a = stream() + values = [] + observable = Observable.from(a) + observable.subscribe + next: (x) -> + values.push(x) + complete: (x) -> + expect(values).toEqual([1, 2, 3]) + done() + send(a, [1, 2, 3, '']) + + it 'unsubscribes stream after an error', -> + a = stream() + values = [] + observable = a[Symbol.observable]() + observable.subscribe(next: (x) -> values.push(x)) + + send(a, [1, {error: 2}, 3]) + expect(values).toEqual([1]) diff --git a/test/test-helpers.coffee b/test/test-helpers.coffee index e15e0685..ed36f9e7 100644 --- a/test/test-helpers.coffee +++ b/test/test-helpers.coffee @@ -1,3 +1,4 @@ +Observable = require('zen-observable') # Activate Symbol polyfill Kefir = require("../dist/kefir") sinon = require('sinon') @@ -255,4 +256,3 @@ beforeEach -> return !@isNot } -