Skip to content

Commit

Permalink
feat(operator): add publishReplay operator and spec
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros authored and benlesh committed Aug 31, 2015
1 parent 522f0a8 commit a0c47d6
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 0 deletions.
102 changes: 102 additions & 0 deletions spec/operators/publishReplay-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.publishReplay()', function () {
it('should return a ConnectableObservable', function () {
var source = Observable.value(1).publishReplay();
expect(source instanceof Rx.ConnectableObservable).toBe(true);
});

it('should multicast one observable to multiple observers', function (done) {
var results1 = [];
var results2 = [];
var subscriptions = 0;

var source = new Observable(function (observer) {
subscriptions++;
observer.next(1);
observer.next(2);
observer.next(3);
observer.next(4);
observer.complete();
});

var connectable = source.publishReplay();

connectable.subscribe(function (x) {
results1.push(x);
});
connectable.subscribe(function (x) {
results2.push(x);
});

expect(results1).toEqual([]);
expect(results2).toEqual([]);

connectable.connect();

expect(results1).toEqual([1, 2, 3, 4]);
expect(results2).toEqual([1, 2, 3, 4]);
expect(subscriptions).toBe(1);
done();
});

it('should replay as many events as specified by the bufferSize', function (done) {
var results1 = [];
var results2 = [];
var subscriptions = 0;

var source = new Observable(function (observer) {
subscriptions++;
observer.next(1);
observer.next(2);
observer.next(3);
observer.next(4);
});

var connectable = source.publishReplay(2);

connectable.subscribe(function (x) {
results1.push(x);
});

expect(results1).toEqual([]);
expect(results2).toEqual([]);

connectable.connect();

connectable.subscribe(function (x) {
results2.push(x);
});

expect(results1).toEqual([1, 2, 3, 4]);
expect(results2).toEqual([3, 4]);
expect(subscriptions).toBe(1);
done();
});

it('should allow you to reconnect by subscribing again', function (done) {
var expected = [1, 2, 3, 4];
var i = 0;

var source = Observable.of(1, 2, 3, 4).publishReplay();

source.subscribe(
function (x) {
expect(x).toBe(expected[i++]);
},
null,
function () {
i = 0;

source.subscribe(function (x) {
expect(x).toBe(expected[i++]);
}, null, done);

source.connect();
});

source.connect();
});
});
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ export default class Observable<T> {

publish: () => ConnectableObservable<T>;
publishBehavior: (value: any) => ConnectableObservable<T>;
publishReplay: (bufferSize: number, windowTime: number, scheduler?: Scheduler) => ConnectableObservable<T>;
multicast: (subjectFactory: () => Subject<T>) => ConnectableObservable<T>;

catch: (selector: (err: any, source: Observable<T>, caught: Observable<any>) => Observable<any>) => Observable<T>;
Expand Down
2 changes: 2 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,12 @@ observableProto.zipAll = zipAll;

import publish from './operators/publish';
import publishBehavior from './operators/publishBehavior';
import publishReplay from './operators/publishReplay';
import multicast from './operators/multicast';

observableProto.publish = publish;
observableProto.publishBehavior = publishBehavior;
observableProto.publishReplay = publishReplay;
observableProto.multicast = multicast;

import observeOn from './operators/observeOn';
Expand Down
11 changes: 11 additions & 0 deletions src/operators/publishReplay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import ReplaySubject from '../subjects/ReplaySubject';
import Scheduler from '../Scheduler';
import multicast from './multicast';

export default function publishReplay(bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.POSITIVE_INFINITY,
scheduler?: Scheduler) {
return multicast.call(this,
() => new ReplaySubject(bufferSize, windowTime, scheduler)
);
}

0 comments on commit a0c47d6

Please sign in to comment.