Skip to content

Commit

Permalink
Allow client to tail process logs
Browse files Browse the repository at this point in the history
  • Loading branch information
F1bonacc1 committed Feb 26, 2023
1 parent 6bc7fb3 commit 9ed6dbd
Show file tree
Hide file tree
Showing 21 changed files with 319 additions and 31 deletions.
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ buildGoModule rec {

nativeBuildInputs = [ installShellFiles ];

vendorSha256 = "g82JRmfbKH/XEZx2aLZOcyen23vOxQXR7VyeAYxCSi4=";
vendorSha256 = "iiGn0dYHNEp5Bs54X44sHbsG3HD92Xs4oah4iZXqqvQ=";
#vendorSha256 = lib.fakeSha256;

postInstall = ''
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/fatih/color v1.14.1
github.com/gdamore/tcell/v2 v2.5.4
github.com/gin-gonic/gin v1.8.2
github.com/gorilla/websocket v1.5.0
github.com/imdario/mergo v0.3.13
github.com/joho/godotenv v1.5.1
github.com/rivo/tview v0.0.0-20230208211350-7dfff1ce7854
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk=
Expand Down
6 changes: 5 additions & 1 deletion process-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ processes:
command: "kcalc"
disabled: true

pc_log:
__pc_log:
command: "tail -f -n100 process-compose-${USER}.log"
working_dir: "/tmp"
environment:
Expand All @@ -128,6 +128,10 @@ processes:
process0:
condition: process_completed

__pc_log_client:
command: "tail -f -n100 process-compose-${USER}-client.log"
working_dir: "/tmp"


bat_config:
command: "batcat -f process-compose.yaml"
Expand Down
3 changes: 1 addition & 2 deletions src/api/pc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"net/http"
"strconv"

"github.com/gin-gonic/gin"

"github.com/f1bonacc1/process-compose/src/app"
"github.com/gin-gonic/gin"
)

// @Schemes
Expand Down
3 changes: 3 additions & 0 deletions src/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,8 @@ func InitRoutes(useLogger bool) *gin.Engine {
r.POST("/process/start/:name", StartProcess)
r.POST("/process/restart/:name", RestartProcess)

//websocket
r.GET("/process/logs/ws/:name/:endOffset/:follow", HandleLogsStream)

return r
}
6 changes: 6 additions & 0 deletions src/api/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package api

type LogMessage struct {
Message string `json:"message"`
ProcessName string `json:"process_name"`
}
91 changes: 91 additions & 0 deletions src/api/ws_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package api

import (
"github.com/f1bonacc1/process-compose/src/app"
"github.com/f1bonacc1/process-compose/src/pclog"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
"net/http"
"strconv"
)

var upgrader = websocket.Upgrader{}

func HandleLogsStream(c *gin.Context) {
procName := c.Param("name")
follow := c.Param("follow") == "true"
endOffset, err := strconv.Atoi(c.Param("endOffset"))
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}

done := make(chan struct{})
logChan := make(chan LogMessage, 256)
connector := pclog.NewConnector(func(messages []string) {
for _, message := range messages {
msg := LogMessage{
Message: message,
ProcessName: procName,
}
logChan <- msg
}
if !follow {
close(logChan)
}
},
func(message string) {
msg := LogMessage{
Message: message,
ProcessName: procName,
}
logChan <- msg
},
endOffset)
go handleLog(ws, procName, connector, logChan, done)
if follow {
go handleIncoming(ws, done)
}
app.PROJ.GetLogsAndSubscribe(procName, connector)
}

func handleLog(ws *websocket.Conn, procName string, connector *pclog.Connector, logChan chan LogMessage, done chan struct{}) {
defer app.PROJ.UnSubscribeLogger(procName, connector)
defer ws.Close()
for {
select {
case msg, open := <-logChan:
if err := ws.WriteJSON(&msg); err != nil {
log.Err(err).Msg("Failed to write to socket")
return
}
if !open {
return
}
case <-done:
log.Warn().Msg("Socket closed remotely")
return
}

}
}

func handleIncoming(ws *websocket.Conn, done chan struct{}) {
defer close(done)
for {
msgType, _, err := ws.ReadMessage()
if err != nil {
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
return
}
log.Err(err).Msgf("Failed to read from socket %d", msgType)
return
}
}
}
7 changes: 3 additions & 4 deletions src/app/project_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,21 +289,20 @@ func (p *ProjectRunner) GetProcessLogLength(name string) int {
return logs.GetLogLength()
}

