Skip to content

Commit

Permalink
Merge pull request #2667 from benlesh/lettable-start
Browse files Browse the repository at this point in the history
WIP: Adding lettable operators
  • Loading branch information
benlesh authored Aug 17, 2017
2 parents 4f56a61 + 42f9daf commit de9f2c8
Show file tree
Hide file tree
Showing 133 changed files with 7,029 additions and 4,070 deletions.
29 changes: 29 additions & 0 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as sinon from 'sinon';
import * as Rx from '../dist/cjs/Rx';
import {TeardownLogic} from '../dist/cjs/Subscription';
import marbleTestingSignature = require('./helpers/marble-testing'); // tslint:disable-line:no-require-imports
import { map } from '../dist/cjs/operators';

declare const { asDiagram, rxTestScheduler };
declare const cold: typeof marbleTestingSignature.cold;
Expand Down Expand Up @@ -621,6 +622,34 @@ describe('Observable', () => {
});
});
});

describe('pipe', () => {
it('should exist', () => {
const source = Observable.of('test');
expect(source.pipe).to.be.a('function');
});

it('should pipe multiple operations', (done) => {
Observable.of('test')
.pipe(
map((x: string) => x + x),
map((x: string) => x + '!!!')
)
.subscribe(
x => {
expect(x).to.equal('testtest!!!');
},
null,
done
);
});

it('should return the same observable if there are no arguments', () => {
const source = Observable.of('test');
const result = source.pipe();
expect(result).to.equal(source);
});
});
});

/** @test {Observable} */
Expand Down
2 changes: 1 addition & 1 deletion spec/operators/groupBy-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
import {GroupedObservable} from '../../dist/cjs/operator/groupBy';
import {GroupedObservable} from '../../dist/cjs/operators/groupBy';
import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports

declare const { asDiagram };
Expand Down
33 changes: 33 additions & 0 deletions spec/util/pipe-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { expect } from 'chai';
import { pipe } from '../../dist/cjs/util/pipe';

describe('pipe', () => {
it('should exist', () => {
expect(pipe).to.be.a('function');
});

it('should pipe two functions together', () => {
const a = x => x + x;
const b = x => x - 1;

const c = pipe(a, b);
expect(c).to.be.a('function');
expect(c(1)).to.equal(1);
expect(c(10)).to.equal(19);
});

it('should return the same function if only one is passed', () => {
const a = x => x;
const c = pipe(a);

expect(c).to.equal(a);
});

it('should return a noop if not passed a function', () => {
const c = pipe();

expect(c('whatever')).to.equal('whatever');
const someObj = {};
expect(c(someObj)).to.equal(someObj);
});
});
41 changes: 41 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { toSubscriber } from './util/toSubscriber';
import { IfObservable } from './observable/IfObservable';
import { ErrorObservable } from './observable/ErrorObservable';
import { observable as Symbol_observable } from './symbol/observable';
import { OperatorFunction } from './interfaces';
import { pipeFromArray } from './util/pipe';

export interface Subscribable<T> {
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
Expand Down Expand Up @@ -286,4 +288,43 @@ export class Observable<T> implements Subscribable<T> {
[Symbol_observable]() {
return this;
}

/* tslint:disable:max-line-length */
pipe(): Observable<T>
pipe<A>(op1: OperatorFunction<T, A>): Observable<A>
pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Observable<B>
pipe<A, B, C>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>): Observable<C>
pipe<A, B, C, D>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>): Observable<D>
pipe<A, B, C, D, E>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>): Observable<E>
pipe<A, B, C, D, E, F>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>): Observable<F>
pipe<A, B, C, D, E, F, G>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>): Observable<G>
pipe<A, B, C, D, E, F, G, H>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>): Observable<H>
pipe<A, B, C, D, E, F, G, H, I>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>): Observable<I>
/* tslint:enable:max-line-length */

