diff --git a/api_guard/dist/types/index.d.ts b/api_guard/dist/types/index.d.ts index 11bf8a6625..d74dc0f49d 100644 --- a/api_guard/dist/types/index.d.ts +++ b/api_guard/dist/types/index.d.ts @@ -646,7 +646,14 @@ export declare function zip, O2 extends Observab export declare function zip, O2 extends ObservableInput, O3 extends ObservableInput>(v1: O1, v2: O2, v3: O3): Observable<[ObservedValueOf, ObservedValueOf, ObservedValueOf]>; export declare function zip, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4): Observable<[ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf]>; export declare function zip, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, O5 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5): Observable<[ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf]>; -export declare function zip, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, O5 extends ObservableInput, O6 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6): Observable<[ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf]>; +export declare function zip, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, O5 extends ObservableInput, O6 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6): Observable<[ + ObservedValueOf, + ObservedValueOf, + ObservedValueOf, + ObservedValueOf, + ObservedValueOf, + ObservedValueOf +]>; export declare function zip>(array: O[]): Observable[]>; export declare function zip(array: ObservableInput[]): Observable; export declare function zip, R>(array: O[], resultSelector: (...values: ObservedValueOf[]) => R): Observable; diff --git a/spec/observables/zip-spec.ts b/spec/observables/zip-spec.ts index 53f7eb180a..602fa1fb05 100644 --- a/spec/observables/zip-spec.ts +++ b/spec/observables/zip-spec.ts @@ -2,8 +2,6 @@ import { expect } from 'chai'; import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; import { queueScheduler as rxQueueScheduler, zip, from, of } from 'rxjs'; -declare const Symbol: any; - const queueScheduler = rxQueueScheduler; /** @test {zip} */ @@ -76,18 +74,15 @@ describe('static zip', () => { describe('with iterables', () => { it('should zip them with values', () => { - const myIterator = { - count: 0, - next: function () { - return { value: this.count++, done: false }; + const myIterator = (function *() { + for (let i = 0; i < 4; i++) { + yield i; } - }; - - myIterator[Symbol.iterator] = function () { return this; }; + })(); const e1 = hot('---a---b---c---d---|'); - const e1subs = '^ !'; - const expected = '---w---x---y---z---|'; + const e1subs = '^ !'; + const expected = '---w---x---y---(z|)'; const values = { w: ['a', 0], @@ -100,37 +95,6 @@ describe('static zip', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); - it('should only call `next` as needed', () => { - let nextCalled = 0; - const myIterator = { - count: 0, - next() { - nextCalled++; - return { value: this.count++, done: false }; - } - }; - myIterator[Symbol.iterator] = function() { - return this; - }; - - zip(of(1, 2, 3), myIterator) - .subscribe(); - - // since zip will call `next()` in advance, total calls when - // zipped with 3 other values should be 4. - expect(nextCalled).to.equal(4); - }); - - it('should work with never observable and empty iterable', () => { - const a = cold( '-'); - const asubs = '^'; - const b: number[] = []; - const expected = '-'; - - expectObservable(zip(a, b)).toBe(expected); - expectSubscriptions(a.subscriptions).toBe(asubs); - }); - it('should work with empty observable and empty iterable', () => { const a = cold('|'); const asubs = '(^!)'; @@ -151,11 +115,11 @@ describe('static zip', () => { expectSubscriptions(a.subscriptions).toBe(asubs); }); - it('should work with non-empty observable and empty iterable', () => { + it('should complete instantly if given an empty iterable', () => { const a = hot('---^----a--|'); - const asubs = '^ !'; + const asubs = '(^!)'; const b: number[] = []; - const expected = '--------|'; + const expected = '|'; expectObservable(zip(a, b)).toBe(expected); expectSubscriptions(a.subscriptions).toBe(asubs); @@ -181,16 +145,6 @@ describe('static zip', () => { expectSubscriptions(a.subscriptions).toBe(asubs); }); - it('should work with non-empty observable and empty iterable', () => { - const a = hot('---^----#'); - const asubs = '^ !'; - const b: number[] = []; - const expected = '-----#'; - - expectObservable(zip(a, b)).toBe(expected); - expectSubscriptions(a.subscriptions).toBe(asubs); - }); - it('should work with observable which raises error and non-empty iterable', () => { const a = hot('---^----#'); const asubs = '^ !'; @@ -574,4 +528,18 @@ describe('static zip', () => { expect(vals).to.deep.equal(r[i++]); }, null, done); }); + + it('should be able to zip all iterables', () => { + const results: any[] = []; + zip('abc', '123', 'xyz').subscribe({ + next: value => results.push(value), + complete: () => results.push('complete') + }); + expect(results).to.deep.equal([ + ['a','1','x'], + ['b','2','y'], + ['c','3','z'], + 'complete' + ]); + }) }); diff --git a/spec/operators/zipAll-spec.ts b/spec/operators/zipAll-spec.ts index c8631e3571..b4d5bc8ced 100644 --- a/spec/operators/zipAll-spec.ts +++ b/spec/operators/zipAll-spec.ts @@ -101,19 +101,15 @@ describe('zipAll operator', () => { describe('with iterables', () => { it('should zip them with values', () => { rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { - const myIterator = { - count: 0, - next() { - return { value: this.count++, done: false }; - }, - [Symbol.iterator]() { - return this; - }, - }; + const myIterator = (function* () { + for (let i = 0; i < 4; i++) { + yield i; + } + })(); const e1 = hot(' ---a---b---c---d---|'); - const e1subs = ' ^------------------!'; - const expected = '---w---x---y---z---|'; + const e1subs = ' ^--------------!'; + const expected = '---w---x---y---(z|)'; const values = { w: ['a', 0], @@ -127,32 +123,12 @@ describe('zipAll operator', () => { }); }); - it('should only call `next` as needed', () => { - let nextCalled = 0; - const myIterator = { - count: 0, - next() { - nextCalled++; - return { value: this.count++, done: false }; - }, - [Symbol.iterator]() { - return this; - }, - }; - - of(of(1, 2, 3), myIterator).pipe(zipAll()).subscribe(); - - // since zip will call `next()` in advance, total calls when - // zipped with 3 other values should be 4. - expect(nextCalled).to.equal(4); - }); - - it('should work with never observable and empty iterable', () => { + it('should complete instantly with never observable and empty iterable', () => { rxTestScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { const a = cold(' -'); - const asubs = ' ^'; + const asubs = ' (^!)'; const b: string[] = []; - const expected = '-'; + const expected = '|'; expectObservable(of(a, b).pipe(zipAll())).toBe(expected); expectSubscriptions(a.subscriptions).toBe(asubs); @@ -186,9 +162,9 @@ describe('zipAll operator', () => { it('should work with non-empty observable and empty iterable', () => { rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const a = hot('---^----a--|'); - const asubs = ' ^-------!'; + const asubs = ' (^!)'; const b: string[] = []; - const expected = '--------|'; + const expected = '|'; expectObservable(of(a, b).pipe(zipAll())).toBe(expected); expectSubscriptions(a.subscriptions).toBe(asubs); @@ -219,18 +195,6 @@ describe('zipAll operator', () => { }); }); - it('should work with non-empty observable and empty iterable', () => { - rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { - const a = hot('---^----#'); - const asubs = ' ^----!'; - const b: string[] = []; - const expected = '-----#'; - - expectObservable(of(a, b).pipe(zipAll())).toBe(expected); - expectSubscriptions(a.subscriptions).toBe(asubs); - }); - }); - it('should work with observable which raises error and non-empty iterable', () => { rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const a = hot('---^----#'); diff --git a/spec/operators/zipWith-spec.ts b/spec/operators/zipWith-spec.ts index a9c8344eb5..da18367787 100644 --- a/spec/operators/zipWith-spec.ts +++ b/spec/operators/zipWith-spec.ts @@ -3,7 +3,8 @@ import { zipWith, mergeMap } from 'rxjs/operators'; import { queueScheduler, of } from 'rxjs'; import { TestScheduler } from 'rxjs/testing'; import { observableMatcher } from '../helpers/observableMatcher'; -/** @test {zip} */ + +/** @test {zipWith} */ describe('zipWith', () => { let rxTestScheduler: TestScheduler; @@ -74,19 +75,15 @@ describe('zipWith', () => { describe('with iterables', () => { it('should zip them with values', () => { rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { - const myIterator = { - count: 0, - next: function() { - return { value: this.count++, done: false }; - }, - }; - myIterator[Symbol.iterator] = function() { - return this; - }; + const myIterator = (function*() { + for (let i = 0; i < 4; i++) { + yield i; + } + })(); const e1 = hot(' ---a---b---c---d---|'); - const e1subs = ' ^------------------!'; - const expected = '---w---x---y---z---|'; + const e1subs = ' ^--------------!'; + const expected = '---w---x---y---(z|)'; const values = { w: ['a', 0], @@ -100,33 +97,11 @@ describe('zipWith', () => { }); }); - it('should only call `next` as needed', () => { - let nextCalled = 0; - const myIterator = { - count: 0, - next: function() { - nextCalled++; - return { value: this.count++, done: false }; - }, - }; - myIterator[Symbol.iterator] = function() { - return this; - }; - - of(1, 2, 3) - .pipe(zipWith(myIterator)) - .subscribe(); - - // since zip will call `next()` in advance, total calls when - // zipped with 3 other values should be 4. - expect(nextCalled).to.equal(4); - }); - - it('should work with never observable and empty iterable', () => { + it('should complete instantly for an empty iterable', () => { rxTestScheduler.run(({ cold, expectObservable, expectSubscriptions }) => { const a = cold(' -'); - const asubs = ' ^'; - const expected = '-'; + const asubs = ' (^!)'; + const expected = '|'; const b: string[] = []; expectObservable(a.pipe(zipWith(b))).toBe(expected); @@ -158,12 +133,12 @@ describe('zipWith', () => { }); }); - it('should work with non-empty observable and empty iterable', () => { + it('should complete instantly with non-empty observable and empty iterable', () => { rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const a = hot(' ---^----a--|'); - const asubs = ' ^-------!'; + const asubs = ' (^!)'; const b: string[] = []; - const expected = '--------|'; + const expected = ' |'; expectObservable(a.pipe(zipWith(b))).toBe(expected); expectSubscriptions(a.subscriptions).toBe(asubs); @@ -194,18 +169,6 @@ describe('zipWith', () => { }); }); - it('should work with non-empty observable and empty iterable', () => { - rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { - const a = hot('---^----#'); - const asubs = ' ^----!'; - const expected = '-----#'; - const b: string[] = []; - - expectObservable(a.pipe(zipWith(b))).toBe(expected); - expectSubscriptions(a.subscriptions).toBe(asubs); - }); - }); - it('should work with observable which raises error and non-empty iterable', () => { rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => { const a = hot('---^----#'); diff --git a/src/internal/observable/zip.ts b/src/internal/observable/zip.ts index 41d4fbecd9..e7ec7445ed 100644 --- a/src/internal/observable/zip.ts +++ b/src/internal/observable/zip.ts @@ -1,33 +1,132 @@ +/** @prettier */ import { Observable } from '../Observable'; -import { fromArray } from './fromArray'; -import { isArray } from '../util/isArray'; -import { Operator } from '../Operator'; -import { ObservableInput, PartialObserver, ObservedValueOf } from '../types'; -import { Subscriber } from '../Subscriber'; +import { ObservableInput, ObservedValueOf } from '../types'; import { Subscription } from '../Subscription'; -import { iterator as Symbol_iterator } from '../../internal/symbol/iterator'; -import { lift } from '../util/lift'; -import { SimpleOuterSubscriber, SimpleInnerSubscriber, innerSubscribe } from '../innerSubscribe'; +import { from } from './from'; /* tslint:disable:max-line-length */ /** @deprecated resultSelector is no longer supported, pipe to map instead */ export function zip, R>(v1: O1, resultSelector: (v1: ObservedValueOf) => R): Observable; /** @deprecated resultSelector is no longer supported, pipe to map instead */ -export function zip, O2 extends ObservableInput, R>(v1: O1, v2: O2, resultSelector: (v1: ObservedValueOf, v2: ObservedValueOf) => R): Observable; +export function zip, O2 extends ObservableInput, R>( + v1: O1, + v2: O2, + resultSelector: (v1: ObservedValueOf, v2: ObservedValueOf) => R +): Observable; /** @deprecated resultSelector is no longer supported, pipe to map instead */ -export function zip, O2 extends ObservableInput, O3 extends ObservableInput, R>(v1: O1, v2: O2, v3: O3, resultSelector: (v1: ObservedValueOf, v2: ObservedValueOf, v3: ObservedValueOf) => R): Observable; +export function zip, O2 extends ObservableInput, O3 extends ObservableInput, R>( + v1: O1, + v2: O2, + v3: O3, + resultSelector: (v1: ObservedValueOf, v2: ObservedValueOf, v3: ObservedValueOf) => R +): Observable; /** @deprecated resultSelector is no longer supported, pipe to map instead */ -export function zip, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, R>(v1: O1, v2: O2, v3: O3, v4: O4, resultSelector: (v1: ObservedValueOf, v2: ObservedValueOf, v3: ObservedValueOf, v4: ObservedValueOf) => R): Observable; +export function zip< + O1 extends ObservableInput, + O2 extends ObservableInput, + O3 extends ObservableInput, + O4 extends ObservableInput, + R +>( + v1: O1, + v2: O2, + v3: O3, + v4: O4, + resultSelector: (v1: ObservedValueOf, v2: ObservedValueOf, v3: ObservedValueOf, v4: ObservedValueOf) => R +): Observable; /** @deprecated resultSelector is no longer supported, pipe to map instead */ -export function zip, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, O5 extends ObservableInput, R>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, resultSelector: (v1: ObservedValueOf, v2: ObservedValueOf, v3: ObservedValueOf, v4: ObservedValueOf, v5: ObservedValueOf) => R): Observable; +export function zip< + O1 extends ObservableInput, + O2 extends ObservableInput, + O3 extends ObservableInput, + O4 extends ObservableInput, + O5 extends ObservableInput, + R +>( + v1: O1, + v2: O2, + v3: O3, + v4: O4, + v5: O5, + resultSelector: ( + v1: ObservedValueOf, + v2: ObservedValueOf, + v3: ObservedValueOf, + v4: ObservedValueOf, + v5: ObservedValueOf + ) => R +): Observable; /** @deprecated resultSelector is no longer supported, pipe to map instead */ -export function zip, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, O5 extends ObservableInput, O6 extends ObservableInput, R>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6, resultSelector: (v1: ObservedValueOf, v2: ObservedValueOf, v3: ObservedValueOf, v4: ObservedValueOf, v5: ObservedValueOf, v6: ObservedValueOf) => R): Observable; - -export function zip, O2 extends ObservableInput>(v1: O1, v2: O2): Observable<[ObservedValueOf, ObservedValueOf]>; -export function zip, O2 extends ObservableInput, O3 extends ObservableInput>(v1: O1, v2: O2, v3: O3): Observable<[ObservedValueOf, ObservedValueOf, ObservedValueOf]>; -export function zip, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4): Observable<[ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf]>; -export function zip, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, O5 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5): Observable<[ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf]>; -export function zip, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, O5 extends ObservableInput, O6 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6): Observable<[ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf]>; +export function zip< + O1 extends ObservableInput, + O2 extends ObservableInput, + O3 extends ObservableInput, + O4 extends ObservableInput, + O5 extends ObservableInput, + O6 extends ObservableInput, + R +>( + v1: O1, + v2: O2, + v3: O3, + v4: O4, + v5: O5, + v6: O6, + resultSelector: ( + v1: ObservedValueOf, + v2: ObservedValueOf, + v3: ObservedValueOf, + v4: ObservedValueOf, + v5: ObservedValueOf, + v6: ObservedValueOf + ) => R +): Observable; + +export function zip, O2 extends ObservableInput>( + v1: O1, + v2: O2 +): Observable<[ObservedValueOf, ObservedValueOf]>; +export function zip, O2 extends ObservableInput, O3 extends ObservableInput>( + v1: O1, + v2: O2, + v3: O3 +): Observable<[ObservedValueOf, ObservedValueOf, ObservedValueOf]>; +export function zip< + O1 extends ObservableInput, + O2 extends ObservableInput, + O3 extends ObservableInput, + O4 extends ObservableInput +>(v1: O1, v2: O2, v3: O3, v4: O4): Observable<[ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf]>; +export function zip< + O1 extends ObservableInput, + O2 extends ObservableInput, + O3 extends ObservableInput, + O4 extends ObservableInput, + O5 extends ObservableInput +>( + v1: O1, + v2: O2, + v3: O3, + v4: O4, + v5: O5 +): Observable<[ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf]>; +export function zip< + O1 extends ObservableInput, + O2 extends ObservableInput, + O3 extends ObservableInput, + O4 extends ObservableInput, + O5 extends ObservableInput, + O6 extends ObservableInput +>( + v1: O1, + v2: O2, + v3: O3, + v4: O4, + v5: O5, + v6: O6 +): Observable< + [ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf, ObservedValueOf] +>; export function zip>(array: O[]): Observable[]>; export function zip(array: ObservableInput[]): Observable; @@ -70,259 +169,61 @@ export function zip(...observables: Array | ((...values: * // { age: 25, name: 'Bar', isDev: true } * // { age: 29, name: 'Beer', isDev: false } * ``` - * @param observables + * @param sources * @return {Observable} * @static true * @name zip * @owner Observable */ export function zip, R>( - ...observables: Array[]) => R)> -): Observable[]|R> { - const last = observables[observables.length - 1]; + ...sources: Array[]) => R)> +): Observable[] | R> { let resultSelector: ((...ys: Array) => R) | undefined = undefined; - if (typeof last === 'function') { - resultSelector = observables.pop() as typeof resultSelector; - } - return lift(fromArray(observables, undefined), new ZipOperator(resultSelector)); -} - -export class ZipOperator implements Operator { - - resultSelector?: (...values: Array) => R; - - constructor(resultSelector?: (...values: Array) => R) { - this.resultSelector = resultSelector; - } - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new ZipSubscriber(subscriber, this.resultSelector)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -export class ZipSubscriber extends Subscriber { - private resultSelector?: (...values: Array) => R; - private iterators: LookAheadIterator[] = []; - private active = 0; - - constructor(destination: Subscriber, - resultSelector?: (...values: Array) => R, - values: any = Object.create(null)) { - super(destination); - this.resultSelector = resultSelector; - } - - protected _next(value: any) { - const iterators = this.iterators; - if (isArray(value)) { - iterators.push(new StaticArrayIterator(value)); - } else if (typeof value[Symbol_iterator] === 'function') { - iterators.push(new StaticIterator(value[Symbol_iterator]())); - } else { - iterators.push(new ZipBufferIterator(this.destination, this, value)); - } - } - - protected _complete() { - const iterators = this.iterators; - const len = iterators.length; - - this.unsubscribe(); - - if (len === 0) { - this.destination.complete(); - return; - } - - this.active = len; - for (let i = 0; i < len; i++) { - let iterator: ZipBufferIterator = iterators[i]; - if (iterator.stillUnsubscribed) { - const destination = this.destination as Subscription; - destination.add(iterator.subscribe()); - } else { - this.active--; // not an observable + if (typeof sources[sources.length - 1] === 'function') { + resultSelector = sources.pop() as typeof resultSelector; + } + + return new Observable[]>((subscriber) => { + const buffers: ObservedValueOf[][] = sources.map(() => []); + const completed = sources.map(() => false); + const subscription = new Subscription(); + + const tryEmit = () => { + if (buffers.every((buffer) => buffer.length > 0)) { + let result: any = buffers.map((buffer) => buffer.shift()!); + if (resultSelector) { + try { + result = resultSelector(...result); + } catch (err) { + subscriber.error(err); + return; + } + } + subscriber.next(result); + if (buffers.some((buffer, i) => buffer.length === 0 && completed[i])) { + subscriber.complete(); + } } + }; + + for (let i = 0; !subscriber.closed && i < sources.length; i++) { + const source = from(sources[i]); + subscription.add( + source.subscribe({ + next: (value) => { + buffers[i].push(value); + tryEmit(); + }, + error: (err) => subscriber.error(err), + complete: () => { + completed[i] = true; + if (buffers[i].length === 0) { + subscriber.complete(); + } + }, + }) + ); } - } - - notifyInactive() { - this.active--; - if (this.active === 0) { - this.destination.complete(); - } - } - - checkIterators() { - const iterators = this.iterators; - const len = iterators.length; - const destination = this.destination; - - // abort if not all of them have values - for (let i = 0; i < len; i++) { - let iterator = iterators[i]; - if (typeof iterator.hasValue === 'function' && !iterator.hasValue()) { - return; - } - } - - let shouldComplete = false; - const args: any[] = []; - for (let i = 0; i < len; i++) { - let iterator = iterators[i]; - let result = iterator.next(); - - // check to see if it's completed now that you've gotten - // the next value. - if (iterator.hasCompleted()) { - shouldComplete = true; - } - - if (result.done) { - destination.complete(); - return; - } - - args.push(result.value); - } - - if (this.resultSelector) { - this._tryresultSelector(args); - } else { - destination.next(args); - } - - if (shouldComplete) { - destination.complete(); - } - } - - protected _tryresultSelector(args: any[]) { - let result: any; - try { - result = this.resultSelector!.apply(this, args); - } catch (err) { - this.destination.error(err); - return; - } - this.destination.next(result); - } -} - -interface LookAheadIterator extends Iterator { - hasValue(): boolean; - hasCompleted(): boolean; -} - -class StaticIterator implements LookAheadIterator { - private nextResult: IteratorResult; - - constructor(private iterator: Iterator) { - this.nextResult = iterator.next(); - } - - hasValue() { - return true; - } - - next(): IteratorResult { - const result = this.nextResult; - this.nextResult = this.iterator.next(); - return result; - } - - hasCompleted() { - const nextResult = this.nextResult; - return nextResult && !!nextResult.done; - } -} - -class StaticArrayIterator implements LookAheadIterator { - private index = 0; - private length = 0; - - constructor(private array: T[]) { - this.length = array.length; - } - - [Symbol_iterator]() { - return this; - } - - next(): IteratorResult { - const i = this.index++; - const array = this.array; - return i < this.length ? { value: array[i], done: false } : { value: null, done: true }; - } - - hasValue() { - return this.array.length > this.index; - } - - hasCompleted() { - return this.array.length === this.index; - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class ZipBufferIterator extends SimpleOuterSubscriber implements LookAheadIterator { - stillUnsubscribed = true; - buffer: T[] = []; - isComplete = false; - - constructor(destination: PartialObserver, - private parent: ZipSubscriber, - private observable: Observable) { - super(destination); - } - - [Symbol_iterator]() { - return this; - } - - // NOTE: there is actually a name collision here with Subscriber.next and Iterator.next - // this is legit because `next()` will never be called by a subscription in this case. - next(): IteratorResult { - const buffer = this.buffer; - if (buffer.length === 0 && this.isComplete) { - return { value: null, done: true }; - } else { - return { value: buffer.shift()!, done: false }; - } - } - - hasValue() { - return this.buffer.length > 0; - } - - hasCompleted() { - return this.buffer.length === 0 && this.isComplete; - } - - notifyComplete() { - if (this.buffer.length > 0) { - this.isComplete = true; - this.parent.notifyInactive(); - } else { - this.destination.complete(); - } - } - - notifyNext(innerValue: any): void { - this.buffer.push(innerValue); - this.parent.checkIterators(); - } - - subscribe() { - return innerSubscribe(this.observable, new SimpleInnerSubscriber(this)); - } + return subscription; + }); } diff --git a/src/internal/operators/zipAll.ts b/src/internal/operators/zipAll.ts index 3fd9d33e94..34ee58801a 100644 --- a/src/internal/operators/zipAll.ts +++ b/src/internal/operators/zipAll.ts @@ -1,13 +1,33 @@ -import { ZipOperator } from '../observable/zip'; import { Observable } from '../Observable'; import { OperatorFunction, ObservableInput } from '../types'; import { lift } from '../util/lift'; +import { Subscriber } from '../Subscriber'; +import { zip } from '../observable/zip'; export function zipAll(): OperatorFunction, T[]>; export function zipAll(): OperatorFunction; export function zipAll(project: (...values: T[]) => R): OperatorFunction, R>; export function zipAll(project: (...values: Array) => R): OperatorFunction; -export function zipAll(project?: (...values: Array) => R): OperatorFunction { - return (source: Observable) => lift(source, new ZipOperator(project)); +export function zipAll(project?: (...values: T[]) => R): OperatorFunction, R> { + return (source: Observable>) => lift(source, function zipAllOperator(this: Subscriber, source: Observable>) { + const subscriber = this; + const sources: ObservableInput[] = []; + subscriber.add( + source.subscribe({ + next: source => sources.push(source), + error: err => subscriber.error(err), + complete: () => { + if (sources.length > 0) { + const args = project ? [...sources, project] : sources; + subscriber.add( + zip(...args).subscribe(subscriber) + ) + } else { + subscriber.complete(); + } + } + }) + ) + }); } diff --git a/src/internal/operators/zipWith.ts b/src/internal/operators/zipWith.ts index 6db36af106..aea7e6fbd0 100644 --- a/src/internal/operators/zipWith.ts +++ b/src/internal/operators/zipWith.ts @@ -1,7 +1,8 @@ import { zip as zipStatic } from '../observable/zip'; import { Observable } from '../Observable'; import { ObservableInput, OperatorFunction, ObservedValueTupleFromArray, Cons } from '../types'; -import { stankyLift } from '../util/lift'; +import { lift } from '../util/lift'; +import { Subscriber } from '../Subscriber' /* tslint:disable:max-line-length */ /** @deprecated Deprecated use {@link zipWith} */ @@ -37,13 +38,11 @@ export function zip(array: Array>, project /** * @deprecated Deprecated. Use {@link zipWith}. */ -export function zip(...observables: Array | ((...values: Array) => R)>): OperatorFunction { - return function zipOperatorFunction(source: Observable) { - return stankyLift( - source, - zipStatic(source, ...observables) - ); - }; +export function zip(...sources: Array | ((...values: Array) => R)>): OperatorFunction { + return (source: Observable) => lift(source, function (this: Subscriber, source: Observable) { + const args = [source, ...sources]; + return zipStatic(...args).subscribe(this); + }) } /**