Skip to content

Commit

Permalink
fix: Webchat socket instantiation delay (#2179)
Browse files Browse the repository at this point in the history
* Updated websocket server code to backup messages if it is not connected
* Refactored more occurances of socket send
* Renaming variables
* Added unit test to make sure backedup messages are cleared before connection starts
* Lint fix
  • Loading branch information
srinaath authored Aug 21, 2020
1 parent 62060cd commit 03a9777
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [client] Fixed a bug where trying to open the sign-in link on an OAuth card would open the file explorer if ngrok was not configured in PR [2155](https://github.com/microsoft/BotFramework-Emulator/pull/2155)
- [client] Change to a warning message in inspector when clicking on LUIS trace [2160](https://github.com/microsoft/BotFramework-Emulator/pull/2160)
- [client] Handle result from webchat middleware gracefully [2177](https://github.com/microsoft/BotFramework-Emulator/pull/2177)
- [client] Handle Webchat socket instantiation delay [2179](https://github.com/microsoft/BotFramework-Emulator/pull/2179)

## v4.9.0 - 2020 - 05 - 11
## Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,7 @@ export function createReplyToActivityHandler(emulatorServer: EmulatorRestServer)

// post activity
activity = conversation.prepActivityToBeSentToUser(conversation.user.id, activity);
const payload = { activities: [activity] };
const socket = WebSocketServer.getSocketByConversationId(conversationId);
socket && socket.send(JSON.stringify(payload));

WebSocketServer.sendToSubscribers(conversation.conversationId, activity);
res.send(HttpStatus.OK, { id: activity.id });
res.end();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ export function sendActivityToConversation(req: Request, res: Response, next: Ne

// post activity
activity = conversation.prepActivityToBeSentToUser(conversation.user.id, activity);
const payload = { activities: [activity] };
const socket = WebSocketServer.getSocketByConversationId(conversation.conversationId);
socket && socket.send(JSON.stringify(payload));

WebSocketServer.sendToSubscribers(conversation.conversationId, activity);
res.send(HttpStatus.OK, { id: activity.id });
res.end();
} catch (err) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ export function sendHistoryToConversation(req: Request, res: Response, next: Nex
for (const activity of activities) {
try {
const updatedActivity = conversation.prepActivityToBeSentToUser(conversation.user.id, activity);
const payload = { activities: [updatedActivity] };
const socket = WebSocketServer.getSocketByConversationId(conversation.conversationId);
socket && socket.send(JSON.stringify(payload));
WebSocketServer.sendToSubscribers(conversation.conversationId, updatedActivity);
successCount++;
} catch (err) {
if (firstErrorMessage === '') {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ export function createPostActivityHandler(emulatorServer: EmulatorRestServer) {
res.end();
return next();
}
const payload = { activities: [{ ...activity, id: activity.id }] };
const socket = WebSocketServer.getSocketByConversationId(conversation.conversationId);
socket && socket.send(JSON.stringify(payload));
WebSocketServer.sendToSubscribers(conversation.conversationId, activity);
}
} catch (err) {
sendErrorResponse(req, res, next, err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@ export function createFeedActivitiesAsTranscriptHandler(emulatorServer: Emulator
}
activities = conversation.prepTranscriptActivities(activities);
activities.forEach(activity => {
const payload = { activities: [activity] };
const socket = WebSocketServer.getSocketByConversationId(conversation.conversationId);
socket && socket.send(JSON.stringify(payload));
WebSocketServer.sendToSubscribers(conversation.conversationId, activity);
emulatorServer.logger.logActivity(conversation.conversationId, activity, activity.recipient.role);
});
} catch (e) {
Expand Down
82 changes: 82 additions & 0 deletions packages/app/main/src/server/webSocketServer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//

import { Activity } from 'botframework-schema';

import { WebSocketServer } from './webSocketServer';

const mockWSServer = {
Expand Down Expand Up @@ -157,4 +159,84 @@ describe('WebSocketServer', () => {
expect(Object.keys((WebSocketServer as any)._servers)).toEqual([mockConversationId]);
expect(mockWSServer.handleUpgrade).toHaveBeenCalledTimes(1);
});

it('should clear the messages backed up before websocket connection is started', async () => {
let onConnectionFunction = null;
let websocketHandler = null;

(WebSocketServer as any)._restServer = undefined;
(WebSocketServer as any)._servers = {};
(WebSocketServer as any)._sockets = {};

mockWSServer.on.mockImplementation((event, implementation) => {
if (event === 'connection') {
onConnectionFunction = implementation;
}
});

mockCreateServer.mockReturnValueOnce({
address: () => ({ port: 55523 }),
get: (route, handler) => {
websocketHandler = handler;
},
listen: jest.fn((_port, cb) => {
cb();
}),
once: jest.fn(),
});
await WebSocketServer.init();

WebSocketServer.queueActivities('conv-123', { id: 'activity-1' } as Activity);
WebSocketServer.queueActivities('conv-234', { id: 'activity-1' } as Activity);

WebSocketServer.queueActivities('conv-123', { id: 'activity-2' } as Activity);
WebSocketServer.queueActivities('conv-234', { id: 'activity-2' } as Activity);
websocketHandler(
{
params: {
conversationId: 'conv-234',
},
},
{
claimUpgrade: jest.fn(() => {
return {
head: jest.fn(),
socket: jest.fn(),
};
}),
}
);
const socketSendMock = jest.fn();
onConnectionFunction({
send: socketSendMock,
on: jest.fn(),
});
expect(socketSendMock).toHaveBeenCalledTimes(2);
expect(socketSendMock).toHaveBeenNthCalledWith(1, JSON.stringify({ activities: [{ id: 'activity-1' }] }));
expect(socketSendMock).toHaveBeenNthCalledWith(2, JSON.stringify({ activities: [{ id: 'activity-2' }] }));
socketSendMock.mockClear();

websocketHandler(
{
params: {
conversationId: 'conv-123',
},
},
{
claimUpgrade: jest.fn(() => {
return {
head: jest.fn(),
socket: jest.fn(),
};
}),
}
);
onConnectionFunction({
send: socketSendMock,
on: jest.fn(),
});
expect(socketSendMock).toHaveBeenCalledTimes(2);
expect(socketSendMock).toHaveBeenNthCalledWith(1, JSON.stringify({ activities: [{ id: 'activity-1' }] }));
expect(socketSendMock).toHaveBeenNthCalledWith(2, JSON.stringify({ activities: [{ id: 'activity-2' }] }));
});
});
33 changes: 33 additions & 0 deletions packages/app/main/src/server/webSocketServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import { createServer, Next, Request, Response, Server } from 'restify';
import { Server as WSServer } from 'ws';
import { Activity } from 'botframework-schema';

// can't import WebSocket type from ws types :|
interface WebSocket {
Expand All @@ -45,11 +46,40 @@ export class WebSocketServer {
private static _restServer: Server;
private static _servers: { [conversationId: string]: WSServer } = {};
private static _sockets: { [conversationId: string]: WebSocket } = {};
private static queuedMessages: { [conversationId: string]: Activity[] } = {};

private static sendBackedUpMessages(conversationId: string, socket: WebSocket) {
if (this.queuedMessages[conversationId]) {
while (this.queuedMessages[conversationId].length > 0) {
const activity: Activity = this.queuedMessages[conversationId].shift();
const payload = { activities: [activity] };
socket.send(JSON.stringify(payload));
}
}
}

public static getSocketByConversationId(conversationId: string): WebSocket {
return this._sockets[conversationId];
}

public static queueActivities(conversationId: string, activity: Activity): void {
if (!this.queuedMessages[conversationId]) {
this.queuedMessages[conversationId] = [];
}
this.queuedMessages[conversationId].push(activity);
}

public static sendToSubscribers(conversationId: string, activity: Activity): void {
const socket = this._sockets[conversationId];
if (socket) {
const payload = { activities: [activity] };
this.sendBackedUpMessages(conversationId, socket);
socket.send(JSON.stringify(payload));
} else {
this.queueActivities(conversationId, activity);
}
}

/** Initializes the server and returns the port it is listening on, or if already initialized,
* is a no-op.
*/
Expand All @@ -69,10 +99,13 @@ export class WebSocketServer {
noServer: true,
});
wsServer.on('connection', (socket, req) => {
this.sendBackedUpMessages(conversationId, socket);
this._sockets[conversationId] = socket;

socket.on('close', (code, reason) => {
delete this._servers[conversationId];
delete this._sockets[conversationId];
delete this.queuedMessages[conversationId];
});
});
// upgrade the connection to a ws connection
Expand Down

0 comments on commit 03a9777

Please sign in to comment.