/**
* Used to stitch together functional operators into a chain.
* @method pipe
* @return {Observable} the Observable result of all of the operators having
* been called in the order they were passed in.
*
* @example
*
* import { map, filter, scan } from 'rxjs/operators';
*
* Rx.Observable.interval(1000)
* .pipe(
* filter(x => x % 2 === 0),
* map(x => x + x),
* scan((acc, x) => acc + x)
* )
* .subscribe(x => console.log(x))
*/
pipe<R>(...operations: OperatorFunction<T, R>[]): Observable<R> {
if (operations.length === 0) {
return this as any;
}

return pipeFromArray(operations)(this);
}
}
2 changes: 1 addition & 1 deletion src/ReplaySubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { IScheduler } from './Scheduler';
import { queue } from './scheduler/queue';
import { Subscriber } from './Subscriber';
import { Subscription } from './Subscription';
import { ObserveOnSubscriber } from './operator/observeOn';
import { ObserveOnSubscriber } from './operators/observeOn';
import { ObjectUnsubscribedError } from './util/ObjectUnsubscribedError';
import { SubjectSubscription } from './SubjectSubscription';
/**
Expand Down
3 changes: 2 additions & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,11 @@ export {ObjectUnsubscribedError} from './util/ObjectUnsubscribedError';
export {TimeoutError} from './util/TimeoutError';
export {UnsubscriptionError} from './util/UnsubscriptionError';
export {TimeInterval} from './operator/timeInterval';
export {Timestamp} from './operator/timestamp';
export {Timestamp} from './operators/timestamp';
export {TestScheduler} from './testing/TestScheduler';
export {VirtualTimeScheduler} from './scheduler/VirtualTimeScheduler';
export {AjaxRequest, AjaxResponse, AjaxError, AjaxTimeoutError} from './observable/dom/AjaxObservable';
export { pipe } from './util/pipe';

import { asap } from './scheduler/asap';
import { async } from './scheduler/async';
Expand Down
9 changes: 9 additions & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Observable } from './Observable';

export type UnaryFunction<T, R> = (source: T) => R;

export type OperatorFunction<T, R> = UnaryFunction<Observable<T>, Observable<R>>;

export type FactoryOrValue<T> = T | (() => T);

export type MonoTypeOperatorFunction<T> = OperatorFunction<T, T>;
3 changes: 2 additions & 1 deletion src/observable/ConnectableObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Operator } from '../Operator';
import { Observable } from '../Observable';
import { Subscriber } from '../Subscriber';
import { Subscription, TeardownLogic } from '../Subscription';
import { refCount as higherOrderRefCount } from '../operators/refCount';

/**
* @class ConnectableObservable<T>
Expand Down Expand Up @@ -49,7 +50,7 @@ export class ConnectableObservable<T> extends Observable<T> {
}

refCount(): Observable<T> {
return this.lift(new RefCountOperator<T>(this));
return higherOrderRefCount()(this);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/observable/FromObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { IScheduler } from '../Scheduler';
import { iterator as Symbol_iterator } from '../symbol/iterator';
import { Observable, ObservableInput } from '../Observable';
import { Subscriber } from '../Subscriber';
import { ObserveOnSubscriber } from '../operator/observeOn';
import { ObserveOnSubscriber } from '../operators/observeOn';
import { observable as Symbol_observable } from '../symbol/observable';

/**
Expand Down
117 changes: 115 additions & 2 deletions src/observable/concat.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,116 @@
import { concatStatic } from '../operator/concat';
import { Observable, ObservableInput } from '../Observable';
import { IScheduler } from '../Scheduler';
import { isScheduler } from '../util/isScheduler';
import { of } from './of';
import { from } from './from';
import { concatAll } from '../operators/concatAll';

export const concat = concatStatic;
/* tslint:disable:max-line-length */
export function concat<T>(v1: ObservableInput<T>, scheduler?: IScheduler): Observable<T>;
export function concat<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, scheduler?: IScheduler): Observable<T | T2>;
export function concat<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler?: IScheduler): Observable<T | T2 | T3>;
export function concat<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4>;
export function concat<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5>;
export function concat<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function concat<T>(...observables: (ObservableInput<T> | IScheduler)[]): Observable<T>;
export function concat<T, R>(...observables: (ObservableInput<any> | IScheduler)[]): Observable<R>;
/* tslint:enable:max-line-length */
/**
* Creates an output Observable which sequentially emits all values from given
* Observable and then moves on to the next.
*
* <span class="informal">Concatenates multiple Observables together by
* sequentially emitting their values, one Observable after the other.</span>
*
* <img src="./img/concat.png" width="100%">
*
* `concat` joins multiple Observables together, by subscribing to them one at a time and
* merging their results into the output Observable. You can pass either an array of
* Observables, or put them directly as arguments. Passing an empty array will result
* in Observable that completes immediately.
*
* `concat` will subscribe to first input Observable and emit all its values, without
* changing or affecting them in any way. When that Observable completes, it will
* subscribe to then next Observable passed and, again, emit its values. This will be
* repeated, until the operator runs out of Observables. When last input Observable completes,
* `concat` will complete as well. At any given moment only one Observable passed to operator
* emits values. If you would like to emit values from passed Observables concurrently, check out
* {@link merge} instead, especially with optional `concurrent` parameter. As a matter of fact,
* `concat` is an equivalent of `merge` operator with `concurrent` parameter set to `1`.
*
* Note that if some input Observable never completes, `concat` will also never complete
* and Observables following the one that did not complete will never be subscribed. On the other
* hand, if some Observable simply completes immediately after it is subscribed, it will be
* invisible for `concat`, which will just move on to the next Observable.
*
* If any Observable in chain errors, instead of passing control to the next Observable,
* `concat` will error immediately as well. Observables that would be subscribed after
* the one that emitted error, never will.
*
* If you pass to `concat` the same Observable many times, its stream of values
* will be "replayed" on every subscription, which means you can repeat given Observable
* as many times as you like. If passing the same Observable to `concat` 1000 times becomes tedious,
* you can always use {@link repeat}.
*
* @example <caption>Concatenate a timer counting from 0 to 3 with a synchronous sequence from 1 to 10</caption>
* var timer = Rx.Observable.interval(1000).take(4);
* var sequence = Rx.Observable.range(1, 10);
* var result = Rx.Observable.concat(timer, sequence);
* result.subscribe(x => console.log(x));
*
* // results in:
* // 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10
*
*
* @example <caption>Concatenate an array of 3 Observables</caption>
* var timer1 = Rx.Observable.interval(1000).take(10);
* var timer2 = Rx.Observable.interval(2000).take(6);
* var timer3 = Rx.Observable.interval(500).take(10);
* var result = Rx.Observable.concat([timer1, timer2, timer3]); // note that array is passed
* result.subscribe(x => console.log(x));
*
* // results in the following:
* // (Prints to console sequentially)
* // -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
* // -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
* // -500ms-> 0 -500ms-> 1 -500ms-> ... 9
*
*
* @example <caption>Concatenate the same Observable to repeat it</caption>
* const timer = Rx.Observable.interval(1000).take(2);
*
* Rx.Observable.concat(timer, timer) // concating the same Observable!
* .subscribe(
* value => console.log(value),
* err => {},
* () => console.log('...and it is done!')
* );
*
* // Logs:
* // 0 after 1s
* // 1 after 2s
* // 0 after 3s
* // 1 after 4s
* // "...and it is done!" also after 4s
*
* @see {@link concatAll}
* @see {@link concatMap}
* @see {@link concatMapTo}
*
* @param {ObservableInput} input1 An input Observable to concatenate with others.
* @param {ObservableInput} input2 An input Observable to concatenate with others.
* More than one input Observables may be given as argument.
* @param {Scheduler} [scheduler=null] An optional IScheduler to schedule each
* Observable subscription on.
* @return {Observable} All values of each passed Observable merged into a
* single Observable, in order, in serial fashion.
* @static true
* @name concat
* @owner Observable
*/
export function concat<T, R>(...observables: Array<ObservableInput<any> | IScheduler>): Observable<R> {
if (observables.length === 1 || (observables.length === 2 && isScheduler(observables[1]))) {
return from(<any>observables[0]);
}
return concatAll()(of(...observables));
}
14 changes: 11 additions & 3 deletions src/observable/dom/AjaxObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { errorObject } from '../../util/errorObject';
import { Observable } from '../../Observable';
import { Subscriber } from '../../Subscriber';
import { TeardownLogic } from '../../Subscription';
import { MapOperator } from '../../operator/map';
import { map } from '../../operators';

export interface AjaxRequest {
url?: string;
Expand Down Expand Up @@ -87,9 +87,17 @@ export function ajaxPatch(url: string, body?: any, headers?: Object): Observable
return new AjaxObservable<AjaxResponse>({ method: 'PATCH', url, body, headers });
};

const mapResponse = map((x: AjaxResponse, index: number) => x.response);

export function ajaxGetJSON<T>(url: string, headers?: Object): Observable<T> {
return new AjaxObservable<AjaxResponse>({ method: 'GET', url, responseType: 'json', headers })
.lift<T>(new MapOperator<AjaxResponse, T>((x: AjaxResponse, index: number): T => x.response, null));
return mapResponse(
new AjaxObservable<AjaxResponse>({
method: 'GET',
url,
responseType: 'json',
headers
})
);
};

/**
Expand Down
Loading

0 comments on commit de9f2c8

Please sign in to comment.