Skip to content

Commit

Permalink
feat(serve): Refactor /api/ws to be event driven and let the client s…
Browse files Browse the repository at this point in the history
…ubscribe to the logs it need
  • Loading branch information
etu committed Jun 12, 2024
1 parent 486ff2f commit d0cbab9
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 83 deletions.
6 changes: 6 additions & 0 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,9 @@ GET /api/state/:name
```http
GET /api/ws
```

This will return the state of all servers and will update in real-time.

If the client sends a message in the format of `{"name": "server-name"}`,
it will also return the logs of that server along side the overview state
of all the servers.
52 changes: 40 additions & 12 deletions cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"os"
"path/filepath"
"sort"

"github.com/gorilla/websocket"
"github.com/jedib0t/go-pretty/v6/table"
Expand Down Expand Up @@ -71,13 +72,21 @@ func (cli *Cli) List(format string) {
// Parse the json
json.Unmarshal(body, &runningState)

// Extract keys and sort them
var keys []string
for k := range state {
keys = append(keys, k)
}
sort.Strings(keys)

switch format {
case "table":
output := table.NewWriter()
output.SetOutputMirror(os.Stdout)
output.AppendHeader(table.Row{"Name", "Running", "Directory", "Command"})

for _, val := range state {
for _, key := range keys {
val := state[key]
isRunning := false

if _, ok := runningState.Servers[val.Name]; ok {
Expand All @@ -95,7 +104,8 @@ func (cli *Cli) List(format string) {

output.Write([]string{"Name", "Running", "Directory", "Command"})

for _, val := range state {
for _, key := range keys {
val := state[key]
isRunning := false

if _, ok := runningState.Servers[val.Name]; ok {
Expand Down Expand Up @@ -271,8 +281,7 @@ func (cli *Cli) Stop(name string) {
}

func (cli *Cli) Logs(name string) {
var state ServeFullState
logsMaxIndex := -1
var logsMaxIndex int = -1

// Build URL to establish websocket connection
wsUrl := fmt.Sprintf("ws://%s:%d/api/ws", cli.config.Settings.ListenAddress, cli.config.Settings.ListenPort)
Expand All @@ -285,24 +294,43 @@ func (cli *Cli) Logs(name string) {
}
defer conn.Close()

// Start loop to recieve and process incoming websocket messages
// Send subscription message for the specific server
subscription := map[string]string{"name": name}
subMsg, err := json.Marshal(subscription)
if err != nil {
log.Printf("Failed to marshal subscription message: %s\n", err)
os.Exit(2)
}

err = conn.WriteMessage(websocket.TextMessage, subMsg)
if err != nil {
log.Printf("Failed to send subscription message: %s\n", err)
os.Exit(3)
}

// Start loop to receive and process incoming websocket messages
for {
_, message, err := conn.ReadMessage()
if err != nil {
log.Printf("Failed to read websocket message: %s\n", err)
os.Exit(2)
os.Exit(4)
}

// Parse the json message
json.Unmarshal(message, &state)
// Parse the JSON message
var serverLogs ServerItemWithLogs
err = json.Unmarshal(message, &serverLogs)
if err != nil {
log.Printf("Failed to unmarshal websocket message: %s\n", err)
continue
}

if _, ok := state.RunnerState[name]; !ok {
log.Printf("Process '%s' doesn't seem to be running", name)
os.Exit(3)
// Check if the message is for the subscribed server
if serverLogs.ServerItem.Name != name {
continue
}

// Process the logs
for key, val := range state.RunnerState[name].Logs {
for key, val := range serverLogs.Logs {
if key > logsMaxIndex {
if val.Output == "stdout" {
fmt.Println(val.Output, val.Timestamp.Format("15:04:05"), "|", val.Message)
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func main() {
}

runner := Runner{config: &config}
serve := Serve{config: &config, runner: &runner}
serve := NewServe(&config, &runner)
cli := Cli{config: &config}

config.Read(configFile)
Expand Down
14 changes: 12 additions & 2 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type ActiveRunner struct {
Logs []LogEntry
}

func (runner *Runner) Start(name string) error {
func (runner *Runner) Start(name string, serve *Serve) error {
// Init active processes map
if runner.ActiveProcesses == nil {
runner.ActiveProcesses = make(map[string]*ActiveRunner)
Expand Down Expand Up @@ -125,6 +125,8 @@ func (runner *Runner) Start(name string) error {
Message: stdoutScanner.Text(),
Output: "stdout",
})

serve.stateChange <- true
}
}()

Expand All @@ -138,12 +140,17 @@ func (runner *Runner) Start(name string) error {
Message: stderrScanner.Text(),
Output: "stderr",
})

serve.stateChange <- true
}
}()

// Store the Cmd process as an active process
runner.ActiveProcesses[name] = &activeRunner

// Notify state change on start
serve.stateChange <- true

return nil
}

Expand Down Expand Up @@ -177,7 +184,7 @@ func (runner *Runner) randomizePortNumber() (uint, error) {
return 0, fmt.Errorf("Tried to randomize an unused port, failed")
}

func (runner *Runner) Stop(name string) error {
func (runner *Runner) Stop(name string, serve *Serve) error {
// Init active processes map
if runner.ActiveProcesses == nil {
runner.ActiveProcesses = make(map[string]*ActiveRunner)
Expand Down Expand Up @@ -213,5 +220,8 @@ func (runner *Runner) Stop(name string) error {
// Delete old status for process
delete(runner.ActiveProcesses, name)

// Notify state change on stop
serve.stateChange <- true

return nil
}
142 changes: 95 additions & 47 deletions serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,18 @@ import (
"log"
"net/http"
"os"
"time"
"sync"

"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)

type Serve struct {
config *Config
runner *Runner
}

type ServeFullState struct {
Config *Config `json:"configs"`
RunnerState map[string]ServeRunnerResponseItem `json:"runners"`
config *Config
runner *Runner
stateChange chan bool // Channel to signal a state change
clientSubscriptions map[*websocket.Conn]string // Map of client connections and their subscriptions
clientLocks map[*websocket.Conn]*sync.Mutex // Map of locks for each client connection to not send multiple messages at once
}

type ServerItem struct {
Expand All @@ -45,10 +43,14 @@ type ServeMessageResponse struct {
Message string `json:"message"`
}

type ServeRunnerResponseItem struct {
Name string `json:"name"`
Port uint `json:"port"`
Logs []LogEntry `json:"logs"`
func NewServe(config *Config, runner *Runner) *Serve {
return &Serve{
config: config,
runner: runner,
stateChange: make(chan bool),
clientSubscriptions: make(map[*websocket.Conn]string),
clientLocks: make(map[*websocket.Conn]*sync.Mutex),
}
}

func (serve *Serve) Run() {
Expand Down Expand Up @@ -126,7 +128,7 @@ func (serve *Serve) newRouter() *mux.Router {
}

// Stop servers on update in case it's running.
serve.runner.Stop(server.Name)
serve.runner.Stop(server.Name, serve)

w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
Expand All @@ -137,7 +139,7 @@ func (serve *Serve) newRouter() *mux.Router {
vars := mux.Vars(r)
var resp ServeMessageResponse

err := serve.runner.Stop(vars["name"])
err := serve.runner.Stop(vars["name"], serve)

if err != nil {
w.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -167,7 +169,7 @@ func (serve *Serve) newRouter() *mux.Router {
var resp ServeMessageResponse
vars := mux.Vars(r)

err := serve.runner.Start(vars["name"])
err := serve.runner.Start(vars["name"], serve)

if err != nil {
w.WriteHeader(http.StatusBadRequest)
Expand All @@ -186,7 +188,7 @@ func (serve *Serve) newRouter() *mux.Router {
var resp ServeMessageResponse
vars := mux.Vars(r)

err := serve.runner.Stop(vars["name"])
err := serve.runner.Stop(vars["name"], serve)

if err != nil {
w.WriteHeader(http.StatusBadRequest)
Expand Down Expand Up @@ -229,45 +231,72 @@ func (serve *Serve) newRouter() *mux.Router {
}
defer conn.Close()

var lastState []byte

// Return runner state over the websocket
for {
var state = ServeFullState{
Config: serve.config,
RunnerState: make(map[string]ServeRunnerResponseItem),
}
serve.clientSubscriptions[conn] = "" // Initialize with no subscription
serve.clientLocks[conn] = &sync.Mutex{} // Initialize mutex for this connection

go func() {
defer func() {
conn.Close()
delete(serve.clientSubscriptions, conn)
delete(serve.clientLocks, conn) // Remove mutex for this connection
}()

// Send initial list state on connect to the client
listState := serve.GetServerList()
serve.sendMessage(conn, listState)

for {
_, message, err := conn.ReadMessage()
if err != nil {
log.Println("ReadMessage:", err)
break
}

for key, value := range serve.runner.ActiveProcesses {
state.RunnerState[key] = ServeRunnerResponseItem{
Name: key,
Port: value.Port,
Logs: value.Logs,
// Parse the subscription message
var subscription map[string]string
if err := json.Unmarshal(message, &subscription); err != nil {
log.Println("Unmarshal:", err)
continue
}
}

stateJson, err := json.Marshal(state)
if name, ok := subscription["name"]; ok {
serve.clientSubscriptions[conn] = name

if err != nil {
fmt.Println("Error encoding JSON:", err)
return
}
// Send initial state for the subscribed server
serverState := serve.GetServerLogs(name)

if string(stateJson) == string(lastState) {
// Sleep a bit to then try again
time.Sleep(time.Millisecond * 100)

// Continue to next iteration
continue
// Only send logs if there are any
if len(serverState.Logs) > 0 {
serve.sendMessage(conn, serverState)
}
}
}
}()

// Send the updated state
conn.WriteMessage(websocket.TextMessage, stateJson)

// Update last state
lastState = stateJson
for {
select {
case <-serve.stateChange:
for client, name := range serve.clientSubscriptions {
// Send the list state regardless of subscription
listState := serve.GetServerList()
serve.sendMessage(client, listState)

// Skip clients with no subscription
if name == "" {
continue
}

// Send state for the subscribed server
serverState := serve.GetServerLogs(name)

// Only send logs if there are any
if len(serverState.Logs) > 0 {
serve.sendMessage(client, serverState)
}
}
}
}
})
}).Methods(http.MethodGet)

return router
}
Expand Down Expand Up @@ -333,3 +362,22 @@ func (serve *Serve) GetServerLogs(name string) ServerItemWithLogs {

return serverItemWithLogs
}

// Send a message to a client over a websocket connection
func (serve *Serve) sendMessage(client *websocket.Conn, data interface{}) {
message, err := json.Marshal(data)
if err != nil {
log.Println("Marshal:", err)
return
}

serve.clientLocks[client].Lock()
defer serve.clientLocks[client].Unlock()

if err := client.WriteMessage(websocket.TextMessage, message); err != nil {
log.Println("WriteMessage:", err)
client.Close()
delete(serve.clientSubscriptions, client)
delete(serve.clientLocks, client)
}
}
Loading

0 comments on commit d0cbab9

Please sign in to comment.