Skip to content

Commit

Permalink
fix(zip): zip operators and functions are now able to zip all iterabl…
Browse files Browse the repository at this point in the history
…e sources (#5688)

* fix(zip): zip operators and functions are now able to zip all iterable sources

BREAKING CHANGE: `zip` operators will no longer iterate provided iterables "as needed", instead the iterables will be treated as push-streams just like they would be everywhere else in RxJS. This means that passing an endless iterable will result in the thread locking up, as it will endlessly try to read from that iterable. This puts us in-line with all other Rx implementations. To work around this, it is probably best to use `map` or some combination of `map` and `zip`. For example, `zip(source$, iterator)` could be `source$.pipe(map(value => [value, iterator.next().value]))`.

fixes #4304

* chore: update golden file

* chore: I hate ts-api-guardian sometimes
  • Loading branch information
benlesh authored Sep 3, 2020
1 parent 9bb046c commit 02c3a1b
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 432 deletions.
9 changes: 8 additions & 1 deletion api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,14 @@ export declare function zip<O1 extends ObservableInput<any>, O2 extends Observab
export declare function zip<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>]>;
export declare function zip<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>]>;
export declare function zip<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>]>;
export declare function zip<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6): Observable<[ObservedValueOf<O1>, ObservedValueOf<O2>, ObservedValueOf<O3>, ObservedValueOf<O4>, ObservedValueOf<O5>, ObservedValueOf<O6>]>;
export declare function zip<O1 extends ObservableInput<any>, O2 extends ObservableInput<any>, O3 extends ObservableInput<any>, O4 extends ObservableInput<any>, O5 extends ObservableInput<any>, O6 extends ObservableInput<any>>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6): Observable<[
ObservedValueOf<O1>,
ObservedValueOf<O2>,
ObservedValueOf<O3>,
ObservedValueOf<O4>,
ObservedValueOf<O5>,
ObservedValueOf<O6>
]>;
export declare function zip<O extends ObservableInput<any>>(array: O[]): Observable<ObservedValueOf<O>[]>;
export declare function zip<R>(array: ObservableInput<any>[]): Observable<R>;
export declare function zip<O extends ObservableInput<any>, R>(array: O[], resultSelector: (...values: ObservedValueOf<O>[]) => R): Observable<R>;
Expand Down
78 changes: 23 additions & 55 deletions spec/observables/zip-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ import { expect } from 'chai';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { queueScheduler as rxQueueScheduler, zip, from, of } from 'rxjs';

declare const Symbol: any;

const queueScheduler = rxQueueScheduler;

