diff --git a/spec/operators/count-spec.js b/spec/operators/count-spec.js new file mode 100644 index 0000000000..aa94015eda --- /dev/null +++ b/spec/operators/count-spec.js @@ -0,0 +1,13 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('count', function () { + it('should count the values of an observable', function (done) { + Observable.fromArray([1, 2, 3]) + .count() + .subscribe(function (total) { + expect(total).toEqual(3); + }, null, done); + }); +}); \ No newline at end of file diff --git a/spec/operators/takeUntil-spec.js b/spec/operators/takeUntil-spec.js index 71758545bb..e1a1e0a141 100644 --- a/spec/operators/takeUntil-spec.js +++ b/spec/operators/takeUntil-spec.js @@ -8,8 +8,8 @@ describe('Observable.prototype.takeUntil()', function () { var i = 0; var nextSpy = jasmine.createSpy('nextSpy'); - Observable.timer(0, 16) - .takeUntil(Observable.timer(81)) + Observable.timer(0, 100) + .takeUntil(Observable.timer(450)) .subscribe(nextSpy, null, function () { expect(nextSpy.calls.count()).toBe(5); expected.forEach(function (v) { diff --git a/src/Observable.ts b/src/Observable.ts index 91131bc810..95ad2c6baf 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -129,6 +129,7 @@ export default class Observable { map: (project: (x: T, ix?: number) => R, thisArg?: any) => Observable; mapTo: (value: R) => Observable; toArray: () => Observable; + count: () => Observable; scan: (project: (acc: R, x: T) => R, acc?: R) => Observable; reduce: (project: (acc: R, x: T) => R, acc?: R) => Observable; startWith: (x: T) => Observable; diff --git a/src/Rx.ts b/src/Rx.ts index 9725ae955d..a4d39261a4 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -79,6 +79,7 @@ import _do from './operators/do'; import map from './operators/map'; import mapTo from './operators/mapTo'; import toArray from './operators/toArray'; +import count from './operators/count'; import scan from './operators/scan'; import reduce from './operators/reduce'; import startWith from './operators/startWith'; @@ -87,6 +88,7 @@ observableProto.do = _do; observableProto.map = map; observableProto.mapTo = mapTo; observableProto.toArray = toArray; +observableProto.count = count; observableProto.scan = scan; observableProto.reduce = reduce; observableProto.startWith = startWith; diff --git a/src/observables/RangeObservable.ts b/src/observables/RangeObservable.ts index 601f5c227e..07fd593470 100644 --- a/src/observables/RangeObservable.ts +++ b/src/observables/RangeObservable.ts @@ -3,15 +3,15 @@ import Observable from '../Observable'; export default class RangeObservable extends Observable { - static create(start: number = 0, count: number = 0, scheduler?: Scheduler) { - return new RangeObservable(start, count, scheduler); + static create(start: number = 0, end: number = 0, scheduler?: Scheduler) { + return new RangeObservable(start, end, scheduler); } static dispatch(state) { - const { start, index, count, subscriber } = state; + const { start, index, end, subscriber } = state; - if (index >= count) { + if (index >= end) { subscriber.complete(); return; } @@ -28,24 +28,31 @@ export default class RangeObservable extends Observable { ( this).schedule(state); } - constructor(private start: number, private count: number, private scheduler?: Scheduler) { + private start: number; + private end: number; + private scheduler: Scheduler; + + constructor(start: number, end: number, scheduler?: Scheduler) { super(); + this.start = start; + this.end = end; + this.scheduler = scheduler; } _subscribe(subscriber) { let index = 0; let start = this.start; - const count = this.count; + const end = this.end; const scheduler = this.scheduler; if (scheduler) { subscriber.add(scheduler.schedule(0, { - index, count, start, subscriber + index, end, start, subscriber }, RangeObservable.dispatch)); } else { do { - if (index++ >= count) { + if (index++ >= end) { subscriber.complete(); break; } diff --git a/src/operators/count.ts b/src/operators/count.ts new file mode 100644 index 0000000000..ec9630b9d1 --- /dev/null +++ b/src/operators/count.ts @@ -0,0 +1,31 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; + +export default function count() { + return this.lift(new CountOperator()); +} + +export class CountOperator extends Operator { + call(observer: Observer): Observer { + return new CountSubscriber(observer); + } +} + +export class CountSubscriber extends Subscriber { + + count: number = 0; + + constructor(destination: Observer) { + super(destination); + } + + _next(x) { + this.count += 1; + } + + _complete() { + this.destination.next(this.count); + this.destination.complete(); + } +}