diff --git a/spec/observables/dom/webSocket-spec.js b/spec/observables/dom/webSocket-spec.js index 9f5ebca8b3..d7ae11aa23 100644 --- a/spec/observables/dom/webSocket-spec.js +++ b/spec/observables/dom/webSocket-spec.js @@ -34,6 +34,90 @@ describe('Observable.webSocket', function () { socket.triggerMessage('pong'); expect(messageReceived).toBe(true); }); + + it ('receive multiple messages', function () { + var expected = ['what', 'do', 'you', 'do', 'with', 'a', 'drunken', 'sailor?']; + var results = []; + var subject = Observable.webSocket('ws://mysocket'); + + subject.subscribe(function (x) { + results.push(x); + }); + + var socket = MockWebSocket.lastSocket(); + + socket.open(); + + expected.forEach(function (x) { + socket.triggerMessage(x); + }); + + expect(results).toEqual(expected); + }); + + it ('should queue messages prior to subscription', function () { + var expected = ['make', 'him', 'walk', 'the', 'plank']; + var subject = Observable.webSocket('ws://mysocket'); + + expected.forEach(function (x) { + subject.next(x); + }); + + var socket = MockWebSocket.lastSocket(); + expect(socket).not.toBeDefined(); + + subject.subscribe(); + + socket = MockWebSocket.lastSocket(); + expect(socket.sent.length).toBe(0); + + socket.open(); + expect(socket.sent.length).toBe(expected.length); + }); + + it('should send messages immediately if alreayd open', function () { + var subject = Observable.webSocket('ws://mysocket'); + subject.subscribe(); + var socket = MockWebSocket.lastSocket(); + socket.open(); + + subject.next('avast!'); + expect(socket.lastMessageSent()).toBe('avast!'); + subject.next('ye swab!'); + expect(socket.lastMessageSent()).toBe('ye swab!'); + }); + + it('should close the socket when completed', function () { + var subject = Observable.webSocket('ws://mysocket'); + subject.subscribe(); + var socket = MockWebSocket.lastSocket(); + socket.open(); + + expect(socket.readyState).toBe(1); // open + + spyOn(socket, 'close').and.callThrough(); + expect(socket.close).not.toHaveBeenCalled(); + + subject.complete(); + expect(socket.close).toHaveBeenCalled(); + expect(socket.readyState).toBe(3); // closed + }); + + it('should allow resubscription after closure', function () { + var subject = Observable.webSocket('ws://mysocket'); + subject.subscribe(); + var socket1 = MockWebSocket.lastSocket(); + socket1.open(); + subject.complete(); + + subject.next('a mariner yer not. yarrr.'); + subject.subscribe(); + var socket2 = MockWebSocket.lastSocket(); + socket2.open(); + + expect(socket2).not.toBe(socket1); + expect(socket2.lastMessageSent()).toBe('a mariner yer not. yarrr.'); + }); }); var sockets = []; @@ -44,7 +128,7 @@ function MockWebSocket(url, protocol) { this.protocol = protocol; this.sent = []; this.handlers = {}; - this.readyState = 1; + this.readyState = 0; } MockWebSocket.lastSocket = function () { @@ -92,7 +176,7 @@ MockWebSocket.prototype = { this.readyState = 2; this.closeCode = code; this.closeReason = reason; - this.triggerClose(); + this.triggerClose({ wasClean: (!code || code === 1000) }); } }, diff --git a/src/Subject.ts b/src/Subject.ts index ebc817ee34..bd345583ef 100644 --- a/src/Subject.ts +++ b/src/Subject.ts @@ -152,15 +152,17 @@ export class Subject extends Observable implements Observer, Subscripti protected _finalError(err: any): void { let index = -1; const observers = this.observers; - const len = observers.length; // optimization to block our SubjectSubscriptions from // splicing themselves out of the observers list one by one. this.observers = null; this.isUnsubscribed = true; - while (++index < len) { - observers[index].error(err); + if (observers) { + const len = observers.length; + while (++index < len) { + observers[index].error(err); + } } this.isUnsubscribed = false; @@ -179,15 +181,17 @@ export class Subject extends Observable implements Observer, Subscripti protected _finalComplete(): void { let index = -1; const observers = this.observers; - const len = observers.length; // optimization to block our SubjectSubscriptions from // splicing themselves out of the observers list one by one. this.observers = null; this.isUnsubscribed = true; - while (++index < len) { - observers[index].complete(); + if (observers) { + const len = observers.length; + while (++index < len) { + observers[index].complete(); + } } this.isUnsubscribed = false; diff --git a/src/observable/dom/webSocket.ts b/src/observable/dom/webSocket.ts index 26a85211dc..cddc5beebe 100644 --- a/src/observable/dom/webSocket.ts +++ b/src/observable/dom/webSocket.ts @@ -70,17 +70,26 @@ export class WebSocketSubject extends Subject { } _unsubscribe() { + this.socket = null; this.source = null; + this.destination = new ReplaySubject(); this.isStopped = false; + this.hasErrored = false; + this.hasCompleted = false; this.observers = null; this.isUnsubscribed = false; } _subscribe(subscriber: Subscriber) { + if (!this.observers) { + this.observers = []; + } + const subscription = super._subscribe(subscriber); // HACK: For some reason transpilation wasn't honoring this in arrow functions below // Doesn't seem right, need to reinvestigate. const self = this; + const WebSocket = this.WebSocketCtor; if (self.source || !subscription || (subscription).isUnsubscribed) { return subscription;