diff --git a/backend/cmd/headlamp.go b/backend/cmd/headlamp.go index 268f908e6e..0d2ddf4959 100644 --- a/backend/cmd/headlamp.go +++ b/backend/cmd/headlamp.go @@ -1608,7 +1608,7 @@ func (c *HeadlampConfig) addClusterSetupRoute(r *mux.Router) { r.HandleFunc("/cluster/{name}", c.renameCluster).Methods("PUT") // Websocket connections - r.HandleFunc("/wsMutliplexer", c.multiplexer.HandleClientWebSocket) + r.HandleFunc("/wsMultiplexer", c.multiplexer.HandleClientWebSocket) } /* diff --git a/backend/cmd/multiplexer.go b/backend/cmd/multiplexer.go index ad37f91359..2383b2f61d 100644 --- a/backend/cmd/multiplexer.go +++ b/backend/cmd/multiplexer.go @@ -68,6 +68,10 @@ type Connection struct { Done chan struct{} // mu is a mutex to synchronize access to the connection. mu sync.RWMutex + // writeMu is a mutex to synchronize access to the write operations. + writeMu sync.Mutex + // closed is a flag to indicate if the connection is closed. + closed bool } // Message represents a WebSocket message structure. @@ -81,7 +85,11 @@ type Message struct { // UserID is the ID of the user. UserID string `json:"userId"` // Data contains the message payload. - Data []byte `json:"data,omitempty"` + Data string `json:"data,omitempty"` + // Binary is a flag to indicate if the message is binary. + Binary bool `json:"binary,omitempty"` + // Type is the type of the message. + Type string `json:"type"` } // Multiplexer manages multiple WebSocket connections. @@ -114,41 +122,58 @@ func (c *Connection) updateStatus(state ConnectionState, err error) { c.mu.Lock() defer c.mu.Unlock() + if c.closed { + return + } + c.Status.State = state c.Status.LastMsg = time.Now() + c.Status.Error = "" if err != nil { c.Status.Error = err.Error() - } else { - c.Status.Error = "" } - if c.Client != nil { - statusData := struct { - State string `json:"state"` - Error string `json:"error"` - }{ - State: string(state), - Error: c.Status.Error, - } + if c.Client == nil { + return + } - jsonData, jsonErr := json.Marshal(statusData) - if jsonErr != nil { - logger.Log(logger.LevelError, map[string]string{"clusterID": c.ClusterID}, jsonErr, "marshaling status message") + c.writeMu.Lock() + defer c.writeMu.Unlock() - return - } + // Check if connection is closed before writing + if c.closed { + return + } - statusMsg := Message{ - ClusterID: c.ClusterID, - Path: c.Path, - Data: jsonData, - } + statusData := struct { + State string `json:"state"` + Error string `json:"error"` + }{ + State: string(state), + Error: c.Status.Error, + } - err := c.Client.WriteJSON(statusMsg) - if err != nil { + jsonData, jsonErr := json.Marshal(statusData) + if jsonErr != nil { + logger.Log(logger.LevelError, map[string]string{"clusterID": c.ClusterID}, jsonErr, "marshaling status message") + + return + } + + statusMsg := Message{ + ClusterID: c.ClusterID, + Path: c.Path, + Data: string(jsonData), + Type: "STATUS", + } + + if err := c.Client.WriteJSON(statusMsg); err != nil { + if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { logger.Log(logger.LevelError, map[string]string{"clusterID": c.ClusterID}, err, "writing status message to client") } + + c.closed = true } } @@ -188,7 +213,8 @@ func (m *Multiplexer) establishClusterConnection( connection.updateStatus(StateConnected, nil) m.mutex.Lock() - m.connections[clusterID+path] = connection + connKey := fmt.Sprintf("%s:%s:%s", clusterID, path, userID) + m.connections[connKey] = connection m.mutex.Unlock() go m.monitorConnection(connection) @@ -315,7 +341,6 @@ func (m *Multiplexer) reconnect(conn *Connection) (*Connection, error) { return newConn, nil } -// HandleClientWebSocket handles incoming WebSocket connections from clients. // HandleClientWebSocket handles incoming WebSocket connections from clients. func (m *Multiplexer) HandleClientWebSocket(w http.ResponseWriter, r *http.Request) { clientConn, err := m.upgrader.Upgrade(w, r, nil) @@ -333,7 +358,7 @@ func (m *Multiplexer) HandleClientWebSocket(w http.ResponseWriter, r *http.Reque } // Check if it's a close message - if msg.Data != nil && len(msg.Data) > 0 && string(msg.Data) == "close" { + if msg.Type == "CLOSE" { err := m.CloseConnection(msg.ClusterID, msg.Path, msg.UserID) if err != nil { logger.Log( @@ -354,8 +379,8 @@ func (m *Multiplexer) HandleClientWebSocket(w http.ResponseWriter, r *http.Reque continue } - if len(msg.Data) > 0 && conn.Status.State == StateConnected { - err = m.writeMessageToCluster(conn, msg.Data) + if msg.Type == "REQUEST" && conn.Status.State == StateConnected { + err = m.writeMessageToCluster(conn, []byte(msg.Data)) if err != nil { continue } @@ -457,100 +482,149 @@ func (m *Multiplexer) writeMessageToCluster(conn *Connection, data []byte) error // handleClusterMessages handles messages from a cluster connection. func (m *Multiplexer) handleClusterMessages(conn *Connection, clientConn *websocket.Conn) { - defer func() { - conn.updateStatus(StateClosed, nil) - conn.WSConn.Close() - }() + defer m.cleanupConnection(conn) + + var lastResourceVersion string for { select { case <-conn.Done: return default: - if err := m.processClusterMessage(conn, clientConn); err != nil { + if err := m.processClusterMessage(conn, clientConn, &lastResourceVersion); err != nil { return } } } } -// processClusterMessage processes a message from a cluster connection. -func (m *Multiplexer) processClusterMessage(conn *Connection, clientConn *websocket.Conn) error { +// processClusterMessage processes a single message from the cluster. +func (m *Multiplexer) processClusterMessage( + conn *Connection, + clientConn *websocket.Conn, + lastResourceVersion *string, +) error { messageType, message, err := conn.WSConn.ReadMessage() if err != nil { - m.handleReadError(conn, err) + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + logger.Log(logger.LevelError, + map[string]string{ + "clusterID": conn.ClusterID, + "userID": conn.UserID, + }, + err, + "reading cluster message", + ) + } return err } - wrapperMsg := m.createWrapperMessage(conn, messageType, message) + if err := m.checkResourceVersion(message, conn, clientConn, lastResourceVersion); err != nil { + return err + } - if err := clientConn.WriteJSON(wrapperMsg); err != nil { - m.handleWriteError(conn, err) + return m.sendDataMessage(conn, clientConn, messageType, message) +} - return err +// checkResourceVersion checks and handles resource version changes. +func (m *Multiplexer) checkResourceVersion( + message []byte, + conn *Connection, + clientConn *websocket.Conn, + lastResourceVersion *string, +) error { + var obj map[string]interface{} + if err := json.Unmarshal(message, &obj); err != nil { + return nil // Ignore unmarshalling errors for resource version check } - conn.mu.Lock() - conn.Status.LastMsg = time.Now() - conn.mu.Unlock() + if metadata, ok := obj["metadata"].(map[string]interface{}); ok { + if rv, ok := metadata["resourceVersion"].(string); ok { + if *lastResourceVersion != "" && rv != *lastResourceVersion { + return m.sendCompleteMessage(conn, clientConn) + } + + *lastResourceVersion = rv + } + } return nil } -// createWrapperMessage creates a wrapper message for a cluster connection. -func (m *Multiplexer) createWrapperMessage(conn *Connection, messageType int, message []byte) struct { - ClusterID string `json:"clusterId"` - Path string `json:"path"` - Query string `json:"query"` - UserID string `json:"userId"` - Data string `json:"data"` - Binary bool `json:"binary"` -} { - wrapperMsg := struct { - ClusterID string `json:"clusterId"` - Path string `json:"path"` - Query string `json:"query"` - UserID string `json:"userId"` - Data string `json:"data"` - Binary bool `json:"binary"` - }{ +// sendCompleteMessage sends a COMPLETE message to the client. +func (m *Multiplexer) sendCompleteMessage(conn *Connection, clientConn *websocket.Conn) error { + completeMsg := Message{ ClusterID: conn.ClusterID, Path: conn.Path, Query: conn.Query, UserID: conn.UserID, - Binary: messageType == websocket.BinaryMessage, + Type: "COMPLETE", } - if messageType == websocket.BinaryMessage { - wrapperMsg.Data = base64.StdEncoding.EncodeToString(message) - } else { - wrapperMsg.Data = string(message) + conn.writeMu.Lock() + defer conn.writeMu.Unlock() + + return clientConn.WriteJSON(completeMsg) +} + +// sendDataMessage sends the actual data message to the client. +func (m *Multiplexer) sendDataMessage( + conn *Connection, + clientConn *websocket.Conn, + messageType int, + message []byte, +) error { + dataMsg := m.createWrapperMessage(conn, messageType, message) + + conn.writeMu.Lock() + defer conn.writeMu.Unlock() + + if err := clientConn.WriteJSON(dataMsg); err != nil { + return err } - return wrapperMsg + conn.mu.Lock() + conn.Status.LastMsg = time.Now() + conn.mu.Unlock() + + return nil } -// handleReadError handles errors that occur when reading a message from a cluster connection. -func (m *Multiplexer) handleReadError(conn *Connection, err error) { - conn.updateStatus(StateError, err) - logger.Log( - logger.LevelError, - map[string]string{"clusterID": conn.ClusterID, "UserID": conn.UserID}, - err, - "reading message from cluster", - ) +// cleanupConnection performs cleanup for a connection. +func (m *Multiplexer) cleanupConnection(conn *Connection) { + conn.mu.Lock() + conn.closed = true + conn.mu.Unlock() + + if conn.WSConn != nil { + conn.WSConn.Close() + } + + m.mutex.Lock() + connKey := fmt.Sprintf("%s:%s:%s", conn.ClusterID, conn.Path, conn.UserID) + delete(m.connections, connKey) + m.mutex.Unlock() } -// handleWriteError handles errors that occur when writing a message to a client connection. -func (m *Multiplexer) handleWriteError(conn *Connection, err error) { - conn.updateStatus(StateError, err) - logger.Log( - logger.LevelError, - map[string]string{"clusterID": conn.ClusterID, "UserID": conn.UserID}, - err, - "writing message to client", - ) +// createWrapperMessage creates a wrapper message for a cluster connection. +func (m *Multiplexer) createWrapperMessage(conn *Connection, messageType int, message []byte) Message { + var data string + if messageType == websocket.BinaryMessage { + data = base64.StdEncoding.EncodeToString(message) + } else { + data = string(message) + } + + return Message{ + ClusterID: conn.ClusterID, + Path: conn.Path, + Query: conn.Query, + UserID: conn.UserID, + Data: data, + Binary: messageType == websocket.BinaryMessage, + Type: "DATA", + } } // cleanupConnections closes and removes all connections. @@ -586,37 +660,42 @@ func (m *Multiplexer) getClusterConfig(clusterID string) (*rest.Config, error) { } // CloseConnection closes a specific connection based on its identifier. +// +//nolint:unparam func (m *Multiplexer) CloseConnection(clusterID, path, userID string) error { connKey := fmt.Sprintf("%s:%s:%s", clusterID, path, userID) m.mutex.Lock() - defer m.mutex.Unlock() conn, exists := m.connections[connKey] if !exists { - return fmt.Errorf("connection not found for key: %s", connKey) + m.mutex.Unlock() + // Don't log error for non-existent connections during cleanup + return nil } - // Signal the connection to close - close(conn.Done) + // Mark as closed before releasing the lock + conn.mu.Lock() + if conn.closed { + conn.mu.Unlock() + m.mutex.Unlock() + logger.Log(logger.LevelError, map[string]string{"clusterID": conn.ClusterID}, nil, "closing connection") - // Close the WebSocket connection - if conn.WSConn != nil { - if err := conn.WSConn.Close(); err != nil { - logger.Log( - logger.LevelError, - map[string]string{"clusterID": clusterID, "userID": userID}, - err, - "closing WebSocket connection", - ) - } + return nil } - // Update the connection status - conn.updateStatus(StateClosed, nil) + conn.closed = true + conn.mu.Unlock() - // Remove the connection from the map delete(m.connections, connKey) + m.mutex.Unlock() + + // Close the Done channel and connections after removing from map + close(conn.Done) + + if conn.WSConn != nil { + conn.WSConn.Close() + } return nil } diff --git a/backend/cmd/multiplexer_test.go b/backend/cmd/multiplexer_test.go index 058e01377b..1772091e8a 100644 --- a/backend/cmd/multiplexer_test.go +++ b/backend/cmd/multiplexer_test.go @@ -225,7 +225,7 @@ func TestHandleClusterMessages(t *testing.T) { t.Fatal("Test timed out") } - assert.Equal(t, StateClosed, conn.Status.State) + assert.Equal(t, StateConnecting, conn.Status.State) } func TestCleanupConnections(t *testing.T) { @@ -295,11 +295,8 @@ func TestCloseConnection(t *testing.T) { err := m.CloseConnection("test-cluster", "/api/v1/pods", "test-user") assert.NoError(t, err) assert.Empty(t, m.connections) - assert.Equal(t, StateClosed, conn.Status.State) - - // Test closing a non-existent connection - err = m.CloseConnection("non-existent", "/api/v1/pods", "test-user") - assert.Error(t, err) + // It will reconnect to the cluster + assert.Equal(t, StateConnecting, conn.Status.State) } func TestCreateWrapperMessage(t *testing.T) { diff --git a/frontend/src/lib/k8s/api/v2/hooks.ts b/frontend/src/lib/k8s/api/v2/hooks.ts index 68b38187ae..d9c005e8b0 100644 --- a/frontend/src/lib/k8s/api/v2/hooks.ts +++ b/frontend/src/lib/k8s/api/v2/hooks.ts @@ -4,7 +4,6 @@ import { getCluster } from '../../../cluster'; import { ApiError, QueryParameters } from '../../apiProxy'; import { KubeObject, KubeObjectInterface } from '../../KubeObject'; import { clusterFetch } from './fetch'; -import { KubeListUpdateEvent } from './KubeList'; import { KubeObjectEndpoint } from './KubeObjectEndpoint'; import { makeUrl } from './makeUrl'; import { useWebSocket } from './webSocket'; @@ -118,7 +117,7 @@ export function useKubeObject({ const data: Instance | null = query.error ? null : query.data ?? null; - useWebSocket>({ + useWebSocket({ url: () => makeUrl([KubeObjectEndpoint.toUrl(endpoint!)], { ...cleanedUpQueryParams, diff --git a/frontend/src/lib/k8s/api/v2/useKubeObjectList.ts b/frontend/src/lib/k8s/api/v2/useKubeObjectList.ts index f3312590e7..363f883c90 100644 --- a/frontend/src/lib/k8s/api/v2/useKubeObjectList.ts +++ b/frontend/src/lib/k8s/api/v2/useKubeObjectList.ts @@ -1,14 +1,14 @@ import { QueryObserverOptions, useQueries, useQueryClient } from '@tanstack/react-query'; -import { useMemo, useState } from 'react'; +import { useEffect, useMemo, useRef, useState } from 'react'; import { KubeObject, KubeObjectClass } from '../../KubeObject'; import { ApiError } from '../v1/clusterRequests'; import { QueryParameters } from '../v1/queryParameters'; import { clusterFetch } from './fetch'; import { QueryListResponse, useEndpoints } from './hooks'; -import { KubeList, KubeListUpdateEvent } from './KubeList'; +import { KubeList } from './KubeList'; import { KubeObjectEndpoint } from './KubeObjectEndpoint'; import { makeUrl } from './makeUrl'; -import { useWebSockets } from './webSocket'; +import { BASE_WS_URL, WebSocketManager } from './webSocket'; /** * Object representing a List of Kube object @@ -113,43 +113,75 @@ function useWatchKubeObjectLists({ lists: Array<{ cluster: string; namespace?: string; resourceVersion: string }>; }) { const client = useQueryClient(); + const latestResourceVersions = useRef>({}); + // Create URLs for all lists const connections = useMemo(() => { if (!endpoint) return []; - return lists.map(({ cluster, namespace, resourceVersion }) => { - const url = makeUrl([KubeObjectEndpoint.toUrl(endpoint!, namespace)], { - ...queryParams, - watch: 1, - resourceVersion, - }); + return lists.map(list => { + const key = `${list.cluster}:${list.namespace || ''}`; + // Only update resourceVersion if it's newer + if ( + !latestResourceVersions.current[key] || + parseInt(list.resourceVersion) > parseInt(latestResourceVersions.current[key]) + ) { + latestResourceVersions.current[key] = list.resourceVersion; + } return { - cluster, - url, - onMessage(update: KubeListUpdateEvent) { - const key = kubeObjectListQuery( - kubeObjectClass, - endpoint, - namespace, - cluster, - queryParams ?? {} - ).queryKey; - client.setQueryData(key, (oldResponse: ListResponse | undefined | null) => { - if (!oldResponse) return oldResponse; - - const newList = KubeList.applyUpdate(oldResponse.list, update, kubeObjectClass); - return { ...oldResponse, list: newList }; - }); - }, + url: makeUrl([KubeObjectEndpoint.toUrl(endpoint, list.namespace)], { + ...queryParams, + watch: 1, + resourceVersion: latestResourceVersions.current[key], + }), + cluster: list.cluster, + namespace: list.namespace, }; }); - }, [lists, kubeObjectClass, endpoint]); + }, [endpoint, lists, queryParams]); - useWebSockets>({ - enabled: !!endpoint, - connections, - }); + useEffect(() => { + if (!endpoint || connections.length === 0) return; + + const cleanups: (() => void)[] = []; + + connections.forEach(({ url, cluster, namespace }) => { + const parsedUrl = new URL(url, BASE_WS_URL); + const key = `${cluster}:${namespace || ''}`; + + WebSocketManager.subscribe(cluster, parsedUrl.pathname, parsedUrl.search.slice(1), update => { + if (!update || typeof update !== 'object') return; + + // Update latest resourceVersion + if (update.object?.metadata?.resourceVersion) { + latestResourceVersions.current[key] = update.object.metadata.resourceVersion; + } + + const queryKey = kubeObjectListQuery( + kubeObjectClass, + endpoint, + namespace, + cluster, + queryParams ?? {} + ).queryKey; + + client.setQueryData(queryKey, (oldResponse: ListResponse | undefined | null) => { + if (!oldResponse) return oldResponse; + const newList = KubeList.applyUpdate(oldResponse.list, update, kubeObjectClass); + if (newList === oldResponse.list) return oldResponse; + return { ...oldResponse, list: newList }; + }); + }).then( + cleanup => cleanups.push(cleanup), + error => console.error('WebSocket subscription failed:', error) + ); + }); + + return () => { + cleanups.forEach(cleanup => cleanup()); + }; + }, [connections, endpoint, client, kubeObjectClass, queryParams]); } /** diff --git a/frontend/src/lib/k8s/api/v2/webSocket.ts b/frontend/src/lib/k8s/api/v2/webSocket.ts index 0ff934a00d..021ad7c5ed 100644 --- a/frontend/src/lib/k8s/api/v2/webSocket.ts +++ b/frontend/src/lib/k8s/api/v2/webSocket.ts @@ -1,224 +1,380 @@ import { useEffect, useMemo } from 'react'; -import { findKubeconfigByClusterName, getUserIdFromLocalStorage } from '../../../../stateless'; -import { getToken } from '../../../auth'; -import { getCluster } from '../../../cluster'; +import { getUserIdFromLocalStorage } from '../../../../stateless'; +import { KubeObjectInterface } from '../../KubeObject'; import { BASE_HTTP_URL } from './fetch'; -import { makeUrl } from './makeUrl'; +import { KubeListUpdateEvent } from './KubeList'; -const BASE_WS_URL = BASE_HTTP_URL.replace('http', 'ws'); +// Constants for WebSocket connection +export const BASE_WS_URL = BASE_HTTP_URL.replace('http', 'ws'); +/** + * Multiplexer endpoint for WebSocket connections + */ +const MULTIPLEXER_ENDPOINT = 'wsMultiplexer'; + +// Message types for WebSocket communication +interface WebSocketMessage { + /** Cluster ID */ + clusterId: string; + /** API path */ + path: string; + /** Query parameters */ + query: string; + /** User ID */ + userId: string; + /** Message type */ + type: 'REQUEST' | 'CLOSE' | 'COMPLETE'; +} /** - * Create new WebSocket connection to the backend - * - * @param url - WebSocket URL - * @param options - Connection options - * - * @returns WebSocket connection + * WebSocket manager to handle connections across the application. + * Provides a singleton-like interface for managing WebSocket connections, + * subscriptions, and message handling. */ -export async function openWebSocket( - url: string, - { - protocols: moreProtocols = [], - type = 'binary', - cluster = getCluster() ?? '', - onMessage, - }: { - /** - * Any additional protocols to include in WebSocket connection - */ - protocols?: string | string[]; - /** - * - */ - type: 'json' | 'binary'; - /** - * Cluster name - */ - cluster?: string; - /** - * Message callback - */ - onMessage: (data: T) => void; - } -) { - const path = [url]; - const protocols = ['base64.binary.k8s.io', ...(moreProtocols ?? [])]; - - const token = getToken(cluster); - if (token) { - const encodedToken = btoa(token).replace(/=/g, ''); - protocols.push(`base64url.bearer.authorization.k8s.io.${encodedToken}`); - } - - if (cluster) { - path.unshift('clusters', cluster); +export const WebSocketManager = { + /** Current WebSocket connection instance */ + socket: null as WebSocket | null, + + /** Flag to track if a connection attempt is in progress */ + connecting: false, + + /** Map of message handlers for each subscription path + * Key format: clusterId:path:query + * Value: Set of callback functions for that subscription + */ + listeners: new Map void>>(), + + /** Set of paths that have received a COMPLETE message + * Used to prevent processing further messages for completed paths + */ + completedPaths: new Set(), + + /** Set of active WebSocket subscriptions to prevent duplicates + * Keys are in format: clusterId:path:query + */ + activeSubscriptions: new Set(), + + /** + * Creates a unique key for identifying WebSocket subscriptions + * @param clusterId - The ID of the Kubernetes cluster + * @param path - The API path being watched + * @param query - Query parameters for the subscription + * @returns A unique string key in format clusterId:path:query + */ + createKey(clusterId: string, path: string, query: string): string { + return `${clusterId}:${path}:${query}`; + }, + + /** + * Establishes or returns existing WebSocket connection + * Implements connection pooling and handles reconnection + * @returns Promise resolving to WebSocket instance + * @throws Error if connection fails + */ + async connect(): Promise { + // Return existing connection if available + if (this.socket?.readyState === WebSocket.OPEN) { + return this.socket; + } + + // Wait for existing connection attempt to complete + if (this.connecting) { + return new Promise(resolve => { + const checkConnection = setInterval(() => { + if (this.socket?.readyState === WebSocket.OPEN) { + clearInterval(checkConnection); + resolve(this.socket); + } + }, 100); + }); + } + + this.connecting = true; + const wsUrl = `${BASE_WS_URL}${MULTIPLEXER_ENDPOINT}`; + + return new Promise((resolve, reject) => { + const socket = new WebSocket(wsUrl); + socket.onopen = () => { + this.socket = socket; + this.connecting = false; + resolve(socket); + }; + + socket.onmessage = (event: MessageEvent) => { + this.handleWebSocketMessage(event); + }; + + socket.onerror = event => { + console.error('WebSocket error:', event); + this.connecting = false; + reject(new Error('WebSocket connection failed')); + }; + + socket.onclose = () => { + this.handleWebSocketClose(); + }; + }); + }, + + /** + * Handles incoming WebSocket messages + * Parses messages and distributes them to appropriate listeners + * @param event - Raw WebSocket message event + */ + handleWebSocketMessage(event: MessageEvent): void { try { - const kubeconfig = await findKubeconfigByClusterName(cluster); + const data = JSON.parse(event.data); + if (!data.clusterId || !data.path) return; - if (kubeconfig !== null) { - const userID = getUserIdFromLocalStorage(); - protocols.push(`base64url.headlamp.authorization.k8s.io.${userID}`); + const key = this.createKey(data.clusterId, data.path, data.query || ''); + + if (data.type === 'COMPLETE') { + this.handleCompletionMessage(data, key); + return; } - } catch (error) { - console.error('Error while finding kubeconfig:', error); - } - } - - const socket = new WebSocket(makeUrl([BASE_WS_URL, ...path], {}), protocols); - socket.binaryType = 'arraybuffer'; - socket.addEventListener('message', (body: MessageEvent) => { - const data = type === 'json' ? JSON.parse(body.data) : body.data; - onMessage(data); - }); - socket.addEventListener('error', error => { - console.error('WebSocket error:', error); - }); - - return socket; -} -// Global state for useWebSocket hook -// Keeps track of open WebSocket connections and active listeners -const sockets = new Map(); -const listeners = new Map void>>(); + if (this.completedPaths.has(key)) { + return; + } + + // Parse the update data + let update; + try { + update = data.data ? JSON.parse(data.data) : data; + } catch (err) { + console.error('Failed to parse update data:', err); + return; + } + + // Only notify listeners if we have a valid update + if (update && typeof update === 'object') { + this.listeners.get(key)?.forEach(listener => listener(update)); + } + } catch (err) { + console.error('Failed to process WebSocket message:', err); + } + }, -/** - * Creates or joins existing WebSocket connection - * - * @param url - endpoint URL - * @param options - WebSocket options - */ -export function useWebSocket({ - url: createUrl, - enabled = true, - protocols, - type = 'json', - cluster, - onMessage, -}: { - url: () => string; - enabled?: boolean; /** - * Any additional protocols to include in WebSocket connection + * Handles COMPLETE type messages from the server + * Marks paths as completed and sends close message + * @param data - The complete message data + * @param key - The subscription key */ - protocols?: string | string[]; + handleCompletionMessage(data: any, key: string): void { + this.completedPaths.add(key); + if (this.socket?.readyState === WebSocket.OPEN) { + const closeMsg: WebSocketMessage = { + clusterId: data.clusterId, + path: data.path, + query: data.query || '', + userId: data.userId || '', + type: 'CLOSE', + }; + this.socket.send(JSON.stringify(closeMsg)); + } + }, + /** - * Type of websocket data + * Handles WebSocket connection close events + * Implements reconnection logic with delay */ - type?: 'json' | 'binary'; + handleWebSocketClose(): void { + console.log('WebSocket closed, attempting reconnect...'); + this.socket = null; + this.connecting = false; + if (this.listeners.size > 0) { + setTimeout(() => this.connect(), 1000); + } + }, + /** - * Cluster name + * Subscribes to WebSocket updates for a specific path + * Manages subscription lifecycle and prevents duplicates + * @param clusterId - The ID of the Kubernetes cluster to watch + * @param path - The API path to watch + * @param query - Query parameters including resourceVersion + * @param onMessage - Callback function to handle incoming messages + * @returns Promise resolving to cleanup function */ - cluster?: string; + async subscribe( + clusterId: string, + path: string, + query: string, + onMessage: (data: any) => void + ): Promise<() => void> { + const key = this.createKey(clusterId, path, query); + + // Don't create duplicate subscriptions for the same path + if (this.activeSubscriptions.has(key)) { + if (!this.listeners.has(key)) { + this.listeners.set(key, new Set()); + } + this.listeners.get(key)!.add(onMessage); + return () => this.handleUnsubscribe(key, onMessage, null, path, query); + } + + this.activeSubscriptions.add(key); + if (!this.listeners.has(key)) { + this.listeners.set(key, new Set()); + } + this.listeners.get(key)!.add(onMessage); + + const socket = await this.connect(); + const userId = getUserIdFromLocalStorage(); + + const message: WebSocketMessage = { + clusterId, + path, + query, + userId: userId || '', + type: 'REQUEST', + }; + + socket.send(JSON.stringify(message)); + + return () => { + this.activeSubscriptions.delete(key); + this.handleUnsubscribe(key, onMessage, userId, path, query); + }; + }, + /** - * Message callback + * Handles cleanup when unsubscribing from a WebSocket path + * Removes listeners and closes connection if no more subscriptions + * @param key - The unique subscription key + * @param onMessage - The message handler to remove + * @param userId - The user ID associated with the subscription + * @param path - The API path being watched + * @param query - Query parameters for the subscription */ - onMessage: (data: T) => void; -}) { - const url = useMemo(() => (enabled ? createUrl() : ''), [enabled]); - const connections = useMemo(() => [{ cluster: cluster ?? '', url, onMessage }], [cluster, url]); - - return useWebSockets({ - connections, - protocols, - type, - }); -} + handleUnsubscribe( + key: string, + onMessage: (data: any) => void, + userId: string | null, + path: string, + query: string + ): void { + const listeners = this.listeners.get(key); + listeners?.delete(onMessage); + + if (listeners?.size === 0) { + this.listeners.delete(key); + this.completedPaths.delete(key); + this.activeSubscriptions.delete(key); -export type WebSocketConnectionRequest = { - cluster: string; - url: string; - onMessage: (data: T) => void; + if (this.socket?.readyState === WebSocket.OPEN) { + const [clusterId] = key.split(':'); + const closeMsg: WebSocketMessage = { + clusterId, + path, + query, + userId: userId || '', + type: 'CLOSE', + }; + this.socket.send(JSON.stringify(closeMsg)); + } + } + + if (this.listeners.size === 0) { + this.socket?.close(); + this.socket = null; + } + }, }; /** - * Creates or joins mutiple existing WebSocket connections + * React hook for WebSocket subscription to Kubernetes resources + * @param options - Configuration options for the WebSocket connection + * @param options.url - Function that returns the WebSocket URL to connect to + * @param options.enabled - Whether the WebSocket connection should be active + * @param options.cluster - The Kubernetes cluster ID to watch + * @param options.onMessage - Callback function to handle incoming Kubernetes events + * @param options.onError - Callback function to handle connection errors * - * @param url - endpoint URL - * @param options - WebSocket options + * @example + * useWebSocket({ + * url: () => '/api/v1/pods?watch=1', + * enabled: true, + * cluster: 'my-cluster', + * onMessage: (event) => console.log('Pod update:', event), + * onError: (error) => console.error('WebSocket error:', error), + * }); */ -export function useWebSockets({ - connections, +export function useWebSocket({ + url: createUrl, enabled = true, - protocols, - type = 'json', + cluster = '', + onMessage, + onError, }: { + /** Function that returns the WebSocket URL to connect to */ + url: () => string; + /** Whether the WebSocket connection should be active */ enabled?: boolean; - /** Make sure that connections value is stable between renders */ - connections: Array>; - /** - * Any additional protocols to include in WebSocket connection - * make sure that the value is stable between renders - */ - protocols?: string | string[]; + /** The Kubernetes cluster ID to watch */ + cluster?: string; + /** Callback function to handle incoming Kubernetes events */ + onMessage: (data: KubeListUpdateEvent) => void; + /** Callback function to handle connection errors */ + onError?: (error: Error) => void; +}) { /** - * Type of websocket data + * Memoized URL to prevent unnecessary reconnections */ - type?: 'json' | 'binary'; -}) { + const url = useMemo(() => (enabled ? createUrl() : ''), [enabled, createUrl]); + useEffect(() => { - if (!enabled) return; - - let isCurrent = true; - - /** Open a connection to websocket */ - function connect({ cluster, url, onMessage }: WebSocketConnectionRequest) { - const connectionKey = cluster + url; - - if (!sockets.has(connectionKey)) { - // Add new listener for this URL - listeners.set(connectionKey, [...(listeners.get(connectionKey) ?? []), onMessage]); - - // Mark socket as pending, so we don't open more than one - sockets.set(connectionKey, 'pending'); - - let ws: WebSocket | undefined; - openWebSocket(url, { protocols, type, cluster, onMessage }) - .then(socket => { - ws = socket; - - // Hook was unmounted while it was connecting to WebSocket - // so we close the socket and clean up - if (!isCurrent) { - ws.close(); - sockets.delete(connectionKey); - return; - } - - sockets.set(connectionKey, ws); - }) - .catch(err => { - console.error(err); - }); - } + if (!enabled || !url) return; + + const parsedUrl = new URL(url, BASE_WS_URL); + let cleanup: (() => void) | undefined; - return () => { - const connectionKey = cluster + url; - - // Clean up the listener - const newListeners = listeners.get(connectionKey)?.filter(it => it !== onMessage) ?? []; - listeners.set(connectionKey, newListeners); - - // No one is listening to the connection - // so we can close it - if (newListeners.length === 0) { - const maybeExisting = sockets.get(connectionKey); - if (maybeExisting) { - if (maybeExisting !== 'pending') { - maybeExisting.close(); - } - sockets.delete(connectionKey); + WebSocketManager.subscribe( + cluster, + parsedUrl.pathname, + parsedUrl.search.slice(1), + (update: any) => { + try { + if (isKubeListUpdateEvent(update)) { + onMessage(update); } + } catch (err) { + console.error('Failed to process WebSocket message:', err); + onError?.(err as Error); } - }; - } - - const disconnectCallbacks = connections.map(endpoint => connect(endpoint)); + } + ).then( + unsubscribe => { + cleanup = unsubscribe; + }, + error => { + console.error('WebSocket subscription failed:', error); + onError?.(error); + } + ); + // Cleanup function to unsubscribe when the component unmounts + // or when any of the dependencies change return () => { - isCurrent = false; - disconnectCallbacks.forEach(fn => fn()); + cleanup?.(); }; - }, [enabled, type, connections, protocols]); + }, [enabled, url, cluster, onMessage, onError]); +} + +/** + * Type guard to check if a message is a valid Kubernetes list update event + * @param data - The data to check + * @returns True if the data is a valid KubeListUpdateEvent + */ +function isKubeListUpdateEvent( + data: any +): data is KubeListUpdateEvent { + return ( + data && + typeof data === 'object' && + 'type' in data && + 'object' in data && + ['ADDED', 'MODIFIED', 'DELETED'].includes(data.type) + ); }