From 97ff1ece7c2aef8cd04c3a653e3b219c11e82667 Mon Sep 17 00:00:00 2001 From: Andre Medeiros Date: Fri, 23 Oct 2015 14:27:45 +0300 Subject: [PATCH] feat(shareBehavior): add shareBehavior and its tests --- .../operators/sharebehavior.js | 20 +++ spec/operators/shareBehavior-spec.js | 151 ++++++++++++++++++ src/CoreOperators.ts | 1 + src/Rx.KitchenSink.ts | 3 + src/Rx.ts | 3 + src/operators/shareBehavior.ts | 7 + 6 files changed, 185 insertions(+) create mode 100644 perf/micro/immediate-scheduler/operators/sharebehavior.js create mode 100644 spec/operators/shareBehavior-spec.js create mode 100644 src/operators/shareBehavior.ts diff --git a/perf/micro/immediate-scheduler/operators/sharebehavior.js b/perf/micro/immediate-scheduler/operators/sharebehavior.js new file mode 100644 index 0000000000..b0c3779128 --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/sharebehavior.js @@ -0,0 +1,20 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldShareBehaviorWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate) + .shareValue(0); + var newShareBehaviorWithImmediateScheduler = RxNew.Observable.range(0, 25) + .shareBehavior(0); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old shareBehavior with immediate scheduler', function () { + oldShareBehaviorWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new shareBehavior with immediate scheduler', function () { + newShareBehaviorWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; diff --git a/spec/operators/shareBehavior-spec.js b/spec/operators/shareBehavior-spec.js new file mode 100644 index 0000000000..f5a40565cc --- /dev/null +++ b/spec/operators/shareBehavior-spec.js @@ -0,0 +1,151 @@ +/* globals describe, it, expect, expectObservable, hot */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.shareBehavior', function () { + it('should share a single subscription', function () { + var subscriptionCount = 0; + var obs = new Observable(function (observer) { + subscriptionCount++; + }); + + var source = obs.shareBehavior(0); + + expect(subscriptionCount).toBe(0); + + source.subscribe(); + source.subscribe(); + source.subscribe(); + + expect(subscriptionCount).toBe(1); + }); + + it('should replay 1 event from the past to a late subscriber', function (done) { + var results1 = []; + var results2 = []; + var subscriptions = 0; + + var source = new Observable(function (observer) { + subscriptions++; + observer.next(1); + observer.next(2); + observer.next(3); + observer.next(4); + }); + + var hot = source.shareBehavior(0); + + expect(results1).toEqual([]); + expect(results2).toEqual([]); + + hot.subscribe(function (x) { + results1.push(x); + }); + + expect(results1).toEqual([0, 1, 2, 3, 4]); + expect(results2).toEqual([]); + + hot.subscribe(function (x) { + results2.push(x); + }); + + expect(results1).toEqual([0, 1, 2, 3, 4]); + expect(results2).toEqual([4]); + expect(subscriptions).toBe(1); + done(); + }); + + it('should replay the default value if no next() ever emits', function (done) { + var results1 = []; + var results2 = []; + var subscriptions = 0; + + var source = new Observable(function (observer) { + subscriptions++; + }); + + var hot = source.shareBehavior(0); + + expect(results1).toEqual([]); + expect(results2).toEqual([]); + + hot.subscribe(function (x) { + results1.push(x); + }); + + expect(results1).toEqual([0]); + expect(results2).toEqual([]); + + hot.subscribe(function (x) { + results2.push(x); + }); + + expect(results1).toEqual([0]); + expect(results2).toEqual([0]); + expect(subscriptions).toBe(1); + done(); + }); + + it('should unsubscribe from the source as soon as no more subscribers on shared', function () { + var e1 = cold( '--a---b-c--d--e--|'); + var e1subs = '^ ! '; + var expected1 = 'x-a---b- '; + var unsub1 = ' ! '; + var expected2 = 'x-a---b-c--d- '; + var unsub2 = ' ! '; + + var shared = e1.shareBehavior('x'); + var observer1 = shared.do(); + var observer2 = shared.do(); + + expectObservable(observer1, unsub1).toBe(expected1); + expectObservable(observer2, unsub2).toBe(expected2); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should give latest value to a late observer', function () { + var e1 = cold( '--a-b---c--d--e--|'); + var e1subs = '^ ! '; + var expected1 = 'x-a-b---c--d-- '; + var unsub1 = ' ! '; + var e2 = cold( '-------x----------'); + var expected2 = ' bc--d--e-- '; + var unsub2 = ' ^ ! '; + + var shared = e1.shareBehavior('x'); + var observer1 = shared.do(); + var observer2 = e2.mergeMap(function () { return shared.do(); }); + + expectObservable(observer1, unsub1).toBe(expected1); + expectObservable(observer2, unsub2).toBe(expected2); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not change the output of the observable when successful', function () { + var e1 = hot('---a--^--b-c--d--e--|'); + var expected = 'x--b-c--d--e--|'; + + expectObservable(e1.shareBehavior('x')).toBe(expected); + }); + + it('should not change the output of the observable when error', function () { + var e1 = hot('---a--^--b-c--d--e--#'); + var expected = 'x--b-c--d--e--#'; + + expectObservable(e1.shareBehavior('x')).toBe(expected); + }); + + it('should not change the output of the observable when never', function () { + var e1 = cold( '----'); + var expected = 'a---'; + + expectObservable(e1.shareBehavior('a')).toBe(expected); + }); + + it('should not change the output of the observable when empty', function () { + var e1 = cold( '| '); + var expected = '(a|)'; + + expectObservable(e1.shareBehavior('a')).toBe(expected); + }); +}); \ No newline at end of file diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index 09ffda5a0c..8280955dcf 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -56,6 +56,7 @@ export interface CoreOperators { sampleTime?: (delay: number, scheduler?: Scheduler) => Observable; scan?: (project: (acc: R, x: T) => R, acc?: R) => Observable; share?: () => Observable; + shareBehavior?: (value: any) => Observable; shareReplay?: (bufferSize: number, windowTime: number, scheduler?: Scheduler) => Observable; single?: (predicate?: (value: T, index:number) => boolean, thisArg?: any) => Observable; skip?: (count: number) => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 4acb6f7931..d8eeee3da8 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -233,6 +233,9 @@ observableProto.scan = scan; import share from './operators/share'; observableProto.share = share; +import shareBehavior from './operators/shareBehavior'; +observableProto.shareBehavior = shareBehavior; + import shareReplay from './operators/shareReplay'; observableProto.shareReplay = shareReplay; diff --git a/src/Rx.ts b/src/Rx.ts index 2b0027a53d..fe6ecbab4a 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -207,6 +207,9 @@ observableProto.scan = scan; import share from './operators/share'; observableProto.share = share; +import shareBehavior from './operators/shareBehavior'; +observableProto.shareBehavior = shareBehavior; + import shareReplay from './operators/shareReplay'; observableProto.shareReplay = shareReplay; diff --git a/src/operators/shareBehavior.ts b/src/operators/shareBehavior.ts new file mode 100644 index 0000000000..7eba4f47c7 --- /dev/null +++ b/src/operators/shareBehavior.ts @@ -0,0 +1,7 @@ +import Observable from '../Observable'; +import Scheduler from '../Scheduler'; +import publishBehavior from './publishBehavior'; + +export default function shareBehavior(value: T): Observable { + return publishBehavior.call(this, value).refCount(); +} \ No newline at end of file