Skip to content

Commit

Permalink
client: Switch to using go.uber.org/zap for logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
fancycode committed Sep 15, 2024
1 parent 8037124 commit 1b285a1
Showing 1 changed file with 109 additions and 45 deletions.
154 changes: 109 additions & 45 deletions client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"flag"
"fmt"
"io"
"log"
pseudorand "math/rand"
"net"
"net/http"
Expand All @@ -46,6 +45,7 @@ import (
"github.com/gorilla/securecookie"
"github.com/gorilla/websocket"
"github.com/mailru/easyjson"
"go.uber.org/zap"

signaling "github.com/strukturag/nextcloud-spreed-signaling"
)
Expand Down Expand Up @@ -81,6 +81,8 @@ const (
)

type Stats struct {
log *zap.Logger

numRecvMessages atomic.Uint64
numSentMessages atomic.Uint64
resetRecvMessages uint64
Expand All @@ -107,10 +109,13 @@ func (s *Stats) Log() {
sentMessages := totalSentMessages - s.resetSentMessages
totalRecvMessages := s.numRecvMessages.Load()
recvMessages := totalRecvMessages - s.resetRecvMessages
log.Printf("Stats: sent=%d (%d/sec), recv=%d (%d/sec), delta=%d",
totalSentMessages, sentMessages/perSec,
totalRecvMessages, recvMessages/perSec,
totalSentMessages-totalRecvMessages)
s.log.Info("Stats updated",
zap.Uint64("sent", totalSentMessages),
zap.Uint64("sentspeed", sentMessages/perSec),
zap.Uint64("recv", totalRecvMessages),
zap.Uint64("recvspeed", recvMessages/perSec),
zap.Uint64("delta", totalSentMessages-totalRecvMessages),
)
s.reset(now)
}

Expand All @@ -119,6 +124,7 @@ type MessagePayload struct {
}

type SignalingClient struct {
log *zap.Logger
readyWg *sync.WaitGroup
cookie *securecookie.SecureCookie

Expand All @@ -135,13 +141,14 @@ type SignalingClient struct {
userId string
}

func NewSignalingClient(cookie *securecookie.SecureCookie, url string, stats *Stats, readyWg *sync.WaitGroup, doneWg *sync.WaitGroup) (*SignalingClient, error) {
func NewSignalingClient(log *zap.Logger, cookie *securecookie.SecureCookie, url string, stats *Stats, readyWg *sync.WaitGroup, doneWg *sync.WaitGroup) (*SignalingClient, error) {
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return nil, err
}

client := &SignalingClient{
log: log,
readyWg: readyWg,
cookie: cookie,

Expand Down Expand Up @@ -204,13 +211,19 @@ func (c *SignalingClient) processMessage(message *signaling.ServerMessage) {
case "message":
c.processMessageMessage(message)
case "bye":
log.Printf("Received bye: %+v", message.Bye)
c.log.Error("Received bye",
zap.Any("bye", message.Bye),
)
c.Close()
case "error":
log.Printf("Received error: %+v", message.Error)
c.log.Error("Received error",
zap.Any("error", message.Error),
)
c.Close()
default:
log.Printf("Unsupported message type: %+v", *message)
c.log.Warn("Unsupported message type",
zap.Stringer("message", message),
)
}
}

Expand All @@ -236,7 +249,10 @@ func (c *SignalingClient) processHelloMessage(message *signaling.ServerMessage)
c.privateSessionId = message.Hello.ResumeId
c.publicSessionId = c.privateToPublicSessionId(c.privateSessionId)
c.userId = message.Hello.UserId
log.Printf("Registered as %s (userid %s)", c.privateSessionId, c.userId)
c.log.Info("Registered",
zap.String("privateid", c.privateSessionId),
zap.String("userid", c.userId),
)
c.readyWg.Done()
}

Expand All @@ -249,14 +265,18 @@ func (c *SignalingClient) PublicSessionId() string {
func (c *SignalingClient) processMessageMessage(message *signaling.ServerMessage) {
var msg MessagePayload
if err := json.Unmarshal(message.Message.Data, &msg); err != nil {
log.Println("Error in unmarshal", err)
c.log.Error("Error in unmarshal",
zap.Error(err),
)
return
}

now := time.Now()
duration := now.Sub(msg.Now)
if duration > messageReportDuration {
log.Printf("Message took %s", duration)
c.log.Warn("Message took too long",
zap.Duration("duration", duration),
)
}
}

Expand All @@ -283,29 +303,37 @@ func (c *SignalingClient) readPump() {
websocket.CloseNormalClosure,
websocket.CloseGoingAway,
websocket.CloseNoStatusReceived) {
log.Printf("Error: %v", err)
c.log.Error("Error reading",
zap.Error(err),
)
}
break
}

if messageType != websocket.TextMessage {
log.Println("Unsupported message type", messageType)
c.log.Error("Unsupported message type",
zap.Int("type", messageType),
)
break
}

decodeBuffer.Reset()
if _, err := decodeBuffer.ReadFrom(reader); err != nil {
c.lock.Lock()
if c.conn != nil {
log.Println("Error reading message", err)
c.log.Error("Error reading message",
zap.Error(err),
)
}
c.lock.Unlock()
break
}

var message signaling.ServerMessage
if err := message.UnmarshalJSON(decodeBuffer.Bytes()); err != nil {
log.Printf("Error: %v", err)
c.log.Error("Error unmarshalling",
zap.Error(err),
)
break
}

Expand All @@ -327,7 +355,10 @@ func (c *SignalingClient) writeInternal(message *signaling.ClientMessage) bool {
return false
}

log.Println("Could not send message", message, err)
c.log.Error("Could not send message",
zap.Stringer("message", message),
zap.Error(err),
)
// TODO(jojo): Differentiate between JSON encode errors and websocket errors.
closeData = websocket.FormatCloseMessage(websocket.CloseInternalServerErr, "")
goto close
Expand Down Expand Up @@ -413,29 +444,33 @@ func (c *SignalingClient) SendMessages(clients []*SignalingClient) {
}
}

func registerAuthHandler(router *mux.Router) {
func registerAuthHandler(log *zap.Logger, router *mux.Router) {
router.HandleFunc("/auth", func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
log.Println("Error reading body:", err)
log.Error("Error reading body",
zap.Error(err),
)
return
}

rnd := r.Header.Get(signaling.HeaderBackendSignalingRandom)
checksum := r.Header.Get(signaling.HeaderBackendSignalingChecksum)
if rnd == "" || checksum == "" {
log.Println("No checksum headers found")
log.Error("No checksum headers found")
return
}

if verify := signaling.CalculateBackendChecksum(rnd, body, backendSecret); verify != checksum {
log.Println("Backend checksum verification failed")
log.Error("Backend checksum verification failed")
return
}

var request signaling.BackendClientRequest
if err := request.UnmarshalJSON(body); err != nil {
log.Println(err)
log.Error("Error unmarshalling",
zap.Error(err),
)
return
}

Expand All @@ -449,7 +484,9 @@ func registerAuthHandler(router *mux.Router) {

data, err := response.MarshalJSON()
if err != nil {
log.Println(err)
log.Error("Error marshalling response message",
zap.Error(err),
)
return
}

Expand All @@ -467,7 +504,9 @@ func registerAuthHandler(router *mux.Router) {

jsonpayload, err := payload.MarshalJSON()
if err != nil {
log.Println(err)
log.Error("Error marshalling payload",
zap.Error(err),
)
return
}

Expand All @@ -477,10 +516,12 @@ func registerAuthHandler(router *mux.Router) {
})
}

func getLocalIP() string {
func getLocalIP(log *zap.Logger) string {
interfaces, err := net.InterfaceAddrs()
if err != nil {
log.Fatal(err)
log.Fatal("Error getting interfaces",
zap.Error(err),
)
}
for _, intf := range interfaces {
switch t := intf.(type) {
Expand Down Expand Up @@ -508,11 +549,14 @@ func reverseSessionId(s string) (string, error) {

func main() {
flag.Parse()
log.SetFlags(0)

log := zap.Must(zap.NewDevelopment())

config, err := goconf.ReadConfigFile(*config)
if err != nil {
log.Fatal("Could not read configuration: ", err)
log.Fatal("Could not read configuration",
zap.Error(err),
)
}

secret, _ := config.GetString("backend", "secret")
Expand All @@ -523,7 +567,9 @@ func main() {
case 32:
case 64:
default:
log.Printf("WARNING: The sessions hash key should be 32 or 64 bytes but is %d bytes", len(hashKey))
log.Warn("The sessions hash key should be 32 or 64 bytes",
zap.Int("len", len(hashKey)),
)
}

blockKey, _ := config.GetString("sessions", "blockkey")
Expand All @@ -535,24 +581,30 @@ func main() {
case 24:
case 32:
default:
log.Fatalf("The sessions block key must be 16, 24 or 32 bytes but is %d bytes", len(blockKey))
log.Fatal("The sessions block key must be 16, 24 or 32 bytes",
zap.Int("len", len(blockKey)),
)
}
cookie := securecookie.New([]byte(hashKey), blockBytes).MaxAge(0)

cpus := runtime.NumCPU()
runtime.GOMAXPROCS(cpus)
log.Printf("Using a maximum of %d CPUs", cpus)
log.Debug("Using number of CPUs",
zap.Int("cpus", cpus),
)

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)

r := mux.NewRouter()
registerAuthHandler(r)
registerAuthHandler(log, r)

localIP := getLocalIP()
localIP := getLocalIP(log)
listener, err := net.Listen("tcp", localIP+":0")
if err != nil {
log.Fatal(err)
log.Fatal("Error starting listener",
zap.Error(err),
)
}

server := http.Server{
Expand All @@ -562,7 +614,9 @@ func main() {
server.Serve(listener) // nolint
}()
backendUrl := "http://" + listener.Addr().String()
log.Println("Backend server running on", backendUrl)
log.Info("Backend server running",
zap.String("url", backendUrl),
)

urls := make([]url.URL, 0)
urlstrings := make([]string, 0)
Expand All @@ -575,24 +629,34 @@ func main() {
urls = append(urls, u)
urlstrings = append(urlstrings, u.String())
}
log.Printf("Connecting to %s", urlstrings)
log.Info("Connecting",
zap.Strings("urls", urlstrings),
)

clients := make([]*SignalingClient, 0)
stats := &Stats{}
stats := &Stats{
log: log,
}

if *maxClients < 2 {
log.Fatalf("Need at least 2 clients, got %d", *maxClients)
log.Fatal("Need at least 2 clients",
zap.Int("count", *maxClients),
)
}

log.Printf("Starting %d clients", *maxClients)
log.Info("Starting clients",
zap.Int("count", *maxClients),
)

var doneWg sync.WaitGroup
var readyWg sync.WaitGroup

for i := 0; i < *maxClients; i++ {
client, err := NewSignalingClient(cookie, urls[i%len(urls)].String(), stats, &readyWg, &doneWg)
client, err := NewSignalingClient(log, cookie, urls[i%len(urls)].String(), stats, &readyWg, &doneWg)
if err != nil {
log.Fatal(err)
log.Fatal("Error creating signaling client",
zap.Error(err),
)
}
defer client.Close()
readyWg.Add(1)
Expand All @@ -612,10 +676,10 @@ func main() {
clients = append(clients, client)
}

log.Println("Clients created")
log.Info("Clients created")
readyWg.Wait()

log.Println("All connections established")
log.Info("All connections established")

for _, c := range clients {
doneWg.Add(1)
Expand All @@ -632,14 +696,14 @@ loop:
for {
select {
case <-interrupt:
log.Println("Interrupted")
log.Info("Interrupted")
break loop
case <-report.C:
stats.Log()
}
}

log.Println("Waiting for clients to terminate ...")
log.Info("Waiting for clients to terminate ...")
for _, c := range clients {
c.Close()
}
Expand Down

0 comments on commit 1b285a1

Please sign in to comment.