Skip to content

Commit

Permalink
feat(operator): add find, findIndex operator
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj committed Sep 29, 2015
1 parent 8318089 commit 7c6cc9d
Show file tree
Hide file tree
Showing 10 changed files with 315 additions and 0 deletions.
26 changes: 26 additions & 0 deletions perf/micro/immediate-scheduler/operators/find-predicate-this.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {

var predicate = function(value, i) {
return value === 20;
};

var testThis = {};

var oldFindPredicateThisArg = RxOld.Observable.range(0, 50, RxOld.Scheduler.immediate).find(predicate, testThis);
var newFindPredicateThisArg = RxNew.Observable.range(0, 50).find(predicate, testThis);

return suite
.add('old find(predicate, thisArg) with immediate scheduler', function () {
oldFindPredicateThisArg.subscribe(_next, _error, _complete);
})
.add('new find(predicate, thisArg) with immediate scheduler', function () {
newFindPredicateThisArg.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e){ }
function _complete(){ }
};
24 changes: 24 additions & 0 deletions perf/micro/immediate-scheduler/operators/find-predicate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {

var predicate = function(value, i) {
return value === 20;
};

var oldFindPredicate = RxOld.Observable.range(0, 50, RxOld.Scheduler.immediate).find(predicate);
var newFindPredicate = RxNew.Observable.range(0, 50).find(predicate);

return suite
.add('old find(predicate) with immediate scheduler', function () {
oldFindPredicate.subscribe(_next, _error, _complete);
})
.add('new find(predicate) with immediate scheduler', function () {
newFindPredicate.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e){ }
function _complete(){ }
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {

var predicate = function(value, i) {
return value === 20;
};

var testThis = {};

var oldFindIndexPredicateThisArg = RxOld.Observable.range(0, 50, RxOld.Scheduler.immediate).findIndex(predicate, testThis);
var newFindIndexPredicateThisArg = RxNew.Observable.range(0, 50).findIndex(predicate, testThis);

return suite
.add('old findIndex(predicate, thisArg) with immediate scheduler', function () {
oldFindIndexPredicateThisArg.subscribe(_next, _error, _complete);
})
.add('new findIndex(predicate, thisArg) with immediate scheduler', function () {
newFindIndexPredicateThisArg.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e){ }
function _complete(){ }
};
24 changes: 24 additions & 0 deletions perf/micro/immediate-scheduler/operators/findindex-predicate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {

var predicate = function(value, i) {
return value === 20;
};

var oldFindIndexPredicate = RxOld.Observable.range(0, 50, RxOld.Scheduler.immediate).findIndex(predicate);
var newFindIndexPredicate = RxNew.Observable.range(0, 50).findIndex(predicate);

return suite
.add('old findIndex(predicate) with immediate scheduler', function () {
oldFindIndexPredicate.subscribe(_next, _error, _complete);
})
.add('new findIndex(predicate) with immediate scheduler', function () {
newFindIndexPredicate.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e){ }
function _complete(){ }
};
67 changes: 67 additions & 0 deletions spec/operators/find-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/* globals describe, it, expect, hot, expectObservable */
var Rx = require('../../dist/cjs/Rx.KitchenSink');
var Observable = Rx.Observable;

describe('Observable.prototype.find()', function() {
function truePredicate(x) {
return true;
}

it("should not emit if source does not emit", function() {
var source = hot('-');
var expected = '-';

expectObservable(source.find(truePredicate)).toBe(expected);
});

it('should return undefined if source is empty to match predicate', function() {
var expected = '(x|)';

expectObservable(Observable.empty().find(truePredicate)).toBe(expected, {x: undefined});
});

it('should return matching element from source emits single element', function() {
var source = hot('--a--|');
var expected = '--(a|)';

var predicate = function(value) {
return value === 'a';
}

expectObservable(source.find(predicate)).toBe(expected);
});

it('should return undefined if element does not match with predicate', function() {
var source = hot('--a--b--c--|');
var expected = '-----------(x|)';

var predicate = function(value) {
return value === 'z';
}

expectObservable(source.find(predicate)).toBe(expected, { x: undefined });
});

it('should raise if source raise error while element does not match with predicate', function() {
var source = hot('--a--b--#');
var expected = '--------#';

var predicate = function(value) {
return value === 'z';
}

expectObservable(source.find(predicate)).toBe(expected);
});

it('should raise error if predicate throws error', function() {

var source = hot('--a--b--c--|');
var expected = '--#';

var predicate = function(value) {
throw 'error';
}

expectObservable(source.find(predicate)).toBe(expected);
});
});
68 changes: 68 additions & 0 deletions spec/operators/findindex-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/* globals describe, it, expect, hot, expectObservable */
var Rx = require('../../dist/cjs/Rx.KitchenSink');
var Observable = Rx.Observable;

describe('Observable.prototype.findIndex()', function() {
function truePredicate(x) {
return true;
}

it("should not emit if source does not emit", function() {
var source = hot('-');
var expected = '-';

expectObservable(source.findIndex(truePredicate)).toBe(expected);
});

it('should return negative index if source is empty to match predicate', function() {
var expected = '(x|)';

expectObservable(Observable.empty().findIndex(truePredicate)).toBe(expected, {x: -1});
});

it('should return index of element from source emits single element', function() {
var sourceValue = 1;
var source = hot('--a--|', { a: sourceValue });
var expected = '--(x|)';

var predicate = function(value) {
return value === sourceValue;
}

expectObservable(source.findIndex(predicate)).toBe(expected, { x: 0 });
});

it('should return negative index if element does not match with predicate', function() {
var source = hot('--a--b--c--|');
var expected = '-----------(x|)';

var predicate = function(value) {
return value === 'z';
}

expectObservable(source.findIndex(predicate)).toBe(expected, { x: -1 });
});

it('should raise if source raise error while element does not match with predicate', function() {
var source = hot('--a--b--#');
var expected = '--------#';

var predicate = function(value) {
return value === 'z';
}

expectObservable(source.findIndex(predicate)).toBe(expected);
});

it('should raise error if predicate throws error', function() {

var source = hot('--a--b--c--|');
var expected = '--#';

var predicate = function(value) {
throw 'error';
}

expectObservable(source.findIndex(predicate)).toBe(expected);
});
});
8 changes: 8 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { CoreOperators } from './CoreOperators';
interface KitchenSinkOperators<T> extends CoreOperators<T> {
elementAt?: (index: number, defaultValue?: any) => Observable<T>;
distinctUntilKeyChanged?: (key: string, compare?: (x: any, y: any) => boolean, thisArg?: any) => Observable<T>;
find?: (predicate: (value: T, index: number, source:Observable<T>) => boolean, thisArg?: any) => Observable<T>;
findIndex?: (predicate: (value: T, index: number, source:Observable<T>) => boolean, thisArg?: any) => Observable<number>;
}

// operators
Expand Down Expand Up @@ -132,6 +134,12 @@ observableProto.expand = expand;
import filter from './operators/filter';
observableProto.filter = filter;

import find from './operators/extended/find';
observableProto.find = find;

import findIndex from './operators/extended/findIndex';
observableProto.findIndex = findIndex;

import _finally from './operators/finally';
observableProto.finally = _finally;

Expand Down
60 changes: 60 additions & 0 deletions src/operators/extended/find-support.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import Operator from '../../Operator';
import Observer from '../../Observer';
import Observable from '../../Observable';
import Subscriber from '../../Subscriber';

import tryCatch from '../../util/tryCatch';
import {errorObject} from '../../util/errorObject';
import bindCallback from '../../util/bindCallback';

export class FindValueOperator<T, R> implements Operator<T, R> {
constructor(private predicate: (value: T, index: number, source:Observable<T>) => boolean, private source:Observable<T>,
private yieldIndex: boolean, private thisArg?: any) {

}

call(observer: Subscriber<T>): Subscriber<T> {
return new FindValueSubscriber(observer, this.predicate, this.source, this.yieldIndex, this.thisArg);
}
}

export class FindValueSubscriber<T> extends Subscriber<T> {
private predicate: Function;
private index: number = 0;

constructor(destination: Subscriber<T>, predicate: (value: T, index: number, source: Observable<T>) => boolean,
private source: Observable<T>, private yieldIndex: boolean, private thisArg?: any) {
super(destination);

if(typeof predicate === 'function') {
this.predicate = bindCallback(predicate, thisArg, 3);
}
}

private notifyComplete(value: any): void {
const destination = this.destination;

destination.next(value);
destination.complete();
}

_next(value: T) {
const predicate = this.predicate;

if (predicate === undefined) {
this.destination.error(new TypeError('predicate must be a function'));
}

let index = this.index++;
let result = tryCatch(predicate)(value, index, this.source);
if(result === errorObject) {
this.destination.error(result.e);
} else if (result) {
this.notifyComplete(this.yieldIndex ? index : value);
}
}

_complete() {
this.notifyComplete(this.yieldIndex ? -1 : undefined);
}
}
6 changes: 6 additions & 0 deletions src/operators/extended/find.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import Observable from '../../Observable';
import { FindValueOperator } from './find-support';

export default function find<T>(predicate: (value: T, index: number, source:Observable<T>) => boolean, thisArg?: any): Observable<T> {
return this.lift(new FindValueOperator(predicate, this, false, thisArg));
}
6 changes: 6 additions & 0 deletions src/operators/extended/findIndex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import Observable from '../../Observable';
import { FindValueOperator } from './find-support';

export default function findIndex<T>(predicate: (value: T, index: number, source:Observable<T>) => boolean, thisArg?: any): Observable<number> {
return this.lift(new FindValueOperator(predicate, this, true, thisArg));
}

0 comments on commit 7c6cc9d

Please sign in to comment.