Skip to content

Commit

Permalink
chore(TestScheduler): add asserts for unsubscriptions
Browse files Browse the repository at this point in the history
Add expectSubscription(coldOrHotTestObservable.subscriptions).toBe(unsubs)
feature to test utilities. Create multiple test utility classes such as
ColdObservable, HotObservable, SubscriptionLog, TestMessage, etc. Also
fix minor issues with VirtualTimeScheduler and Subject.
Subject._subscribe() was taking an Observer, but its parent
Observable._subscribe() was taking a Subscriber, so this commit changes
Subject._subscribe() to take a Subscriber as well. This facilitates
usage in HotObservable, for example.

Resolves issue ReactiveX#428.
  • Loading branch information
Andre Medeiros committed Oct 6, 2015
1 parent af603bd commit f5e69c4
Show file tree
Hide file tree
Showing 14 changed files with 211 additions and 49 deletions.
9 changes: 8 additions & 1 deletion spec/helpers/test-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ global.expectObservable = function () {
if (!global.rxTestScheduler) {
throw 'tried to use expectObservable() in async test';
}
return global.rxTestScheduler.expect.apply(global.rxTestScheduler, arguments);
return global.rxTestScheduler.expectObservable.apply(global.rxTestScheduler, arguments);
};

global.expectSubscriptions = function () {
if (!global.rxTestScheduler) {
throw 'tried to use expectSubscriptions() in async test';
}
return global.rxTestScheduler.expectSubscriptions.apply(global.rxTestScheduler, arguments);
};