func (p *ProjectRunner) GetLogsAndSubscribe(name string, observer pclog.PcLogObserver) {

func (p *ProjectRunner) GetLogsAndSubscribe(name string, observer pclog.LogObserver) {
logs, err := p.getProcessLog(name)
if err != nil {
return
}
logs.GetLogsAndSubscribe(observer)
}

func (p *ProjectRunner) UnSubscribeLogger(name string) {
func (p *ProjectRunner) UnSubscribeLogger(name string, observer pclog.LogObserver) {
logs, err := p.getProcessLog(name)
if err != nil {
return
}
logs.UnSubscribe()
logs.UnSubscribe(observer)
}

func (p *ProjectRunner) selectRunningProcesses(procList []string) error {
Expand Down
69 changes: 69 additions & 0 deletions src/client/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package client

import (
"fmt"
"github.com/f1bonacc1/process-compose/src/api"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
"os"
"os/signal"
"time"
)

func ReadProcessLogs(address string, port int, name string, offset int, follow bool) error {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
url := fmt.Sprintf("ws://%s:%d/process/logs/ws/%s/%d/%v", address, port, name, offset, follow)
log.Info().Msgf("Connecting to %s", url)
ws, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
log.Error().Msgf("failed to dial to %s error: %v", url, err)
return err
}
defer ws.Close()
done := make(chan struct{})

go readLogs(done, ws, follow)

for {
select {
case <-done:
return nil
case <-interrupt:
fmt.Println("interrupt")

// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.
err := ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
fmt.Println("write close:", err)
return nil
}
select {
case <-done:
case <-time.After(time.Second):
}
return nil
}
}
}

func readLogs(done chan struct{}, ws *websocket.Conn, follow bool) {
defer close(done)
for {
var message api.LogMessage
if err := ws.ReadJSON(&message); err != nil {
if !follow && websocket.IsCloseError(err, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
return
}
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
return
}
log.Error().Msgf("failed to read message: %v", err)
return
}
if len(message.ProcessName) > 0 {
fmt.Printf("%s:\t%s\n", message.ProcessName, message.Message)
}
}
}
2 changes: 1 addition & 1 deletion src/cmd/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var infoCmd = &cobra.Command{
func printInfo() {
format := "%-15s %s\n"
fmt.Println("Process Compose")
fmt.Printf(format, "Logs:", config.LogFilePath)
fmt.Printf(format, "Logs:", config.GetLogFilePath())

path := config.GetShortCutsPath()
if len(path) > 0 {
Expand Down
39 changes: 39 additions & 0 deletions src/cmd/logs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright © 2023 NAME HERE <EMAIL ADDRESS>
*/
package cmd

import (
"github.com/f1bonacc1/process-compose/src/client"
"github.com/rs/zerolog/log"
"math"

"github.com/spf13/cobra"
)

var (
follow bool
tailLength int
)

// logsCmd represents the logs command
var logsCmd = &cobra.Command{
Use: "logs [PROCESS]",
Short: "Fetch the logs of a process",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
name := args[0]
err := client.ReadProcessLogs(pcAddress, port, name, tailLength, follow)
if err != nil {
log.Error().Msgf("Failed to fetch logs for process %s: %v", name, err)
return
}
},
}

func init() {
processCmd.AddCommand(logsCmd)

logsCmd.Flags().BoolVarP(&follow, "follow", "f", false, "Follow log output")
logsCmd.Flags().IntVarP(&tailLength, "tail", "n", math.MaxInt, "Number of lines to show from the end of the logs (default - all)")
}
2 changes: 1 addition & 1 deletion src/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func init() {

rootCmd.Flags().BoolVarP(&isTui, "tui", "t", true, "disable tui (-t=false)")
rootCmd.PersistentFlags().IntVarP(&port, "port", "p", getPortDefault(), "port number")
rootCmd.PersistentFlags().StringArrayVarP(&opts.FileNames, "config", "f", getConfigDefault(), "path to config files to load")
rootCmd.Flags().StringArrayVarP(&opts.FileNames, "config", "f", getConfigDefault(), "path to config files to load")
}

func getTuiDefault() bool {
Expand Down
5 changes: 5 additions & 0 deletions src/cmd/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ var upCmd = &cobra.Command{
If one or more process names are passed as arguments,
will start them and their dependencies only`,
Run: func(cmd *cobra.Command, args []string) {
if !cmd.Flags().Changed("tui") {
isTui = getTuiDefault()
}
api.StartHttpServer(!isTui, port)
runProject(args, noDeps)
},
Expand All @@ -27,4 +30,6 @@ func init() {

upCmd.Flags().BoolVarP(&isTui, "tui", "t", true, "disable tui (-t=false)")
upCmd.Flags().BoolVarP(&noDeps, "no-deps", "", false, "don't start dependent processes")
upCmd.Flags().StringArrayVarP(&opts.FileNames, "config", "f", getConfigDefault(), "path to config files to load")

}
35 changes: 35 additions & 0 deletions src/pclog/log_observer_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package pclog

type multiLineHandler func(string []string)
type lineHandler func(string string)

type Connector struct {
LogObserver
logLinesHandler multiLineHandler
logMessageHandler lineHandler
uniqueId string
taiLength int
}

func NewConnector(mlHandler multiLineHandler, slHandler lineHandler, tail int) *Connector {
return &Connector{
logLinesHandler: mlHandler,
logMessageHandler: slHandler,
uniqueId: GenerateUniqueID(10),
taiLength: tail,
}
}

func (c *Connector) AddLine(line string) {
c.logMessageHandler(line)
}
func (c *Connector) SetLines(lines []string) {
c.logLinesHandler(lines)
}
func (c *Connector) GetUniqueID() string {
return c.uniqueId
}

func (c *Connector) GetTailLength() int {
return c.taiLength
}
4 changes: 3 additions & 1 deletion src/pclog/logs_observer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pclog

type PcLogObserver interface {
type LogObserver interface {
AddLine(line string)
SetLines(lines []string)
GetTailLength() int
GetUniqueID() string
}
6 changes: 3 additions & 3 deletions src/pclog/nil_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ func NewNilLogger() *PcNilLog {
func (l *PcNilLog) Sync() {
}

func (l PcNilLog) Info(message string, process string, replica int) {
func (l *PcNilLog) Info(message string, process string, replica int) {

}

func (l PcNilLog) Error(message string, process string, replica int) {
func (l *PcNilLog) Error(message string, process string, replica int) {

}

func (l PcNilLog) Close() {
func (l *PcNilLog) Close() {

}
Loading

0 comments on commit 9ed6dbd

Please sign in to comment.