diff --git a/spec/operators/endWith-spec.ts b/spec/operators/endWith-spec.ts new file mode 100644 index 0000000000..0da37e53a5 --- /dev/null +++ b/spec/operators/endWith-spec.ts @@ -0,0 +1,158 @@ +import { of } from 'rxjs'; +import { endWith, mergeMap } from 'rxjs/operators'; +import { TestScheduler } from 'rxjs/testing'; +import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; + +declare function asDiagram(arg: string): Function; + +declare const rxTestScheduler: TestScheduler; + +/** @test {endWith} */ +describe('endWith operator', () => { + const defaultStartValue = 'x'; + + asDiagram('endWith(s)')('should append to a cold Observable', () => { + const e1 = cold('---a--b--c--|'); + const e1subs = '^ !'; + const expected = '---a--b--c--(s|)'; + + expectObservable(e1.pipe(endWith('s'))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should end an observable with given value', () => { + const e1 = hot('--a--|'); + const e1subs = '^ !'; + const expected = '--a--(x|)'; + + expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not end with given value if source does not complete', () => { + const e1 = hot('----a-'); + const e1subs = '^ '; + const expected = '----a-'; + + expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not end with given value if source never emits and does not completes', () => { + const e1 = cold('-'); + const e1subs = '^'; + const expected = '-'; + + expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should end with given value if source does not emit but does complete', () => { + const e1 = hot('---|'); + const e1subs = '^ !'; + const expected = '---(x|)'; + + expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should emit given value and complete immediately if source is empty', () => { + const e1 = cold('|'); + const e1subs = '(^!)'; + const expected = '(x|)'; + + expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should end with given value and source both if source emits single value', () => { + const e1 = cold('(a|)'); + const e1subs = '(^!)'; + const expected = '(ax|)'; + + expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should end with given values when given more than one value', () => { + const e1 = hot('-----a--|'); + const e1subs = '^ !'; + const expected = '-----a--(yz|)'; + + expectObservable(e1.pipe(endWith('y', 'z'))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should raise error and not end with given value if source raises error', () => { + const e1 = hot('--#'); + const e1subs = '^ !'; + const expected = '--#'; + + expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected, defaultStartValue); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should raise error immediately and not end with given value if source throws error immediately', () => { + const e1 = cold('#'); + const e1subs = '(^!)'; + const expected = '#'; + + expectObservable(e1.pipe(endWith(defaultStartValue))).toBe(expected, defaultStartValue); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should allow unsubscribing explicitly and early', () => { + const e1 = hot('---a--b----c--d--|'); + const unsub = ' ! '; + const e1subs = '^ ! '; + const expected = '---a--b---'; + + const result = e1.pipe(endWith('s', rxTestScheduler)); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not break unsubscription chains when result is unsubscribed explicitly', () => { + const e1 = hot('---a--b----c--d--|'); + const e1subs = '^ ! '; + const expected = '---a--b--- '; + const unsub = ' ! '; + + const result = e1.pipe( + mergeMap((x: string) => of(x)), + endWith('s', rxTestScheduler), + mergeMap((x: string) => of(x)) + ); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should end with empty if given value is not specified', () => { + const e1 = hot('-a-|'); + const e1subs = '^ !'; + const expected = '-a-|'; + + expectObservable(e1.pipe(endWith(rxTestScheduler))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should accept scheduler as last argument with single value', () => { + const e1 = hot('--a--|'); + const e1subs = '^ !'; + const expected = '--a--(x|)'; + + expectObservable(e1.pipe(endWith(defaultStartValue, rxTestScheduler))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should accept scheduler as last argument with multiple value', () => { + const e1 = hot('-----a--|'); + const e1subs = '^ !'; + const expected = '-----a--(yz|)'; + + expectObservable(e1.pipe(endWith('y', 'z', rxTestScheduler))).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); +}); diff --git a/src/internal/operators/endWith.ts b/src/internal/operators/endWith.ts new file mode 100644 index 0000000000..7bc6569d26 --- /dev/null +++ b/src/internal/operators/endWith.ts @@ -0,0 +1,50 @@ +import { Observable } from '../Observable'; +import { fromArray } from '../observable/fromArray'; +import { scalar } from '../observable/scalar'; +import { empty } from '../observable/empty'; +import { concat as concatStatic } from '../observable/concat'; +import { isScheduler } from '../util/isScheduler'; +import { MonoTypeOperatorFunction, SchedulerLike } from '../types'; + +/* tslint:disable:max-line-length */ +export function endWith(scheduler?: SchedulerLike): MonoTypeOperatorFunction; +export function endWith(v1: T, scheduler?: SchedulerLike): MonoTypeOperatorFunction; +export function endWith(v1: T, v2: T, scheduler?: SchedulerLike): MonoTypeOperatorFunction; +export function endWith(v1: T, v2: T, v3: T, scheduler?: SchedulerLike): MonoTypeOperatorFunction; +export function endWith(v1: T, v2: T, v3: T, v4: T, scheduler?: SchedulerLike): MonoTypeOperatorFunction; +export function endWith(v1: T, v2: T, v3: T, v4: T, v5: T, scheduler?: SchedulerLike): MonoTypeOperatorFunction; +export function endWith(v1: T, v2: T, v3: T, v4: T, v5: T, v6: T, scheduler?: SchedulerLike): MonoTypeOperatorFunction; +export function endWith(...array: Array): MonoTypeOperatorFunction; +/* tslint:enable:max-line-length */ + +/** + * Returns an Observable that emits the items you specify as arguments after it finishes emitting + * items emitted by the source Observable. + * + * @param {...T} values - Items you want the modified Observable to emit last. + * @param {Scheduler} [scheduler] - A {@link IScheduler} to use for scheduling + * the emissions of the `next` notifications. + * @return {Observable} An Observable that emits the items emitted by the source Observable + * and then emits the items in the specified Iterable. + * @method endWith + * @owner Observable + */ +export function endWith(...array: Array): MonoTypeOperatorFunction { + return (source: Observable) => { + let scheduler = array[array.length - 1]; + if (isScheduler(scheduler)) { + array.pop(); + } else { + scheduler = null; + } + + const len = array.length; + if (len === 1 && !scheduler) { + return concatStatic(source, scalar(array[0] as T)); + } else if (len > 0) { + return concatStatic(source, fromArray(array as T[], scheduler)); + } else { + return concatStatic(source, empty(scheduler) as any); + } + }; +} diff --git a/src/operators/index.ts b/src/operators/index.ts index 91e4d4c9d5..53b8e697b4 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -25,6 +25,7 @@ export { distinct } from '../internal/operators/distinct'; export { distinctUntilChanged } from '../internal/operators/distinctUntilChanged'; export { distinctUntilKeyChanged } from '../internal/operators/distinctUntilKeyChanged'; export { elementAt } from '../internal/operators/elementAt'; +export { endWith } from '../internal/operators/endWith'; export { every } from '../internal/operators/every'; export { exhaust } from '../internal/operators/exhaust'; export { exhaustMap } from '../internal/operators/exhaustMap';