Skip to content

Commit

Permalink
feature/message cache (#290)
Browse files Browse the repository at this point in the history
* Implement message cache to handle refHash message

* Remove ConnectionState.ts from stlite-kernel and import it from the upstream
  • Loading branch information
whitphx committed Sep 26, 2022
1 parent 520d04d commit a1d2ce4
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 28 deletions.
1 change: 0 additions & 1 deletion packages/stlite-kernel/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
export * from "./kernel";
export * from "./streamlit-replacements/lib/ConnectionManager";
export * from "./streamlit-replacements/lib/ConnectionState";
export * from "./streamlit-replacements/lib/FileUploadClient";
export * from "./react-helpers";
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
// Mimic https://github.com/streamlit/streamlit/blob/1.9.0/frontend/src/lib/ConnectionManager.ts
// and WebsocketConnection.

import { BackMsg, ForwardMsg } from "streamlit-browser/src/autogen/proto"
import { BaseUriParts } from "streamlit-browser/src/lib/UriUtil"
import { ReactNode } from "react"

import { StliteKernel } from "../../kernel"
import { ConnectionState } from "./ConnectionState"
import { ConnectionState } from "streamlit-browser/src/lib/ConnectionState"
import { ForwardMsgCache } from "streamlit-browser/src/lib/ForwardMessageCache"
import { ensureError } from "streamlit-browser/src/lib/ErrorHandling"
import { DUMMY_BASE_HOSTNAME, DUMMY_BASE_PORT } from "../../consts"

interface MessageQueue {
[index: number]: any
}

interface Props {
/**
* Kernel object to connect to.
Expand Down Expand Up @@ -38,13 +44,14 @@ export class ConnectionManager {

constructor(props: Props) {
this.props = props
this.cache = new ForwardMsgCache(() => this.getBaseUriParts())

this.props.kernel.onWebSocketMessage((payload) => {
if (typeof payload === "string") {
console.error("Unexpected payload type.")
return
}
props.onMessage(ForwardMsg.decode(payload))
this.handleMessage(payload)
})

this.props.kernel.loaded.then(() => {
Expand Down Expand Up @@ -94,12 +101,60 @@ export class ConnectionManager {
}
}

/**
* ForwardMessages get passed through this cache. This gets initialized
* once we connect to the server.
*/
private readonly cache: ForwardMsgCache

/**
* To guarantee packet transmission order, this is the index of the last
* dispatched incoming message.
*/
private lastDispatchedMessageIndex = -1

/**
* And this is the index of the next message we receive.
*/
private nextMessageIndex = 0

/**
* This dictionary stores received messages that we haven't sent out yet
* (because we're still decoding previous messages)
*/
private readonly messageQueue: MessageQueue = {}

/**
* Increment the runCount on our message cache, and clear entries
* whose age is greater than the max.
*/
public incrementMessageCacheRunCount(maxMessageAge: number): void {
// TODO: Implement
this.cache.incrementRunCount(maxMessageAge)
}

private async handleMessage(data: ArrayBuffer): Promise<void> {
// Assign this message an index.
const messageIndex = this.nextMessageIndex
this.nextMessageIndex += 1

const encodedMsg = new Uint8Array(data)
const msg = ForwardMsg.decode(encodedMsg)

this.messageQueue[messageIndex] = await this.cache.processMessagePayload(
msg,
encodedMsg
)

// Dispatch any pending messages in the queue. This may *not* result
// in our just-decoded message being dispatched: if there are other
// messages that were received earlier than this one but are being
// downloaded, our message won't be sent until they're done.
while (this.lastDispatchedMessageIndex + 1 in this.messageQueue) {
const dispatchMessageIndex = this.lastDispatchedMessageIndex + 1
this.props.onMessage(this.messageQueue[dispatchMessageIndex])
delete this.messageQueue[dispatchMessageIndex]
this.lastDispatchedMessageIndex = dispatchMessageIndex
}
}

private async connect(): Promise<void> {
Expand Down

This file was deleted.

0 comments on commit a1d2ce4

Please sign in to comment.