diff --git a/packages/stlite-kernel/src/kernel.ts b/packages/stlite-kernel/src/kernel.ts index 6038943f9..0fbd90a58 100644 --- a/packages/stlite-kernel/src/kernel.ts +++ b/packages/stlite-kernel/src/kernel.ts @@ -138,14 +138,12 @@ export class StliteKernel { } public connectWebSocket(path: string): Promise { - this._worker.postMessage({ + return this._asyncPostMessage({ type: "websocket:connect", data: { path, }, }); - - return Promise.resolve(); // TODO: Communicate the worker to confirm the connection } public sendWebSocketMessage(payload: Uint8Array) { diff --git a/packages/stlite-kernel/src/streamlit-replacements/lib/ConnectionManager.ts b/packages/stlite-kernel/src/streamlit-replacements/lib/ConnectionManager.ts index 0d856dda5..a349c3114 100644 --- a/packages/stlite-kernel/src/streamlit-replacements/lib/ConnectionManager.ts +++ b/packages/stlite-kernel/src/streamlit-replacements/lib/ConnectionManager.ts @@ -158,7 +158,7 @@ export class ConnectionManager { } private async connect(): Promise { - const WEBSOCKET_STREAM_PATH = "stream" // The original is defined in streamlit/frontend/src/lib/WebsocketConnection.tsx + const WEBSOCKET_STREAM_PATH = "/stream" // The original is defined in streamlit/frontend/src/lib/WebsocketConnection.tsx try { await this.props.kernel.connectWebSocket(WEBSOCKET_STREAM_PATH) diff --git a/packages/stlite-kernel/src/types.d.ts b/packages/stlite-kernel/src/types.d.ts index 4af4d2236..83cdef376 100644 --- a/packages/stlite-kernel/src/types.d.ts +++ b/packages/stlite-kernel/src/types.d.ts @@ -135,7 +135,7 @@ type OutMessage = | WebSocketBackMessage; /** - * Reply message + * Reply message to InMessage */ interface ReplyMessageBase { type: string; diff --git a/packages/stlite-kernel/src/worker.ts b/packages/stlite-kernel/src/worker.ts index f60f136e3..574ec5e92 100644 --- a/packages/stlite-kernel/src/worker.ts +++ b/packages/stlite-kernel/src/worker.ts @@ -297,35 +297,49 @@ self.onmessage = async (event: MessageEvent): Promise => { case "websocket:connect": { console.debug("websocket:connect", data.data); - httpServer.start_websocket( - "/stream", - (messageProxy: any, binary: boolean) => { - // XXX: Now there is no session mechanism - - if (binary) { - const buffer = messageProxy.getBuffer("u8"); - messageProxy.destroy(); - const payload = new Uint8ClampedArray( - buffer.data.buffer, - buffer.data.byteOffset, - buffer.data.byteLength - ); - ctx.postMessage({ - type: "websocket:message", - data: { - payload: new Uint8Array(payload), - }, - }); - } else { - ctx.postMessage({ - type: "websocket:message", - data: { - payload: messageProxy, - }, - }); + const messagePort = event.ports[0]; + const { path } = data.data; + + try { + httpServer.start_websocket( + path, + (messageProxy: any, binary: boolean) => { + // XXX: Now there is no session mechanism + + if (binary) { + const buffer = messageProxy.getBuffer("u8"); + messageProxy.destroy(); + const payload = new Uint8ClampedArray( + buffer.data.buffer, + buffer.data.byteOffset, + buffer.data.byteLength + ); + ctx.postMessage({ + type: "websocket:message", + data: { + payload: new Uint8Array(payload), + }, + }); + } else { + ctx.postMessage({ + type: "websocket:message", + data: { + payload: messageProxy, + }, + }); + } } - } - ); + ); + + messagePort.postMessage({ + type: "reply", + }); + } catch (error) { + messagePort.postMessage({ + type: "reply", + error, + }); + } break; } case "websocket:send": {