Skip to content

Commit

Permalink
fix(throwIfEmpty): ensure result is retry-able
Browse files Browse the repository at this point in the history
* feat(tap): reset throwIfEmpty hasValue on retry

* refactor(throwIfEmpty): re-implement throwIfEmpty

* refactor(throwIfEmpty): fix typos

* refactor(throwIfEmpty): remove tap changes

* refactor(throwIfEmpty): fix type errors

* refactor(throwIfEmpty): update typing info and complete method
  • Loading branch information
arpadvas authored and benlesh committed Mar 15, 2019
1 parent f6acdc0 commit c4f44b9
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 14 deletions.
69 changes: 67 additions & 2 deletions spec/operators/throwIfEmpty-spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { EMPTY, of, EmptyError } from 'rxjs';
import { throwIfEmpty } from 'rxjs/operators';
import { EMPTY, of, EmptyError, defer, throwError } from 'rxjs';
import { throwIfEmpty, mergeMap, retry } from 'rxjs/operators';

declare function asDiagram(arg: string): Function;

Expand Down Expand Up @@ -77,6 +77,39 @@ describe('throwIfEmpty', () => {
).toBe(expected, undefined, new Error('test'));
expectSubscriptions(source.subscriptions).toBe([sub1]);
});

it('should throw if empty after retry', () => {
const error = new Error('So empty inside');
let thrown: any;
let sourceIsEmpty = false;

const source = defer(() => {
if (sourceIsEmpty) {
return EMPTY;
}
sourceIsEmpty = true;
return of(1, 2);
});

source.pipe(
throwIfEmpty(() => error),
mergeMap(value => {
if (value > 1) {
return throwError(new Error());
}

return of(value);
}),
retry(1)
).subscribe({
error(err) {
thrown = err;
}
});

expect(thrown).to.equal(error);

});
});

describe('without errorFactory', () => {
Expand Down Expand Up @@ -139,5 +172,37 @@ describe('throwIfEmpty', () => {
).toBe(expected, undefined, new EmptyError());
expectSubscriptions(source.subscriptions).toBe([sub1]);
});

it('should throw if empty after retry', () => {
let thrown: any;
let sourceIsEmpty = false;

const source = defer(() => {
if (sourceIsEmpty) {
return EMPTY;
}
sourceIsEmpty = true;
return of(1, 2);
});

source.pipe(
throwIfEmpty(),
mergeMap(value => {
if (value > 1) {
return throwError(new Error());
}

return of(value);
}),
retry(1)
).subscribe({
error(err) {
thrown = err;
}
});

expect(thrown).to.be.instanceof(EmptyError);

});
});
});
57 changes: 45 additions & 12 deletions src/internal/operators/throwIfEmpty.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { tap } from './tap';
import { EmptyError } from '../util/EmptyError';
import { MonoTypeOperatorFunction } from '../types';
import { Observable } from '../Observable';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { TeardownLogic, MonoTypeOperatorFunction } from '../types';

/**
* If the source observable completes without emitting a value, it will emit
Expand All @@ -24,24 +26,55 @@ import { MonoTypeOperatorFunction } from '../types';
* )
* .subscribe({
* next() { console.log('The button was clicked'); },
* error(err) { console.error(err); },
* error(err) { console.error(err); }
* });
* ```
*
* @param {Function} [errorFactory] A factory function called to produce the
* @param errorFactory A factory function called to produce the
* error to be thrown when the source observable completes without emitting a
* value.
*/
export const throwIfEmpty =
<T>(errorFactory: (() => any) = defaultErrorFactory) => tap<T>({
hasValue: false,
next() { this.hasValue = true; },
complete() {
if (!this.hasValue) {
throw errorFactory();
export function throwIfEmpty <T>(errorFactory: (() => any) = defaultErrorFactory): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => {
return source.lift(new ThrowIfEmptyOperator(errorFactory));
};
}

class ThrowIfEmptyOperator<T> implements Operator<T, T> {
constructor(private errorFactory: () => any) {
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
return source.subscribe(new ThrowIfEmptySubscriber(subscriber, this.errorFactory));
}
}

class ThrowIfEmptySubscriber<T> extends Subscriber<T> {
private hasValue: boolean = false;

constructor(destination: Subscriber<T>, private errorFactory: () => any) {
super(destination);
}

protected _next(value: T): void {
this.hasValue = true;
this.destination.next(value);
}

protected _complete() {
if (!this.hasValue) {
let err: any;
try {
err = this.errorFactory();
} catch (e) {
err = e;
}
this.destination.error(err);
} else {
return this.destination.complete();
}
} as any);
}
}

function defaultErrorFactory() {
return new EmptyError();
Expand Down

0 comments on commit c4f44b9

Please sign in to comment.