Skip to content

Commit

Permalink
feat: Allow reconnects without losing state/focus.
Browse files Browse the repository at this point in the history
  • Loading branch information
mturoci committed Feb 7, 2024
1 parent ca27772 commit 4e43e86
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 17 deletions.
4 changes: 4 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Broker struct {
unicasts map[string]bool // "/client_id" => true
unicastsMux sync.RWMutex // mutex for tracking unicast routes
keepAppLive bool
clientsByID map[string]*Client
}

func newBroker(site *Site, editable, noStore, noLog, keepAppLive, debug bool) *Broker {
Expand All @@ -96,6 +97,7 @@ func newBroker(site *Site, editable, noStore, noLog, keepAppLive, debug bool) *B
make(map[string]bool),
sync.RWMutex{},
keepAppLive,
make(map[string]*Client),
}
}

Expand Down Expand Up @@ -263,6 +265,7 @@ func (b *Broker) addClient(route string, client *Client) {

b.unicastsMux.Lock()
b.unicasts["/"+client.id] = true
b.clientsByID[client.id] = client
b.unicastsMux.Unlock()

echo(Log{"t": "ui_add", "addr": client.addr, "route": route})
Expand Down Expand Up @@ -291,6 +294,7 @@ func (b *Broker) dropClient(client *Client) {

b.unicastsMux.Lock()
delete(b.unicasts, "/"+client.id)
delete(b.clientsByID, client.id)
b.unicastsMux.Unlock()

echo(Log{"t": "ui_drop", "addr": client.addr})
Expand Down
38 changes: 27 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,13 @@ type Client struct {
header *http.Header // forwarded headers from the WS connection
appPath string // path of the app this client is connected to, doesn't change throughout WS lifetime
pingInterval time.Duration
isReconnect bool
cancel context.CancelFunc
}

func newClient(addr string, auth *Auth, session *Session, broker *Broker, conn *websocket.Conn, editable bool, baseURL string, header *http.Header, pingInterval time.Duration) *Client {
return &Client{uuid.New().String(), auth, addr, session, broker, conn, nil, make(chan []byte, 256), editable, baseURL, header, "", pingInterval}
func newClient(addr string, auth *Auth, session *Session, broker *Broker, conn *websocket.Conn, editable bool, baseURL string, header *http.Header, pingInterval time.Duration, isReconnect bool) *Client {
id := uuid.New().String()
return &Client{id, auth, addr, session, broker, conn, nil, make(chan []byte, 256), editable, baseURL, header, "", pingInterval, isReconnect, nil}
}

func (c *Client) refreshToken() error {
Expand All @@ -90,15 +93,26 @@ func (c *Client) refreshToken() error {

func (c *Client) listen() {
defer func() {
app := c.broker.getApp(c.appPath)
if app != nil {
app.forward(c.id, c.session, disconnectMsg)
if err := app.disconnect(c.id); err != nil {
echo(Log{"t": "disconnect", "client": c.addr, "route": c.appPath, "err": err.Error()})
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
go func(ctx context.Context) {
select {
// Send disconnect message only if client doesn't reconnect within 2s.
case <-time.After(2 * time.Second):
app := c.broker.getApp(c.appPath)
if app != nil {
app.forward(c.id, c.session, disconnectMsg)
if err := app.disconnect(c.id); err != nil {
echo(Log{"t": "disconnect", "client": c.addr, "route": c.appPath, "err": err.Error()})
}
}

c.broker.unsubscribe <- c
case <-ctx.Done():
return
}
}
}(ctx)

c.broker.unsubscribe <- c
c.conn.Close()
}()
// Time allowed to read the next pong message from the peer. Must be greater than ping interval.
Expand Down Expand Up @@ -157,8 +171,10 @@ func (c *Client) listen() {
c.broker.sendAll(c.broker.clients[app.route], clearStateMsg)
}
case watchMsgT:
c.subscribe(m.addr) // subscribe even if page is currently NA

if c.isReconnect {
continue
}
c.subscribe(m.addr) // subscribe even if page is currently NA
if app := c.broker.getApp(m.addr); app != nil { // do we have an app handling this route?
c.appPath = m.addr
switch app.mode {
Expand Down
1 change: 1 addition & 0 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type OpsD struct {
E string `json:"e,omitempty"` // error
M *Meta `json:"m,omitempty"` // metadata
C int `json:"c,omitempty"` // clear UI state
I string `json:"i,omitempty"` // client id
}

// Meta represents metadata unrelated to commands
Expand Down
22 changes: 21 additions & 1 deletion socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,28 @@ func (s *SocketServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
}
clientID := r.URL.Query().Get("client-id")
client, ok := s.broker.clientsByID[clientID]
if ok {
client.conn = conn
client.isReconnect = true
if client.cancel != nil {
client.cancel()
}
} else {
client = newClient(getRemoteAddr(r), s.auth, session, s.broker, conn, s.editable, s.baseURL, &header, s.pingInterval, false)
}

if msg, err := json.Marshal(OpsD{I: client.id}); err == nil {
sw, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
sw.Write(msg)
sw.Close()
}

client := newClient(getRemoteAddr(r), s.auth, session, s.broker, conn, s.editable, s.baseURL, &header, s.pingInterval)
go client.flush()
go client.listen()
}
Expand Down
31 changes: 26 additions & 5 deletions ui/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ interface OpsD {
e: B // can the user edit pages?
}
c?: U // clear UI state
i?: S // client id
}
interface OpD {
k?: S
Expand Down Expand Up @@ -938,14 +939,21 @@ export const
let
_socket: WebSocket | null = null,
_page: XPage | null = null,
_backoff = 1
_backoff = 1,
_reconnectFailures = 0,
_clientID = ''

const
slug = window.location.pathname,
reconnect = (address: S) => {
if (_clientID && !address.includes('?client-id')) {
address = `${address}?${new URLSearchParams({ 'client-id': _clientID })}`
}

const retry = () => reconnect(address)
const socket = new WebSocket(address)
socket.onopen = () => {
_reconnectFailures = 0
_socket = socket
handle(connectEvent)
_backoff = 1
Expand All @@ -954,11 +962,17 @@ export const
}
socket.onclose = () => {
const refreshRate = refreshRateB()
if (refreshRate === 0) return

// TODO handle refreshRate > 0 case
if (refreshRate === 0) return

_socket = null

// If on unstable network, retry immediately if we haven't failed before.
if (!_reconnectFailures) {
retry()
return
}

_page = null
_backoff *= 2
if (_backoff > 16) _backoff = 16
Expand Down Expand Up @@ -994,6 +1008,8 @@ export const
} else if (msg.m) {
const { u: username, e: editable } = msg.m
handle({ t: WaveEventType.Config, username, editable })
} else if (msg.i) {
_clientID = msg.i
}
} catch (error) {
console.error(error)
Expand All @@ -1003,10 +1019,15 @@ export const
}
socket.onerror = () => {
handle(dataEvent)
_reconnectFailures++
}
},
push = (data: any) => {
if (!_socket) return
push = (data: unknown) => {
if (!_socket) {
// Maybe currently reconnecting. Try again in 500ms.
if (!_reconnectFailures) setTimeout(() => push(data), 500)
return
}
_socket.send(`@ ${slug} ${JSON.stringify(data || {})}`)
},
fork = (): ChangeSet => {
Expand Down

0 comments on commit 4e43e86

Please sign in to comment.