From 4e43e8658b079ff0ead3c26976900ed03d23b66e Mon Sep 17 00:00:00 2001 From: Martin Turoci Date: Mon, 5 Feb 2024 10:36:05 +0100 Subject: [PATCH] feat: Allow reconnects without losing state/focus. --- broker.go | 4 ++++ client.go | 38 +++++++++++++++++++++++++++----------- protocol.go | 1 + socket.go | 22 +++++++++++++++++++++- ui/src/core.ts | 31 ++++++++++++++++++++++++++----- 5 files changed, 79 insertions(+), 17 deletions(-) diff --git a/broker.go b/broker.go index 1586010038..61bfdd336f 100644 --- a/broker.go +++ b/broker.go @@ -77,6 +77,7 @@ type Broker struct { unicasts map[string]bool // "/client_id" => true unicastsMux sync.RWMutex // mutex for tracking unicast routes keepAppLive bool + clientsByID map[string]*Client } func newBroker(site *Site, editable, noStore, noLog, keepAppLive, debug bool) *Broker { @@ -96,6 +97,7 @@ func newBroker(site *Site, editable, noStore, noLog, keepAppLive, debug bool) *B make(map[string]bool), sync.RWMutex{}, keepAppLive, + make(map[string]*Client), } } @@ -263,6 +265,7 @@ func (b *Broker) addClient(route string, client *Client) { b.unicastsMux.Lock() b.unicasts["/"+client.id] = true + b.clientsByID[client.id] = client b.unicastsMux.Unlock() echo(Log{"t": "ui_add", "addr": client.addr, "route": route}) @@ -291,6 +294,7 @@ func (b *Broker) dropClient(client *Client) { b.unicastsMux.Lock() delete(b.unicasts, "/"+client.id) + delete(b.clientsByID, client.id) b.unicastsMux.Unlock() echo(Log{"t": "ui_drop", "addr": client.addr}) diff --git a/client.go b/client.go index 02e052ab63..8d5269d2bb 100644 --- a/client.go +++ b/client.go @@ -67,10 +67,13 @@ type Client struct { header *http.Header // forwarded headers from the WS connection appPath string // path of the app this client is connected to, doesn't change throughout WS lifetime pingInterval time.Duration + isReconnect bool + cancel context.CancelFunc } -func newClient(addr string, auth *Auth, session *Session, broker *Broker, conn *websocket.Conn, editable bool, baseURL string, header *http.Header, pingInterval time.Duration) *Client { - return &Client{uuid.New().String(), auth, addr, session, broker, conn, nil, make(chan []byte, 256), editable, baseURL, header, "", pingInterval} +func newClient(addr string, auth *Auth, session *Session, broker *Broker, conn *websocket.Conn, editable bool, baseURL string, header *http.Header, pingInterval time.Duration, isReconnect bool) *Client { + id := uuid.New().String() + return &Client{id, auth, addr, session, broker, conn, nil, make(chan []byte, 256), editable, baseURL, header, "", pingInterval, isReconnect, nil} } func (c *Client) refreshToken() error { @@ -90,15 +93,26 @@ func (c *Client) refreshToken() error { func (c *Client) listen() { defer func() { - app := c.broker.getApp(c.appPath) - if app != nil { - app.forward(c.id, c.session, disconnectMsg) - if err := app.disconnect(c.id); err != nil { - echo(Log{"t": "disconnect", "client": c.addr, "route": c.appPath, "err": err.Error()}) + ctx, cancel := context.WithCancel(context.Background()) + c.cancel = cancel + go func(ctx context.Context) { + select { + // Send disconnect message only if client doesn't reconnect within 2s. + case <-time.After(2 * time.Second): + app := c.broker.getApp(c.appPath) + if app != nil { + app.forward(c.id, c.session, disconnectMsg) + if err := app.disconnect(c.id); err != nil { + echo(Log{"t": "disconnect", "client": c.addr, "route": c.appPath, "err": err.Error()}) + } + } + + c.broker.unsubscribe <- c + case <-ctx.Done(): + return } - } + }(ctx) - c.broker.unsubscribe <- c c.conn.Close() }() // Time allowed to read the next pong message from the peer. Must be greater than ping interval. @@ -157,8 +171,10 @@ func (c *Client) listen() { c.broker.sendAll(c.broker.clients[app.route], clearStateMsg) } case watchMsgT: - c.subscribe(m.addr) // subscribe even if page is currently NA - + if c.isReconnect { + continue + } + c.subscribe(m.addr) // subscribe even if page is currently NA if app := c.broker.getApp(m.addr); app != nil { // do we have an app handling this route? c.appPath = m.addr switch app.mode { diff --git a/protocol.go b/protocol.go index 2bfe537def..6efc6d3181 100644 --- a/protocol.go +++ b/protocol.go @@ -23,6 +23,7 @@ type OpsD struct { E string `json:"e,omitempty"` // error M *Meta `json:"m,omitempty"` // metadata C int `json:"c,omitempty"` // clear UI state + I string `json:"i,omitempty"` // client id } // Meta represents metadata unrelated to commands diff --git a/socket.go b/socket.go index 5b9e755dd0..bc5b9cd56f 100644 --- a/socket.go +++ b/socket.go @@ -70,8 +70,28 @@ func (s *SocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } } + clientID := r.URL.Query().Get("client-id") + client, ok := s.broker.clientsByID[clientID] + if ok { + client.conn = conn + client.isReconnect = true + if client.cancel != nil { + client.cancel() + } + } else { + client = newClient(getRemoteAddr(r), s.auth, session, s.broker, conn, s.editable, s.baseURL, &header, s.pingInterval, false) + } + + if msg, err := json.Marshal(OpsD{I: client.id}); err == nil { + sw, err := conn.NextWriter(websocket.TextMessage) + if err != nil { + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + sw.Write(msg) + sw.Close() + } - client := newClient(getRemoteAddr(r), s.auth, session, s.broker, conn, s.editable, s.baseURL, &header, s.pingInterval) go client.flush() go client.listen() } diff --git a/ui/src/core.ts b/ui/src/core.ts index d6460a91d2..c07883f46a 100644 --- a/ui/src/core.ts +++ b/ui/src/core.ts @@ -195,6 +195,7 @@ interface OpsD { e: B // can the user edit pages? } c?: U // clear UI state + i?: S // client id } interface OpD { k?: S @@ -938,14 +939,21 @@ export const let _socket: WebSocket | null = null, _page: XPage | null = null, - _backoff = 1 + _backoff = 1, + _reconnectFailures = 0, + _clientID = '' const slug = window.location.pathname, reconnect = (address: S) => { + if (_clientID && !address.includes('?client-id')) { + address = `${address}?${new URLSearchParams({ 'client-id': _clientID })}` + } + const retry = () => reconnect(address) const socket = new WebSocket(address) socket.onopen = () => { + _reconnectFailures = 0 _socket = socket handle(connectEvent) _backoff = 1 @@ -954,11 +962,17 @@ export const } socket.onclose = () => { const refreshRate = refreshRateB() - if (refreshRate === 0) return - // TODO handle refreshRate > 0 case + if (refreshRate === 0) return _socket = null + + // If on unstable network, retry immediately if we haven't failed before. + if (!_reconnectFailures) { + retry() + return + } + _page = null _backoff *= 2 if (_backoff > 16) _backoff = 16 @@ -994,6 +1008,8 @@ export const } else if (msg.m) { const { u: username, e: editable } = msg.m handle({ t: WaveEventType.Config, username, editable }) + } else if (msg.i) { + _clientID = msg.i } } catch (error) { console.error(error) @@ -1003,10 +1019,15 @@ export const } socket.onerror = () => { handle(dataEvent) + _reconnectFailures++ } }, - push = (data: any) => { - if (!_socket) return + push = (data: unknown) => { + if (!_socket) { + // Maybe currently reconnecting. Try again in 500ms. + if (!_reconnectFailures) setTimeout(() => push(data), 500) + return + } _socket.send(`@ ${slug} ${JSON.stringify(data || {})}`) }, fork = (): ChangeSet => {