Skip to content

Commit

Permalink
feat(zip): supports promises, iterables and lowercase-o observables
Browse files Browse the repository at this point in the history
- renames spec file appropriately
- removes limit functionality from zip, as it is not used by combineLatest any longer
- adds support for promises, iterables, Observables and lowercase-o observables
  • Loading branch information
benlesh committed Sep 23, 2015
1 parent ce76e4e commit d332a0e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 27 deletions.
18 changes: 9 additions & 9 deletions spec/operators/zip-all-spec.js → spec/operators/zipAll-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('zipAll', function () {
describe('Observable.prototype.zipAll', function () {
it('should take all observables from the source and zip them', function (done) {
var expected = ['a1', 'b2', 'c3'];
var i = 0;
Observable.fromArray([
Observable.fromArray(['a', 'b', 'c']),
Observable.fromArray([1, 2, 3])
])
Observable.of(
Observable.of('a','b','c'),
Observable.of(1,2,3)
)
.zipAll(function (a, b) {
return a + b;
})
Expand All @@ -21,10 +21,10 @@ describe('zipAll', function () {
it('should zip until one child terminates', function (done) {
var expected = ['a1', 'b2'];
var i = 0;
Observable.fromArray([
Observable.fromArray(['a', 'b']),
Observable.fromArray([1, 2, 3])
])
Observable.of(
Observable.of('a','b','c'),
Observable.of(1,2)
)
.zipAll(function (a, b) {
return a + b;
})
Expand Down
25 changes: 7 additions & 18 deletions src/operators/zip-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ export class ZipSubscriber<T, R> extends OuterSubscriber<T, R> {
active: number = 0;
observables: Observable<any>[] = [];
project: (...values: Array<any>) => R;
limit: number = Number.POSITIVE_INFINITY;
buffers: any[][] = [];

constructor(destination: Subscriber<R>,
Expand Down Expand Up @@ -61,51 +60,41 @@ export class ZipSubscriber<T, R> extends OuterSubscriber<T, R> {
}
}


notifyNext(value: R, observable: T, index: number, observableIndex: number) {
const buffers = this.buffers;
buffers[observableIndex].push(value);

const len = buffers.length;
for (let i = 0; i < len; i++) {
let buffer = buffers[i];
if(buffer.length === 0) {
if(buffers[i].length === 0) {
return;
}
}

const outbound = [];
const args = [];
const destination = this.destination;
const project = this.project;

for(let i = 0; i < len; i++) {
outbound.push(buffers[i].shift());
args.push(buffers[i].shift());
}

if(project) {
let result = tryCatch(project)(outbound);
let result = tryCatch(project).apply(this, args);
if(result === errorObject){
destination.error(errorObject.e);
} else {
destination.next(result);
}
} else {
destination.next(outbound);
destination.next(args);
}
}

notifyComplete(innerSubscriber) {
notifyComplete() {
if((this.active -= 1) === 0) {
this.destination.complete();
} else {
this.limit = innerSubscriber.events;
}
}
}

function arrayInitialize(length) {
var arr = Array(length);
for (let i = 0; i < length; i++) {
arr[i] = null;
}
return arr;
}

0 comments on commit d332a0e

Please sign in to comment.