diff --git a/packages/stlite-kernel/src/index.ts b/packages/stlite-kernel/src/index.ts index dd0c0aadc..cd8b57797 100644 --- a/packages/stlite-kernel/src/index.ts +++ b/packages/stlite-kernel/src/index.ts @@ -1,5 +1,4 @@ export * from "./kernel"; export * from "./streamlit-replacements/lib/ConnectionManager"; -export * from "./streamlit-replacements/lib/ConnectionState"; export * from "./streamlit-replacements/lib/FileUploadClient"; export * from "./react-helpers"; diff --git a/packages/stlite-kernel/src/streamlit-replacements/lib/ConnectionManager.ts b/packages/stlite-kernel/src/streamlit-replacements/lib/ConnectionManager.ts index 77308e0ef..0d856dda5 100644 --- a/packages/stlite-kernel/src/streamlit-replacements/lib/ConnectionManager.ts +++ b/packages/stlite-kernel/src/streamlit-replacements/lib/ConnectionManager.ts @@ -1,14 +1,20 @@ // Mimic https://github.com/streamlit/streamlit/blob/1.9.0/frontend/src/lib/ConnectionManager.ts +// and WebsocketConnection. import { BackMsg, ForwardMsg } from "streamlit-browser/src/autogen/proto" import { BaseUriParts } from "streamlit-browser/src/lib/UriUtil" import { ReactNode } from "react" import { StliteKernel } from "../../kernel" -import { ConnectionState } from "./ConnectionState" +import { ConnectionState } from "streamlit-browser/src/lib/ConnectionState" +import { ForwardMsgCache } from "streamlit-browser/src/lib/ForwardMessageCache" import { ensureError } from "streamlit-browser/src/lib/ErrorHandling" import { DUMMY_BASE_HOSTNAME, DUMMY_BASE_PORT } from "../../consts" +interface MessageQueue { + [index: number]: any +} + interface Props { /** * Kernel object to connect to. @@ -38,13 +44,14 @@ export class ConnectionManager { constructor(props: Props) { this.props = props + this.cache = new ForwardMsgCache(() => this.getBaseUriParts()) this.props.kernel.onWebSocketMessage((payload) => { if (typeof payload === "string") { console.error("Unexpected payload type.") return } - props.onMessage(ForwardMsg.decode(payload)) + this.handleMessage(payload) }) this.props.kernel.loaded.then(() => { @@ -94,12 +101,60 @@ export class ConnectionManager { } } + /** + * ForwardMessages get passed through this cache. This gets initialized + * once we connect to the server. + */ + private readonly cache: ForwardMsgCache + + /** + * To guarantee packet transmission order, this is the index of the last + * dispatched incoming message. + */ + private lastDispatchedMessageIndex = -1 + + /** + * And this is the index of the next message we receive. + */ + private nextMessageIndex = 0 + + /** + * This dictionary stores received messages that we haven't sent out yet + * (because we're still decoding previous messages) + */ + private readonly messageQueue: MessageQueue = {} + /** * Increment the runCount on our message cache, and clear entries * whose age is greater than the max. */ public incrementMessageCacheRunCount(maxMessageAge: number): void { - // TODO: Implement + this.cache.incrementRunCount(maxMessageAge) + } + + private async handleMessage(data: ArrayBuffer): Promise { + // Assign this message an index. + const messageIndex = this.nextMessageIndex + this.nextMessageIndex += 1 + + const encodedMsg = new Uint8Array(data) + const msg = ForwardMsg.decode(encodedMsg) + + this.messageQueue[messageIndex] = await this.cache.processMessagePayload( + msg, + encodedMsg + ) + + // Dispatch any pending messages in the queue. This may *not* result + // in our just-decoded message being dispatched: if there are other + // messages that were received earlier than this one but are being + // downloaded, our message won't be sent until they're done. + while (this.lastDispatchedMessageIndex + 1 in this.messageQueue) { + const dispatchMessageIndex = this.lastDispatchedMessageIndex + 1 + this.props.onMessage(this.messageQueue[dispatchMessageIndex]) + delete this.messageQueue[dispatchMessageIndex] + this.lastDispatchedMessageIndex = dispatchMessageIndex + } } private async connect(): Promise { diff --git a/packages/stlite-kernel/src/streamlit-replacements/lib/ConnectionState.ts b/packages/stlite-kernel/src/streamlit-replacements/lib/ConnectionState.ts deleted file mode 100644 index 8ac110f2c..000000000 --- a/packages/stlite-kernel/src/streamlit-replacements/lib/ConnectionState.ts +++ /dev/null @@ -1,24 +0,0 @@ -/** - * @license - * Copyright 2018-2022 Streamlit Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -export enum ConnectionState { - CONNECTED = "CONNECTED", - DISCONNECTED_FOREVER = "DISCONNECTED_FOREVER", - INITIAL = "INITIAL", - PINGING_SERVER = "PINGING_SERVER", - CONNECTING = "CONNECTING", -}