diff --git a/spec/observables/dom/webSocket-spec.ts b/spec/observables/dom/webSocket-spec.ts index 72ca69f53b..b2354f558c 100644 --- a/spec/observables/dom/webSocket-spec.ts +++ b/spec/observables/dom/webSocket-spec.ts @@ -436,5 +436,56 @@ describe('Observable.webSocket', () => { expect(results).to.deep.equal(['sub', 'unsub', 'sub', error, 'unsub']); }); + + it('should not close the socket until all subscriptions complete', () => { + const socketSubject = Rx.Observable.webSocket({url: 'ws://mysocket'}); + const results = []; + const socketMessages = [ + {id: 'A'}, + {id: 'B'}, + {id: 'A', complete: true}, + {id: 'B'}, + {id: 'B', complete: true}, + ]; + + socketSubject.multiplex( + () => 'no-op', + () => results.push('A unsub'), + (req: any) => req.id === 'A') + .takeWhile((req: any) => !req.complete) + .subscribe( + () => results.push('A next'), + (e) => results.push('A error ' + e), + () => results.push('A complete') + ); + + socketSubject.multiplex( + () => 'no-op', + () => results.push('B unsub'), + (req: any) => req.id === 'B') + .takeWhile((req: any) => !req.complete) + .subscribe( + () => results.push('B next'), + (e) => results.push('B error ' + e), + () => results.push('B complete') + ); + + // Setup socket and send messages + let socket = MockWebSocket.lastSocket; + socket.open(); + socketMessages.forEach((msg) => { + socket.triggerMessage(JSON.stringify(msg)); + }); + + expect(results).to.deep.equal([ + 'A next', + 'B next', + 'A complete', + 'A unsub', + 'B next', + 'B complete', + 'B unsub', + ]); + }); }); -}); \ No newline at end of file +}); diff --git a/src/observable/dom/WebSocketSubject.ts b/src/observable/dom/WebSocketSubject.ts index 819f6274d9..68a4228247 100644 --- a/src/observable/dom/WebSocketSubject.ts +++ b/src/observable/dom/WebSocketSubject.ts @@ -203,7 +203,7 @@ export class WebSocketSubject extends AnonymousSubject { subscription.add(this._output.subscribe(subscriber)); subscription.add(() => { const { socket } = this; - if (socket && socket.readyState === 1) { + if (this._output.observers.length === 0 && socket && socket.readyState === 1) { socket.close(); this.socket = null; }