var glit = global.it;
Expand Down
2 changes: 1 addition & 1 deletion spec/operators/bufferTime-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ describe('Observable.prototype.bufferTime', function () {
var values = {
w: ['a','b']
};
var e1 = hot('---a---b---c---#---e---f---g---|');
var e1 = hot('---a---b---c---#');
var expected = '----------w----#';

expectObservable(e1.bufferTime(100, null, rxTestScheduler)).toBe(expected, values);
Expand Down
11 changes: 8 additions & 3 deletions spec/operators/merge-map-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,17 @@ describe('Observable.prototype.mergeMap()', function () {

it('should mergeMap many outer values to many inner values', function () {
var values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'};
var e1 = hot('-a-------b-------c-------d-------|');
var inner = cold('----i---j---k---l---|', values);
var expected = '-----i---j---(ki)(lj)(ki)(lj)(ki)(lj)k---l---|';
var e1 = hot('-a-------b-------c-------d-------|');
var inner = cold('----i---j---k---l---|', values);
var innersubs = ['-^-------------------!',
'---------^-------------------!',
'-----------------^-------------------!',
'-------------------------^-------------------!'];
var expected = '-----i---j---(ki)(lj)(ki)(lj)(ki)(lj)k---l---|';

expectObservable(e1.mergeMap(function (value) { return inner; }))
.toBe(expected, values);
expectSubscriptions(inner.subscriptions).toBe(innersubs);
});

it('should mergeMap many outer to many inner, complete late', function () {
Expand Down
4 changes: 4 additions & 0 deletions spec/operators/switch-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ describe('Observable.prototype.switch()', function () {

it('should handle a hot observable of observables', function () {
var x = cold( '--a---b---c--|');
var xsubs = '------^-------!';
var y = cold( '---d--e---f---|');
var ysubs = '--------------^-------------!';
var e1 = hot( '------x-------y------|', { x: x, y: y });
var expected = '--------a---b----d--e---f---|';
expectObservable(e1.switch()).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should handle an observable of promises', function (done) {
Expand Down
2 changes: 1 addition & 1 deletion src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ import nextTick from './schedulers/nextTick';
import immediate from './schedulers/immediate';
import NextTickScheduler from './schedulers/NextTickScheduler';
import ImmediateScheduler from './schedulers/ImmediateScheduler';
import TestScheduler from './schedulers/TestScheduler';
import {TestScheduler} from './testing/TestScheduler';
import VirtualTimeScheduler from './schedulers/VirtualTimeScheduler';

var Scheduler = {
Expand Down
2 changes: 1 addition & 1 deletion src/Subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export default class Subject<T> extends Observable<T> implements Observer<T>, Su
return subject;
}

_subscribe(subscriber: Observer<any>) : Subscription<T> {
_subscribe(subscriber: Subscriber<any>) : Subscription<T> {

if (subscriber.isUnsubscribed) {
return;
Expand Down
2 changes: 1 addition & 1 deletion src/schedulers/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export default class VirtualTimeScheduler implements Scheduler {
protected static frameTimeFactor: number = 10;

now() {
return this.frame * VirtualTimeScheduler.frameTimeFactor;
return this.frame;
}

flush() {
Expand Down
42 changes: 42 additions & 0 deletions src/testing/ColdObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import Observable from '../Observable';
import Subscription from '../Subscription';
import Scheduler from '../Scheduler';
import TestMessage from './TestMessage';
import SubscriptionLog from './SubscriptionLog';
import SubscriptionLoggable from './SubscriptionLoggable';
import applyMixins from '../util/applyMixins';

export default class ColdObservable<T> extends Observable<T> implements SubscriptionLoggable {
public subscriptions: SubscriptionLog[] = [];
scheduler: Scheduler;
logSubscribedFrame: () => number;
logUnsubscribedFrame: (index: number) => void;

constructor(private messages: TestMessage[],
scheduler: Scheduler) {
super(function (subscriber) {
const observable: ColdObservable<T> = this;
const index = observable.logSubscribedFrame();
subscriber.add(new Subscription(() => {
observable.logUnsubscribedFrame(index);
}));
observable.scheduleMessages(subscriber);
return subscriber;
});
this.scheduler = scheduler;
}

scheduleMessages(subscriber) {
const messagesLength = this.messages.length;
for (let i = 0; i < messagesLength; i++) {
const message = this.messages[i];
subscriber.add(
this.scheduler.schedule(
() => { message.notification.observe(subscriber); },
message.frame
)
);
}
}
}
applyMixins(ColdObservable, [SubscriptionLoggable]);
44 changes: 44 additions & 0 deletions src/testing/HotObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import Subject from '../Subject';
import Subscriber from './Subscriber';
import Subscription from '../Subscription';
import Scheduler from '../Scheduler';
import TestMessage from './TestMessage';
import SubscriptionLog from './SubscriptionLog';
import SubscriptionLoggable from './SubscriptionLoggable';
import applyMixins from '../util/applyMixins';

export default class HotObservable<T> extends Subject<T> implements SubscriptionLoggable {
public subscriptions: SubscriptionLog[] = [];
scheduler: Scheduler;
logSubscribedFrame: () => number;
logUnsubscribedFrame: (index: number) => void;

constructor(private messages: TestMessage[],
scheduler: Scheduler) {
super();
this.scheduler = scheduler;
}

_subscribe(subscriber: Subscriber<any>): Subscription<T> {
const subject: HotObservable<T> = this;
const index = subject.logSubscribedFrame();
subscriber.add(new Subscription(() => {
subject.logUnsubscribedFrame(index);
}));
return super._subscribe(subscriber);
}

setup() {
const subject = this;
const messagesLength = subject.messages.length;
for (let i = 0; i < messagesLength; i++) {
const message = subject.messages[i];
this.scheduler.schedule(
() => { message.notification.observe(subject); },
message.frame
);
}
}
}
applyMixins(HotObservable, [SubscriptionLoggable]);

18 changes: 18 additions & 0 deletions src/testing/SubscriptionLog.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
export default class SubscriptionLog {
private _subscribedFrame: number;
private _unsubscribedFrame: number;

get subscribedFrame(): number {
return this._subscribedFrame;
}

get unsubscribedFrame(): number {
return this._unsubscribedFrame;
}

constructor(subscribedFrame: number,
unsubscribedFrame: number = Number.POSITIVE_INFINITY) {
this._subscribedFrame = subscribedFrame;
this._unsubscribedFrame = unsubscribedFrame;
}
}
22 changes: 22 additions & 0 deletions src/testing/SubscriptionLoggable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import Scheduler from '../Scheduler';
import SubscriptionLog from './SubscriptionLog';

export default class SubscriptionLoggable {
public subscriptions: SubscriptionLog[] = [];
scheduler: Scheduler;

logSubscribedFrame(): number {
this.subscriptions.push(new SubscriptionLog(this.scheduler.now()));
return this.subscriptions.length - 1;
}

logUnsubscribedFrame(index: number) {
const subscriptionLogs = this.subscriptions;
const oldSubscriptionLog = subscriptionLogs[index];
subscriptionLogs[index] = new SubscriptionLog(
oldSubscriptionLog.subscribedFrame,
this.scheduler.now()
);
}
}

8 changes: 8 additions & 0 deletions src/testing/TestMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import Notification from '../Notification';

interface TestMessage {
frame: number;
notification: Notification<any>;
}

export default TestMessage;
84 changes: 43 additions & 41 deletions src/schedulers/TestScheduler.ts → src/testing/TestScheduler.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,23 @@
import Observable from '../Observable';
import VirtualTimeScheduler from './VirtualTimeScheduler';
import VirtualTimeScheduler from '../schedulers/VirtualTimeScheduler';
import Notification from '../Notification';
import Subject from '../Subject';
import ColdObservable from './ColdObservable';
import HotObservable from './HotObservable';
import TestMessage from './TestMessage';
import SubscriptionLog from './SubscriptionLog';

interface FlushableTest {
observable: Observable<any>;
marbles: string;
ready: boolean;
actual?: any[];
expected?: any[];
}

interface SetupableHotObservable {
setup: (scheduler: TestScheduler) => void;
subject: Subject<any>;
}

interface TestMessage {
frame: number;
notification: Notification<any>;
}
export default TestMessage;
export type observableToBeFn = (marbles: string, values?: any, errorValue?: any) => void;
export type subscriptionLogsToBeFn = (marbles: string | string[]) => void;

export default class TestScheduler extends VirtualTimeScheduler {
private setupableHotObservables: SetupableHotObservable[] = [];
export class TestScheduler extends VirtualTimeScheduler {
private hotObservables: HotObservable<any>[] = [];
private flushTests: FlushableTest[] = [];

constructor(public assertDeepEqual: (actual: any, expected: any) => boolean | void) {
Expand All @@ -38,38 +32,23 @@ export default class TestScheduler extends VirtualTimeScheduler {
throw new Error('Cold observable cannot have unsubscription marker "!"');
}
let messages = TestScheduler.parseMarbles(marbles, values, error);
return Observable.create(subscriber => {
messages.forEach(({ notification, frame }) => {
subscriber.add(this.schedule(() => {
notification.observe(subscriber);
}, frame));
}, this);
});
return new ColdObservable(messages, this);
}

createHotObservable<T>(marbles: string, values?: any, error?: any): Subject<T> {
if (marbles.indexOf('!') !== -1) {
throw new Error('Hot observable cannot have unsubscription marker "!"');
}
let messages = TestScheduler.parseMarbles(marbles, values, error);
let subject = new Subject();
this.setupableHotObservables.push({
subject,
setup(scheduler) {
messages.forEach(({ notification, frame }) => {
scheduler.schedule(() => {
notification.observe(subject);
}, frame);
});
}
});
const subject = new HotObservable(messages, this);
this.hotObservables.push(subject);
return subject;
}

expect(observable: Observable<any>,
unsubscriptionMarbles: string = null): ({ toBe: (marbles: string, values?: any, errorValue?: any) => void }) {
let actual = [];
let flushTest: FlushableTest = { observable, actual, marbles: null, ready: false };
expectObservable(observable: Observable<any>,
unsubscriptionMarbles: string = null): ({ toBe: observableToBeFn }) {
let actual: TestMessage[] = [];
let flushTest: FlushableTest = { actual, ready: false };
let unsubscriptionFrame = TestScheduler.getUnsubscriptionFrame(unsubscriptionMarbles);
let subscription;

Expand All @@ -92,16 +71,29 @@ export default class TestScheduler extends VirtualTimeScheduler {
return {
toBe(marbles: string, values?: any, errorValue?: any) {
flushTest.ready = true;
flushTest.marbles = marbles;
flushTest.expected = TestScheduler.parseMarbles(marbles, values, errorValue);
}
};
}

expectSubscriptions(actualSubscriptionLogs: SubscriptionLog[]): ({ toBe: subscriptionLogsToBeFn }) {
const flushTest: FlushableTest = { actual: actualSubscriptionLogs, ready: false };
this.flushTests.push(flushTest);
return {
toBe(marbles: string | string[]) {
const marblesArray: string[] = (typeof marbles === 'string') ? [marbles] : marbles;
flushTest.ready = true;
flushTest.expected = marblesArray.map(marbles =>
TestScheduler.parseMarblesAsSubscriptions(marbles)
);
}
};
}

flush() {
const setupableHotObservables = this.setupableHotObservables;
while (setupableHotObservables.length > 0) {
setupableHotObservables.shift().setup(this);
const hotObservables = this.hotObservables;
while (hotObservables.length > 0) {
hotObservables.shift().setup();
}

super.flush();
Expand All @@ -120,6 +112,16 @@ export default class TestScheduler extends VirtualTimeScheduler {
return marbles.indexOf('!') * this.frameTimeFactor;
}

static parseMarblesAsSubscriptions(marbles: string): SubscriptionLog {
let subscriptionFrame = marbles.indexOf('^') * this.frameTimeFactor;
let unsubscriptionFrame = marbles.indexOf('!') * this.frameTimeFactor;
if (unsubscriptionFrame < 0) {
return new SubscriptionLog(subscriptionFrame);
} else {
return new SubscriptionLog(subscriptionFrame, unsubscriptionFrame);
}
}

static parseMarbles(marbles: string, values?: any, errorValue?: any): TestMessage[] {
if (marbles.indexOf('!') !== -1) {
throw new Error('Conventional marble diagrams cannot have the ' +
Expand Down
10 changes: 10 additions & 0 deletions src/util/applyMixins.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export default function applyMixins(derivedCtor: any, baseCtors: any[]) {
for (let i = 0, len = baseCtors.length; i < len; i++) {
const baseCtor = baseCtors[i];
const propertyKeys = Object.getOwnPropertyNames(baseCtor.prototype);
for (let j = 0, len2 = propertyKeys.length; j < len2; j++) {
const name = propertyKeys[j];
derivedCtor.prototype[name] = baseCtor.prototype[name];
}
}
}

0 comments on commit f5e69c4

Please sign in to comment.