/** @test {zip} */
Expand Down Expand Up @@ -76,18 +74,15 @@ describe('static zip', () => {

describe('with iterables', () => {
it('should zip them with values', () => {
const myIterator = <any>{
count: 0,
next: function () {
return { value: this.count++, done: false };
const myIterator = (function *() {
for (let i = 0; i < 4; i++) {
yield i;
}
};

myIterator[Symbol.iterator] = function () { return this; };
})();

const e1 = hot('---a---b---c---d---|');
const e1subs = '^ !';
const expected = '---w---x---y---z---|';
const e1subs = '^ !';
const expected = '---w---x---y---(z|)';

const values = {
w: ['a', 0],
Expand All @@ -100,37 +95,6 @@ describe('static zip', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should only call `next` as needed', () => {
let nextCalled = 0;
const myIterator = <any>{
count: 0,
next() {
nextCalled++;
return { value: this.count++, done: false };
}
};
myIterator[Symbol.iterator] = function() {
return this;
};

zip(of(1, 2, 3), myIterator)
.subscribe();

// since zip will call `next()` in advance, total calls when
// zipped with 3 other values should be 4.
expect(nextCalled).to.equal(4);
});

it('should work with never observable and empty iterable', () => {
const a = cold( '-');
const asubs = '^';
const b: number[] = [];
const expected = '-';

expectObservable(zip(a, b)).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
});

it('should work with empty observable and empty iterable', () => {
const a = cold('|');
const asubs = '(^!)';
Expand All @@ -151,11 +115,11 @@ describe('static zip', () => {
expectSubscriptions(a.subscriptions).toBe(asubs);
});

it('should work with non-empty observable and empty iterable', () => {
it('should complete instantly if given an empty iterable', () => {
const a = hot('---^----a--|');
const asubs = '^ !';
const asubs = '(^!)';
const b: number[] = [];
const expected = '--------|';
const expected = '|';

expectObservable(zip(a, b)).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
Expand All @@ -181,16 +145,6 @@ describe('static zip', () => {
expectSubscriptions(a.subscriptions).toBe(asubs);
});

it('should work with non-empty observable and empty iterable', () => {
const a = hot('---^----#');
const asubs = '^ !';
const b: number[] = [];
const expected = '-----#';

expectObservable(zip(a, b)).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
});

it('should work with observable which raises error and non-empty iterable', () => {
const a = hot('---^----#');
const asubs = '^ !';
Expand Down Expand Up @@ -574,4 +528,18 @@ describe('static zip', () => {
expect(vals).to.deep.equal(r[i++]);
}, null, done);
});

it('should be able to zip all iterables', () => {
const results: any[] = [];
zip('abc', '123', 'xyz').subscribe({
next: value => results.push(value),
complete: () => results.push('complete')
});
expect(results).to.deep.equal([
['a','1','x'],
['b','2','y'],
['c','3','z'],
'complete'
]);
})
});
60 changes: 12 additions & 48 deletions spec/operators/zipAll-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,15 @@ describe('zipAll operator', () => {
describe('with iterables', () => {
it('should zip them with values', () => {
rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const myIterator = {
count: 0,
next() {
return { value: this.count++, done: false };
},
[Symbol.iterator]() {
return this;
},
};
const myIterator = (function* () {
for (let i = 0; i < 4; i++) {
yield i;
}
})();

const e1 = hot(' ---a---b---c---d---|');
const e1subs = ' ^------------------!';
const expected = '---w---x---y---z---|';
const e1subs = ' ^--------------!';
const expected = '---w---x---y---(z|)';

const values = {
w: ['a', 0],
Expand All @@ -127,32 +123,12 @@ describe('zipAll operator', () => {
});
});

it('should only call `next` as needed', () => {
let nextCalled = 0;
const myIterator = {
count: 0,
next() {
nextCalled++;
return { value: this.count++, done: false };
},
[Symbol.iterator]() {
return this;
},
};

of(of(1, 2, 3), myIterator).pipe(zipAll()).subscribe();

// since zip will call `next()` in advance, total calls when
// zipped with 3 other values should be 4.
expect(nextCalled).to.equal(4);
});

it('should work with never observable and empty iterable', () => {
it('should complete instantly with never observable and empty iterable', () => {
rxTestScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const a = cold(' -');
const asubs = ' ^';
const asubs = ' (^!)';
const b: string[] = [];
const expected = '-';
const expected = '|';

expectObservable(of(a, b).pipe(zipAll())).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
Expand Down Expand Up @@ -186,9 +162,9 @@ describe('zipAll operator', () => {
it('should work with non-empty observable and empty iterable', () => {
rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const a = hot('---^----a--|');
const asubs = ' ^-------!';
const asubs = ' (^!)';
const b: string[] = [];
const expected = '--------|';
const expected = '|';

expectObservable(of(a, b).pipe(zipAll())).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
Expand Down Expand Up @@ -219,18 +195,6 @@ describe('zipAll operator', () => {
});
});

it('should work with non-empty observable and empty iterable', () => {
rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const a = hot('---^----#');
const asubs = ' ^----!';
const b: string[] = [];
const expected = '-----#';

expectObservable(of(a, b).pipe(zipAll())).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
});
});

it('should work with observable which raises error and non-empty iterable', () => {
rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const a = hot('---^----#');
Expand Down
67 changes: 15 additions & 52 deletions spec/operators/zipWith-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { zipWith, mergeMap } from 'rxjs/operators';
import { queueScheduler, of } from 'rxjs';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';
/** @test {zip} */

/** @test {zipWith} */
describe('zipWith', () => {
let rxTestScheduler: TestScheduler;

Expand Down Expand Up @@ -74,19 +75,15 @@ describe('zipWith', () => {
describe('with iterables', () => {
it('should zip them with values', () => {
rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const myIterator = <any>{
count: 0,
next: function() {
return { value: this.count++, done: false };
},
};
myIterator[Symbol.iterator] = function() {
return this;
};
const myIterator = (function*() {
for (let i = 0; i < 4; i++) {
yield i;
}
})();

const e1 = hot(' ---a---b---c---d---|');
const e1subs = ' ^------------------!';
const expected = '---w---x---y---z---|';
const e1subs = ' ^--------------!';
const expected = '---w---x---y---(z|)';

const values = {
w: ['a', 0],
Expand All @@ -100,33 +97,11 @@ describe('zipWith', () => {
});
});

it('should only call `next` as needed', () => {
let nextCalled = 0;
const myIterator = <any>{
count: 0,
next: function() {
nextCalled++;
return { value: this.count++, done: false };
},
};
myIterator[Symbol.iterator] = function() {
return this;
};

of(1, 2, 3)
.pipe(zipWith(myIterator))
.subscribe();

// since zip will call `next()` in advance, total calls when
// zipped with 3 other values should be 4.
expect(nextCalled).to.equal(4);
});

it('should work with never observable and empty iterable', () => {
it('should complete instantly for an empty iterable', () => {
rxTestScheduler.run(({ cold, expectObservable, expectSubscriptions }) => {
const a = cold(' -');
const asubs = ' ^';
const expected = '-';
const asubs = ' (^!)';
const expected = '|';
const b: string[] = [];

expectObservable(a.pipe(zipWith(b))).toBe(expected);
Expand Down Expand Up @@ -158,12 +133,12 @@ describe('zipWith', () => {
});
});

it('should work with non-empty observable and empty iterable', () => {
it('should complete instantly with non-empty observable and empty iterable', () => {
rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const a = hot(' ---^----a--|');
const asubs = ' ^-------!';
const asubs = ' (^!)';
const b: string[] = [];
const expected = '--------|';
const expected = ' |';

expectObservable(a.pipe(zipWith(b))).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
Expand Down Expand Up @@ -194,18 +169,6 @@ describe('zipWith', () => {
});
});

it('should work with non-empty observable and empty iterable', () => {
rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const a = hot('---^----#');
const asubs = ' ^----!';
const expected = '-----#';
const b: string[] = [];

expectObservable(a.pipe(zipWith(b))).toBe(expected);
expectSubscriptions(a.subscriptions).toBe(asubs);
});
});

it('should work with observable which raises error and non-empty iterable', () => {
rxTestScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const a = hot('---^----#');
Expand Down
Loading

0 comments on commit 02c3a1b

Please sign in to comment.