Skip to content

Commit

Permalink
fix(multicast): Fixes multicast with selector to create a new source …
Browse files Browse the repository at this point in the history
…connection per subscriber. (#1774)

Multicast with a selector function should create a new ConnectableObservable for each subscriber to
the MulticastObservable. This ensures each subscriber creates a new connection to the source
Observable, and don't share subscription side-effects.
  • Loading branch information
trxcllnt authored and benlesh committed Jun 16, 2016
1 parent c7e2366 commit c3ac852
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 10 deletions.
26 changes: 24 additions & 2 deletions spec/operators/multicast-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ describe('Observable.prototype.multicast', () => {
connectable.connect();
});

it('should accept selectors to factory functions', () => {
it('should accept a multicast selector and connect to a hot source for each subscriber', () => {
const source = hot('-1-2-3----4-|');
const sourceSubs = ['^ !'];
const sourceSubs = ['^ !',
' ^ !',
' ^ !'];
const multicasted = source.multicast(() => new Subject(),
x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString()));
const subscriber1 = hot('a| ').mergeMapTo(multicasted);
Expand All @@ -67,6 +69,26 @@ describe('Observable.prototype.multicast', () => {
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should accept a multicast selector and connect to a cold source for each subscriber', () => {
const source = cold('-1-2-3----4-|');
const sourceSubs = ['^ !',
' ^ !',
' ^ !'];
const multicasted = source.multicast(() => new Subject(),
x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString()));
const expected1 = '-2-4-6----8-|';
const expected2 = ' -2-4-6----8-|';
const expected3 = ' -2-4-6----8-|';
const subscriber1 = hot('a| ').mergeMapTo(multicasted);
const subscriber2 = hot(' b| ').mergeMapTo(multicasted);
const subscriber3 = hot(' c| ').mergeMapTo(multicasted);

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should do nothing if connect is not called, despite subscriptions', () => {
const source = cold('--1-2---3-4--5-|');
const sourceSubs = [];
Expand Down
6 changes: 4 additions & 2 deletions spec/operators/publish-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ describe('Observable.prototype.publish', () => {

it('should accept selectors', () => {
const source = hot('-1-2-3----4-|');
const sourceSubs = ['^ !'];
const sourceSubs = ['^ !',
' ^ !',
' ^ !'];
const published = source.publish(x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString()));
const subscriber1 = hot('a| ').mergeMapTo(published);
const expected1 = '-2-4-6----8-|';
Expand Down Expand Up @@ -321,4 +323,4 @@ describe('Observable.prototype.publish', () => {
expect(subscriptions).to.equal(1);
done();
});
});
});
14 changes: 14 additions & 0 deletions spec/support/debug.opts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
--require source-map-support/register
--require spec/support/mocha-setup-node.js
--require spec-js/helpers/test-helper.js
--require spec-js/helpers/ajax-helper.js
--ui spec-js/helpers/testScheduler-ui.js

--reporter dot
--bail
--full-trace
--check-leaks
--globals WebSocket,FormData

--recursive
--timeout 100000
9 changes: 5 additions & 4 deletions src/observable/MulticastObservable.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import {Subject} from '../Subject';
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {ConnectableObservable} from '../observable/ConnectableObservable';

export class MulticastObservable<T> extends Observable<T> {
constructor(protected source: Observable<T>,
private connectable: ConnectableObservable<T>,
private subjectFactory: () => Subject<T>,
private selector: (source: Observable<T>) => Observable<T>) {
super();
}

protected _subscribe(subscriber: Subscriber<T>): Subscription {
const {selector, connectable} = this;

const { selector, source } = this;
const connectable = new ConnectableObservable(source, this.subjectFactory);
const subscription = selector(connectable).subscribe(subscriber);
subscription.add(connectable.connect());
return subscription;
}
}
}
5 changes: 3 additions & 2 deletions src/operator/multicast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ export function multicast<T>(subjectOrSubjectFactory: Subject<T> | (() => Subjec
};
}

const connectable = new ConnectableObservable(this, subjectFactory);
return selector ? new MulticastObservable(this, connectable, selector) : connectable;
return !selector ?
new ConnectableObservable(this, subjectFactory) :
new MulticastObservable(this, subjectFactory, selector);
}

export type factoryOrValue<T> = T | (() => T);
Expand Down

0 comments on commit c3ac852

Please sign in to comment.