Skip to content

Commit

Permalink
fix(WebSocketSubject.prototype.multiplex): no longer nulls out socket…
Browse files Browse the repository at this point in the history
… after first unsubscribe (#2039)

This fix ensure the observers count goes to zero before the state is reset on the WebSocketSubject instance

fixes #2037
  • Loading branch information
benlesh authored and jayphelps committed Oct 24, 2016
1 parent 76a9abb commit a5e9cfe
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 3 deletions.
55 changes: 55 additions & 0 deletions spec/observables/dom/webSocket-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,61 @@ describe('Observable.webSocket', () => {
(<any>socket.close).restore();
});

it('should keep the same socket for multiple multiplex subscriptions', () => {
const socketSubject = Rx.Observable.webSocket(<any>{url: 'ws://mysocket'});
const results = [];
const socketMessages = [
{id: 'A'},
{id: 'B'},
{id: 'A'},
{id: 'B'},
{id: 'B'},
];

const sub1 = socketSubject.multiplex(
() => 'no-op',
() => results.push('A unsub'),
(req: any) => req.id === 'A')
.takeWhile((req: any) => !req.complete)
.subscribe(
() => results.push('A next'),
(e) => results.push('A error ' + e),
() => results.push('A complete')
);

socketSubject.multiplex(
() => 'no-op',
() => results.push('B unsub'),
(req: any) => req.id === 'B')
.subscribe(
() => results.push('B next'),
(e) => results.push('B error ' + e),
() => results.push('B complete')
);

// Setup socket and send messages
let socket = MockWebSocket.lastSocket;
socket.open();
socketMessages.forEach((msg, i) => {
if (i === 1) {
sub1.unsubscribe();
expect(socketSubject.socket).to.equal(socket);
}
socket.triggerMessage(JSON.stringify(msg));
});
socket.triggerClose({ wasClean: true });

expect(results).to.deep.equal([
'A next',
'A unsub',
'B next',
'B next',
'B next',
'B complete',
'B unsub',
]);
});

it('should not close the socket until all subscriptions complete', () => {
const socketSubject = Rx.Observable.webSocket(<any>{url: 'ws://mysocket'});
const results = [];
Expand Down
8 changes: 5 additions & 3 deletions src/observable/dom/WebSocketSubject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,12 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
subscription.add(this._output.subscribe(subscriber));
subscription.add(() => {
const { socket } = this;
if (this._output.observers.length === 0 && socket && socket.readyState === 1) {
socket.close();
if (this._output.observers.length === 0) {
if (socket && socket.readyState === 1) {
socket.close();
}
this._resetState();
}
this._resetState();
});
return subscription;
}
Expand Down

0 comments on commit a5e9cfe

Please sign in to comment.