Skip to content

Commit

Permalink
fix(find): unsubscribe from source when found (#3968)
Browse files Browse the repository at this point in the history
* test(find): add failing unsubscribe test

* test(findIndex): add failing unsubscribe test

* test(first): add unsubscription test

* fix(find): unsubscribe from source when found

* chore(test): remove any
  • Loading branch information
cartant authored and benlesh committed Aug 1, 2018
1 parent 6d6d08f commit fd01f7b
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 27 deletions.
43 changes: 30 additions & 13 deletions spec/operators/find-spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { expect } from 'chai';
import { find, mergeMap } from 'rxjs/operators';
import { find, mergeMap, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { of, Observable, from } from 'rxjs';

declare function asDiagram(arg: string): Function;

declare const rxTestScheduler: TestScheduler;

/** @test {find} */
describe('find operator', () => {
function truePredicate(x: any) {
Expand All @@ -19,13 +22,13 @@ describe('find operator', () => {

const predicate = function (x: number) { return x % 5 === 0; };

expectObservable((<any>source).pipe(find(predicate))).toBe(expected, values);
expectObservable(source.pipe(find(predicate))).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should throw if not provided a function', () => {
expect(() => {
(<any>of('yut', 'yee', 'sam')).pipe(find('yee' as any));
of('yut', 'yee', 'sam').pipe(find('yee' as any));
}).to.throw(TypeError, 'predicate is not a function');
});

Expand All @@ -34,7 +37,7 @@ describe('find operator', () => {
const subs = '^';
const expected = '-';

expectObservable((<any>source).pipe(find(truePredicate))).toBe(expected);
expectObservable(source.pipe(find(truePredicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -43,7 +46,7 @@ describe('find operator', () => {
const subs = '(^!)';
const expected = '(x|)';

const result = (<any>source).pipe(find(truePredicate));
const result = source.pipe(find(truePredicate));

expectObservable(result).toBe(expected, {x: undefined});
expectSubscriptions(source.subscriptions).toBe(subs);
Expand All @@ -58,7 +61,7 @@ describe('find operator', () => {
return value === 'a';
};

expectObservable((<any>source).pipe(find(predicate))).toBe(expected);
expectObservable(source.pipe(find(predicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -71,7 +74,7 @@ describe('find operator', () => {
return value === 'b';
};

expectObservable((<any>source).pipe(find(predicate))).toBe(expected);
expectObservable(source.pipe(find(predicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -87,7 +90,7 @@ describe('find operator', () => {
return value === this.target;
};

expectObservable((<any>source).pipe(find(predicate, finder))).toBe(expected);
expectObservable(source.pipe(find(predicate, finder))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -100,7 +103,7 @@ describe('find operator', () => {
return value === 'z';
};

expectObservable((<any>source).pipe(find(predicate))).toBe(expected, { x: undefined });
expectObservable(source.pipe(find(predicate))).toBe(expected, { x: undefined });
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -110,7 +113,7 @@ describe('find operator', () => {
const expected = '------- ';
const unsub = ' ! ';

const result = (<any>source).pipe(find((value: string) => value === 'z'));
const result = source.pipe(find((value: string) => value === 'z'));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
Expand All @@ -122,7 +125,7 @@ describe('find operator', () => {
const expected = '------- ';
const unsub = ' ! ';

const result = (<any>source).pipe(
const result = source.pipe(
mergeMap((x: string) => of(x)),
find((value: string) => value === 'z'),
mergeMap((x: string) => of(x))
Expand All @@ -132,6 +135,20 @@ describe('find operator', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should unsubscribe when the predicate is matched', () => {
const source = hot('--a--b---c-|');
const subs = '^ !';
const expected = '-------(b|)';

const duration = rxTestScheduler.createTime('--|');

expectObservable(source.pipe(
find((value: string) => value === 'b'),
delay(duration, rxTestScheduler)
)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should raise if source raise error while element does not match with predicate', () => {
const source = hot('--a--b--#');
const subs = '^ !';
Expand All @@ -141,7 +158,7 @@ describe('find operator', () => {
return value === 'z';
};

expectObservable((<any>source).pipe(find(predicate))).toBe(expected);
expectObservable(source.pipe(find(predicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -154,7 +171,7 @@ describe('find operator', () => {
throw 'error';
};

expectObservable((<any>source).pipe(find(predicate))).toBe(expected);
expectObservable(source.pipe(find(predicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand Down
43 changes: 30 additions & 13 deletions spec/operators/findIndex-spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { findIndex, mergeMap } from 'rxjs/operators';
import { findIndex, mergeMap, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { of } from 'rxjs';

declare function asDiagram(arg: string): Function;

declare const rxTestScheduler: TestScheduler;

/** @test {findIndex} */
describe('findIndex operator', () => {
function truePredicate(x: any) {
Expand All @@ -18,7 +21,7 @@ describe('findIndex operator', () => {

const predicate = function (x: number) { return x % 5 === 0; };

expectObservable((<any>source).pipe(findIndex(predicate))).toBe(expected, { x: 2 });
expectObservable(source.pipe(findIndex(predicate))).toBe(expected, { x: 2 });
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -27,7 +30,7 @@ describe('findIndex operator', () => {
const subs = '^';
const expected = '-';

expectObservable((<any>source).pipe(findIndex(truePredicate))).toBe(expected);
expectObservable(source.pipe(findIndex(truePredicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -36,7 +39,7 @@ describe('findIndex operator', () => {
const subs = '(^!)';
const expected = '(x|)';

const result = (<any>source).pipe(findIndex(truePredicate));
const result = source.pipe(findIndex(truePredicate));

expectObservable(result).toBe(expected, {x: -1});
expectSubscriptions(source.subscriptions).toBe(subs);
Expand All @@ -52,7 +55,7 @@ describe('findIndex operator', () => {
return value === sourceValue;
};

expectObservable((<any>source).pipe(findIndex(predicate))).toBe(expected, { x: 0 });
expectObservable(source.pipe(findIndex(predicate))).toBe(expected, { x: 0 });
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -65,7 +68,7 @@ describe('findIndex operator', () => {
return value === 7;
};

expectObservable((<any>source).pipe(findIndex(predicate))).toBe(expected, { x: 1 });
expectObservable(source.pipe(findIndex(predicate))).toBe(expected, { x: 1 });
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -78,7 +81,7 @@ describe('findIndex operator', () => {
const predicate = function (this: typeof sourceValues, value: number) {
return value === this.b;
};
const result = (<any>source).pipe(findIndex(predicate, sourceValues));
const result = source.pipe(findIndex(predicate, sourceValues));

expectObservable(result).toBe(expected, { x: 1 });
expectSubscriptions(source.subscriptions).toBe(subs);
Expand All @@ -93,7 +96,7 @@ describe('findIndex operator', () => {
return value === 'z';
};

expectObservable((<any>source).pipe(findIndex(predicate))).toBe(expected, { x: -1 });
expectObservable(source.pipe(findIndex(predicate))).toBe(expected, { x: -1 });
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -103,7 +106,7 @@ describe('findIndex operator', () => {
const expected = '------- ';
const unsub = ' ! ';

const result = (<any>source).pipe(findIndex((value: string) => value === 'z'));
const result = source.pipe(findIndex((value: string) => value === 'z'));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
Expand All @@ -115,16 +118,30 @@ describe('findIndex operator', () => {
const expected = '------- ';
const unsub = ' ! ';

const result = (<any>source).pipe(
const result = source.pipe(
mergeMap((x: string) => of(x)),
findIndex((value: string) => value === 'z'),
mergeMap((x: string) => of(x))
mergeMap((x: number) => of(x))
);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should unsubscribe when the predicate is matched', () => {
const source = hot('--a--b---c-|');
const subs = '^ !';
const expected = '-------(x|)';

const duration = rxTestScheduler.createTime('--|');

expectObservable(source.pipe(
findIndex((value: string) => value === 'b'),
delay(duration, rxTestScheduler)
)).toBe(expected, { x: 1 });
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should raise if source raise error while element does not match with predicate', () => {
const source = hot('--a--b--#');
const subs = '^ !';
Expand All @@ -134,7 +151,7 @@ describe('findIndex operator', () => {
return value === 'z';
};

expectObservable((<any>source).pipe(findIndex(predicate))).toBe(expected);
expectObservable(source.pipe(findIndex(predicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

Expand All @@ -147,7 +164,7 @@ describe('findIndex operator', () => {
throw 'error';
};

expectObservable((<any>source).pipe(findIndex(predicate))).toBe(expected);
expectObservable(source.pipe(findIndex(predicate))).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
19 changes: 18 additions & 1 deletion spec/operators/first-spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { expect } from 'chai';
import { hot, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { first, mergeMap } from 'rxjs/operators';
import { first, mergeMap, delay } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { of, from, Observable, Subject, EmptyError } from 'rxjs';

declare function asDiagram(arg: string): Function;

declare const rxTestScheduler: TestScheduler;

/** @test {first} */
describe('Observable.prototype.first', () => {
asDiagram('first')('should take the first value of an observable with many values', () => {
Expand Down Expand Up @@ -101,6 +104,20 @@ describe('Observable.prototype.first', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should unsubscribe when the first value is receiv', () => {
const source = hot('--a--b---c-|');
const subs = '^ !';
const expected = '----(a|)';

const duration = rxTestScheduler.createTime('--|');

expectObservable(source.pipe(
first(),
delay(duration, rxTestScheduler)
)).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should return first value that matches a predicate', () => {
const e1 = hot('--a-^--b--c--a--c--|');
const expected = '------(c|)';
Expand Down
1 change: 1 addition & 0 deletions src/internal/operators/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export class FindValueSubscriber<T> extends Subscriber<T> {

destination.next(value);
destination.complete();
this.unsubscribe();
}

protected _next(value: T): void {
Expand Down

0 comments on commit fd01f7b

Please sign in to comment.