diff --git a/spec/observables/dom/webSocket-spec.ts b/spec/observables/dom/webSocket-spec.ts index 1e3051d538..eb00be6646 100644 --- a/spec/observables/dom/webSocket-spec.ts +++ b/spec/observables/dom/webSocket-spec.ts @@ -394,6 +394,81 @@ describe('Observable.webSocket', () => { }); describe('multiplex', () => { + it('should be retryable', () => { + const results = []; + const subject = Observable.webSocket('ws://websocket'); + const source = subject.multiplex(() => { + return { sub: 'foo'}; + }, () => { + return { unsub: 'foo' }; + }, function (value: any) { + return value.name === 'foo'; + }); + + source + .retry(1) + .map((x: any) => x.value) + .take(2) + .subscribe((x: any) => { + results.push(x); + }); + + const socket = MockWebSocket.lastSocket; + socket.open(); + + expect(socket.lastMessageSent).to.deep.equal({ sub: 'foo' }); + socket.triggerClose({ wasClean: false }); // Bad connection + + const socket2 = MockWebSocket.lastSocket; + expect(socket2).not.to.equal(socket); + + socket2.open(); + expect(socket2.lastMessageSent).to.deep.equal({ sub: 'foo' }); + + socket2.triggerMessage(JSON.stringify({ name: 'foo', value: 'test' })); + socket2.triggerMessage(JSON.stringify({ name: 'foo', value: 'this' })); + + expect(results).to.deep.equal(['test', 'this']); + }); + + it('should be repeatable', () => { + const results = []; + const subject = Observable.webSocket('ws://websocket'); + const source = subject.multiplex(() => { + return { sub: 'foo'}; + }, () => { + return { unsub: 'foo' }; + }, function (value: any) { + return value.name === 'foo'; + }); + + source + .repeat(2) + .map((x: any) => x.value) + .subscribe((x: any) => { + results.push(x); + }); + + const socket = MockWebSocket.lastSocket; + socket.open(); + + expect(socket.lastMessageSent).to.deep.equal({ sub: 'foo' }, 'first multiplexed sub'); + socket.triggerMessage(JSON.stringify({ name: 'foo', value: 'test' })); + socket.triggerMessage(JSON.stringify({ name: 'foo', value: 'this' })); + socket.triggerClose({ wasClean: true }); + + const socket2 = MockWebSocket.lastSocket; + expect(socket2).not.to.equal(socket, 'a new socket was not created'); + + socket2.open(); + expect(socket2.lastMessageSent).to.deep.equal({ sub: 'foo' }, 'second multiplexed sub'); + socket2.triggerMessage(JSON.stringify({ name: 'foo', value: 'test' })); + socket2.triggerMessage(JSON.stringify({ name: 'foo', value: 'this' })); + socket2.triggerClose({ wasClean: true }); + + expect(results).to.deep.equal(['test', 'this', 'test', 'this'], 'results were not equal'); + }); + it('should multiplex over the websocket', () => { const results = []; const subject = Observable.webSocket('ws://websocket'); @@ -432,28 +507,6 @@ describe('Observable.webSocket', () => { (socket.close).restore(); }); - it('should work in combination with retry (issue #1466)', () => { - const error = { wasClean: false}; - const results = []; - - const subject = Observable.webSocket({url: 'ws://mysocket'}) - .multiplex( - () => results.push('sub'), - () => results.push('unsub'), - () => true) - .retry(1); - - subject.subscribe( - () => results.push('next'), - (e) => results.push(e)); - - let socket = MockWebSocket.lastSocket; - - socket.triggerClose(error); - - expect(results).to.deep.equal(['sub', 'unsub', 'sub', error, 'unsub']); - }); - it('should not close the socket until all subscriptions complete', () => { const socketSubject = Rx.Observable.webSocket({url: 'ws://mysocket'}); const results = []; diff --git a/src/observable/dom/WebSocketSubject.ts b/src/observable/dom/WebSocketSubject.ts index 8c4ba943e9..3dea185c29 100644 --- a/src/observable/dom/WebSocketSubject.ts +++ b/src/observable/dom/WebSocketSubject.ts @@ -78,6 +78,14 @@ export class WebSocketSubject extends AnonymousSubject { return sock; } + private _resetState() { + this.socket = null; + if (!this.source) { + this.destination = new ReplaySubject(); + } + this._output = new Subject(); + } + // TODO: factor this out to be a proper Operator/Subscriber implementation and eliminate closures multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) { const self = this; @@ -155,8 +163,7 @@ export class WebSocketSubject extends AnonymousSubject { observer.error(new TypeError('WebSocketSubject.error must be called with an object with an error code, ' + 'and an optional reason: { code: number, reason: string }')); } - this.destination = new ReplaySubject(); - this.socket = null; + this._resetState(); }, ( ) => { const closingObserver = this.closingObserver; @@ -164,8 +171,7 @@ export class WebSocketSubject extends AnonymousSubject { closingObserver.next(undefined); } socket.close(); - this.destination = new ReplaySubject(); - this.socket = null; + this._resetState(); } ); @@ -174,9 +180,13 @@ export class WebSocketSubject extends AnonymousSubject { } }; - socket.onerror = (e: Event) => observer.error(e); + socket.onerror = (e: Event) => { + this._resetState(); + observer.error(e); + }; socket.onclose = (e: CloseEvent) => { + this._resetState(); const closeObserver = this.closeObserver; if (closeObserver) { closeObserver.next(e); @@ -212,8 +222,8 @@ export class WebSocketSubject extends AnonymousSubject { const { socket } = this; if (this._output.observers.length === 0 && socket && socket.readyState === 1) { socket.close(); - this.socket = null; } + this._resetState(); }); return subscription; } @@ -222,7 +232,7 @@ export class WebSocketSubject extends AnonymousSubject { const { source, socket } = this; if (socket && socket.readyState === 1) { socket.close(); - this.socket = null; + this._resetState(); } super.unsubscribe(); if (!source) {