Skip to content

Commit

Permalink
feat(WebSocketSubject): add basic WebSocketSubject implementation
Browse files Browse the repository at this point in the history
- adds `_finalComplete`, `_finalError`, and `_finalNext` protected methods to Subject. This is to
  allow explicit calls to send to final destination observers, since the destination and the final set of observers
  can differ in implementations like WebSocketSubject
  • Loading branch information
benlesh committed Jan 13, 2016
1 parent 2ca4236 commit 58cd806
Show file tree
Hide file tree
Showing 6 changed files with 388 additions and 34 deletions.
139 changes: 139 additions & 0 deletions spec/observables/dom/webSocket-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/* globals describe, it, expect, sinon, rxTestScheduler */
var Rx = require('../../../dist/cjs/Rx.DOM');
var Observable = Rx.Observable;

function noop() {
// nope.
}

describe('Observable.webSocket', function () {
beforeEach(function () {
setupMockWebSocket();
});

afterEach(function () {
teardownMockWebSocket();
});

it ('should send a message', function () {
var messageReceived = false;
var subject = Observable.webSocket('ws://mysocket');

subject.next('ping');

subject.subscribe(function (x) {
expect(x).toBe('pong');
messageReceived = true;
});

var socket = MockWebSocket.lastSocket();

socket.open();
expect(socket.lastMessageSent()).toBe('ping');

socket.triggerMessage('pong');
expect(messageReceived).toBe(true);
});
});

var sockets = [];

function MockWebSocket(url, protocol) {
sockets.push(this);
this.url = url;
this.protocol = protocol;
this.sent = [];
this.handlers = {};
this.readyState = 1;
}

MockWebSocket.lastSocket = function () {
return sockets.length > 0 ? sockets[sockets.length - 1] : undefined;
};

MockWebSocket.prototype = {
send: function (data) {
this.sent.push(data);
},

lastMessageSent: function () {
var sent = this.sent;
return sent.length > 0 ? sent[sent.length - 1] : undefined;
},

triggerClose: function (e) {
this.readyState = 3;
this.trigger('close', e);
},

triggerError: function (err) {
this.readyState = 3;
this.trigger('error', err);
},

triggerMessage: function (data) {
var messageEvent = {
data: JSON.stringify(data),
origin: 'mockorigin',
ports: undefined,
source: __root__,
};

this.trigger('message', messageEvent);
},

open: function () {
this.readyState = 1;
this.trigger('open', {});
},

close: function (code, reason) {
if (this.readyState < 2) {
this.readyState = 2;
this.closeCode = code;
this.closeReason = reason;
this.triggerClose();
}
},

addEventListener: function (name, handler) {
var lookup = this.handlers[name] = this.handlers[name] || [];
lookup.push(handler);
},

removeEventListener: function (name, handler) {
var lookup = this.handlers[name];
if (lookup) {
for (var i = lookup.length - 1; i--;) {
if (lookup[i] === handler) {
lookup.splice(i, 1);
}
}
}
},

trigger: function (name, e) {
if (this['on' + name]) {
this['on' + name](e);
}

var lookup = this.handlers[name];
if (lookup) {
for (var i = 0; i < lookup.length; i++) {
lookup[i](e);
}
}
}
}

var __ws;
function setupMockWebSocket() {
sockets = [];
__ws = __root__.WebSocket;
__root__.WebSocket = MockWebSocket;
}

function teardownMockWebSocket() {
__root__.WebSocket = __ws;
sockets = null;
}
2 changes: 2 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {RangeObservable} from './observable/range';
import {InfiniteObservable} from './observable/never';
import {ErrorObservable} from './observable/throw';
import {AjaxCreationMethod} from './observable/dom/ajax';
import {WebSocketSubject} from './observable/dom/webSocket';

/**
* A representation of any set of values over any amount of time. This the most basic building block
Expand Down Expand Up @@ -187,6 +188,7 @@ export class Observable<T> implements CoreOperators<T> {
static range: typeof RangeObservable.create;
static throw: typeof ErrorObservable.create;
static timer: typeof TimerObservable.create;
static webSocket: typeof WebSocketSubject.create;
static zip: typeof zipStatic;

// core operators
Expand Down
1 change: 1 addition & 0 deletions src/Rx.DOM.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import './add/observable/timer';
import './add/operator/zip-static';

import './add/observable/dom/ajax';
import './add/observable/dom/webSocket';

//operators
import './add/operator/buffer';
Expand Down
80 changes: 46 additions & 34 deletions src/Subject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import {rxSubscriber} from './symbol/rxSubscriber';

export class Subject<T> extends Observable<T> implements Observer<T>, Subscription {

static create<T>(source: Observable<T>, destination: Observer<T>): Subject<T> {
static create: Function = <T>(source: Observable<T>, destination: Observer<T>): Subject<T> => {
return new Subject<T>(source, destination);
}
};

constructor(source?: Observable<T>, destination?: Observer<T>) {
super();
Expand Down Expand Up @@ -127,60 +127,72 @@ export class Subject<T> extends Observable<T> implements Observer<T>, Subscripti
if (this.destination) {
this.destination.next(value);
} else {
let index = -1;
const observers = this.observers.slice(0);
const len = observers.length;
this._finalNext(value);
}
}

while (++index < len) {
observers[index].next(value);
}
protected _finalNext(value: T): void {
let index = -1;
const observers = this.observers.slice(0);
const len = observers.length;

while (++index < len) {
observers[index].next(value);
}
}

protected _error(err: any): void {
if (this.destination) {
this.destination.error(err);
} else {
let index = -1;
const observers = this.observers;
const len = observers.length;
this._finalError(err);
}
}

// optimization to block our SubjectSubscriptions from
// splicing themselves out of the observers list one by one.
this.observers = null;
this.isUnsubscribed = true;
protected _finalError(err: any): void {
let index = -1;
const observers = this.observers;
const len = observers.length;

while (++index < len) {
observers[index].error(err);
}

this.isUnsubscribed = false;
// optimization to block our SubjectSubscriptions from
// splicing themselves out of the observers list one by one.
this.observers = null;
this.isUnsubscribed = true;

this.unsubscribe();
while (++index < len) {
observers[index].error(err);
}

this.isUnsubscribed = false;

this.unsubscribe();
}

protected _complete(): void {
if (this.destination) {
this.destination.complete();
} else {
let index = -1;
const observers = this.observers;
const len = observers.length;
this._finalComplete();
}
}

// optimization to block our SubjectSubscriptions from
// splicing themselves out of the observers list one by one.
this.observers = null;
this.isUnsubscribed = true;
protected _finalComplete(): void {
let index = -1;
const observers = this.observers;
const len = observers.length;

while (++index < len) {
observers[index].complete();
}

this.isUnsubscribed = false;
// optimization to block our SubjectSubscriptions from
// splicing themselves out of the observers list one by one.
this.observers = null;
this.isUnsubscribed = true;

this.unsubscribe();
while (++index < len) {
observers[index].complete();
}

this.isUnsubscribed = false;

this.unsubscribe();
}

[rxSubscriber]() {
Expand Down
5 changes: 5 additions & 0 deletions src/add/observable/dom/webSocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import {Observable} from '../../../Observable';
import {WebSocketSubject} from '../../../observable/dom/webSocket';
Observable.webSocket = WebSocketSubject.create;

export var _void: void;
Loading

0 comments on commit 58cd806

Please sign in to comment.