Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: data race #8

Merged
merged 3 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ YOMO_ZIPPER=127.0.0.1:9000
YOMO_SNDR_NAME=prscd-sender
YOMO_RCVR_NAME=prscd-receiver
#OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4318
YOMO_LOG_LEVEL=warn

# Server TLS
CERT_FILE=./lo.yomo.dev.cert
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ dist: clean

.PHONY: dev
dev:
$(GO) run -race ./cmd/prscd
YOMO_LOG_LEVEL=warn $(GO) run -race ./cmd/prscd

.PHONY: test
test:
Expand Down
11 changes: 6 additions & 5 deletions chirp/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ func (c *Channel) Broadcast(sig *psig.Signalling) {
// Dispatch messages to all peers in this channel of current node.
func (c *Channel) Dispatch(sig *psig.Signalling) {
// sig.Sid is sender's sid when sending message
log.Debug("[%s]\tSND>: %+v", sig.Sid, sig)
log.Debug("[SND>]", "sid", sig.Sid, "sig", sig)
var sender = sig.Sid
// do not broadcast APP_ID and Sid to end user
// do not broadcast APP_ID, Sid and Mesh to end user
sig.AppID = ""
sig.Sid = ""
sig.MeshID = ""
resp, err := msgpack.Marshal(sig)
if err != nil {
log.Error("msgpack marshal: %+v", err)
Expand All @@ -57,13 +58,13 @@ func (c *Channel) Dispatch(sig *psig.Signalling) {
sid := k.(string)
p := v.(*Peer)
if sid == sender {
util.Log.Debug("-----------ignore sender-self: %s", sender)
// util.Log.Debug("-----------ignore sender-self", "sender", sender)
return true
}
util.Log.Debug("[%s] BroadcastPresence to ch:%s, for sid:%s", sender, c.UniqID, p.Sid)
util.Log.Debug("BroadcastPresence to ch for sid", "sender", sender, "ch", c.UniqID, "sid", p.Sid)
err = p.conn.Write(resp)
if err != nil {
log.Error("ws.write error: %+v", err)
log.Error("ws.write error", "err", err)
}
return true
})
Expand Down
31 changes: 30 additions & 1 deletion chirp/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chirp

import (
"net"
"sync"

"github.com/gobwas/ws/wsutil"
"github.com/quic-go/quic-go"
Expand All @@ -13,6 +14,8 @@ type Connection interface {
RemoteAddr() string
// Write the data to the connection
Write(msg []byte) error
// RawWrite write the raw bytes to the connection, this is a low-level implementation
RawWrite(buf []byte) (int, error)
}

/*** WebSocket ***/
Expand All @@ -26,6 +29,7 @@ func NewWebSocketConnection(conn net.Conn) Connection {

// WebSocketConnection is a WebSocket connection
type WebSocketConnection struct {
mu sync.Mutex
underlyingConn net.Conn
}

Expand All @@ -36,9 +40,18 @@ func (c *WebSocketConnection) RemoteAddr() string {

// Write the data to the connection
func (c *WebSocketConnection) Write(msg []byte) error {
c.mu.Lock()
defer c.mu.Unlock()
return wsutil.WriteServerBinary(c.underlyingConn, msg)
}

// RawWrite write the raw bytes to the connection, this is a low-level implementation
func (c *WebSocketConnection) RawWrite(buf []byte) (int, error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.underlyingConn.Write(buf)
}

/*** WebTransport ***/

// NewWebTransportConnection creates a new WebTransportConnection
Expand All @@ -50,6 +63,7 @@ func NewWebTransportConnection(conn quic.Connection) Connection {

// WebTransportConnection is a WebTransport connection
type WebTransportConnection struct {
mu sync.Mutex
underlyingConn quic.Connection
}

Expand All @@ -60,12 +74,27 @@ func (c *WebTransportConnection) RemoteAddr() string {

// Write the data to the connection
func (c *WebTransportConnection) Write(msg []byte) error {
c.mu.Lock()
defer c.mu.Unlock()

// add 0x00 to msg
buf := []byte{0x00}
buf = append(buf, msg...)
if err := c.underlyingConn.SendDatagram(buf); err != nil {
log.Error("[%s] SendMessage error: %v", c.RemoteAddr(), err)
log.Error("SendMessage error", "remote", c.RemoteAddr(), "err", err)
return err
}
return nil
}

// RawWrite write the raw bytes to the connection, this is a low-level implementation
func (c *WebTransportConnection) RawWrite(buf []byte) (int, error) {
c.mu.Lock()
defer c.mu.Unlock()

if err := c.underlyingConn.SendDatagram(buf); err != nil {
log.Error("SendMessage error", "remote", c.RemoteAddr(), "err", err)
return 0, err
}
return len(buf), nil
}
10 changes: 5 additions & 5 deletions chirp/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var AuthUserAndGetYoMoCredential func(publicKey string) (appID, credential strin

// GetOrCreateRealm get or create realm by appID, if realm is created, it will connect to yomo zipper with credential.
func GetOrCreateRealm(appID string, credential string) (realm *node) {
log.Debug("get or create realm: %s", appID)
log.Debug("get or create realm", "appID", appID)
res, ok := allRealms.LoadOrStore(appID, &node{
MeshID: os.Getenv("MESH_ID"),
id: appID,
Expand Down Expand Up @@ -156,7 +156,7 @@ func (n *node) ConnectToYoMo(credential string) error {
if err != nil {
log.Error("Read from YoMo error", "err", err, "ctx.Data()", ctx.Data())
}
log.Debug("sig", sig)
log.Debug("got sig", "sig", sig)

if sig.AppID != n.id {
log.Debug("ignore message from other app", "appID", sig.AppID)
Expand All @@ -166,9 +166,9 @@ func (n *node) ConnectToYoMo(credential string) error {
channel := n.FindChannel(sig.Channel)
if channel != nil {
channel.Dispatch(sig)
log.Debug("[\u21CA]\t dispatched to %s", sig.Cid)
log.Debug("[\u21CA] dispatched to", "cid", sig.Cid)
} else {
log.Debug("[\u21CA]\t dispatch to channel failed cause of not exist: %s", sig.Channel)
log.Debug("[\u21CA] dispatch to channel failed cause of not exist", "channel", sig.Channel)
}
}

Expand All @@ -193,7 +193,7 @@ func (n *node) ConnectToYoMo(credential string) error {
// BroadcastToYoMo broadcast presence to yomo
func (n *node) BroadcastToYoMo(sig *psig.Signalling) {
// sig.Sid is sender's sid when sending message
log.Debug("\033[34m[%s][\u21C8\u21C8]\t %s\033[36m", sig.AppID, sig)
log.Debug("[\u21C8\u21C8]", "appID", sig.AppID, "sig", sig)
buf, err := msgpack.Marshal(sig)
if err != nil {
log.Error("msgpack marshal: %+v", err)
Expand Down
5 changes: 5 additions & 0 deletions chirp/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func (c *MockConnection) Write(msg []byte) error {
return nil
}

// RawWrite write the raw bytes to the connection, this is a low-level implementation
func (c *MockConnection) RawWrite(byf []byte) (int, error) {
return 0, nil
}

// SenderMock implement yomo.Source interface
type SenderMock struct{}

Expand Down
2 changes: 1 addition & 1 deletion chirp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (p *Peer) HandleSignal(r io.Reader) error {

// p.Sid is the id of connection, set by backend.
sig.Sid = p.Sid
log.Debug("\t>RCV", "sid", p.Sid, "sig", sig)
log.Debug("[>RCV]", "sid", p.Sid, "sig", sig)

if sig.Type == psig.SigControl {
// handle the Control Signalling
Expand Down
3 changes: 2 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func StartServer() {

// DEBUG env indicates development mode, verbose log
if os.Getenv("DEBUG") == "true" {
log.SetLogLevel(util.DEBUG)
// log.SetLogLevel(util.DEBUG)
log.SetLogLevel(-4)
log.Debug("IN DEVELOPMENT ENV")
}

Expand Down
2 changes: 1 addition & 1 deletion tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func loadTLS(certFile, keyFile string) (*tls.Config, error) {

// Get the expiration date
expirationDate := parsedCert.NotAfter
log.Debug("check TLS cert expiration date: %s", expirationDate)
log.Debug("check TLS cert expiration date", "date", expirationDate)

// determine if the certificate is expired
if time.Now().After(expirationDate) {
Expand Down
11 changes: 8 additions & 3 deletions util/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ type logLevelType int

const (
// DEBUG level
DEBUG logLevelType = iota
DEBUG logLevelType = -4
// INFO level
INFO
INFO logLevelType = 0
// ERROR level
ERROR
ERROR logLevelType = 8
)

type plog struct {
Expand Down Expand Up @@ -72,6 +72,11 @@ func (l *plog) Fatal(err error) {
// SetLogLevel set log level.
func (l *plog) SetLogLevel(lvl logLevelType) {
l.logLevel = lvl
if lvl == DEBUG {
l.outlog = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
}))
}
}

// Log is a global logger
Expand Down
11 changes: 7 additions & 4 deletions websocket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -48,7 +49,7 @@ func ListenAndServe(addr string, config *tls.Config) {
// TCP has new connection
conn, err := ln.Accept()
if err != nil {
log.Error("ln.accept error: %s", err)
log.Error("ln.accept error", "err", err)
conn.Close()
continue
}
Expand Down Expand Up @@ -161,7 +162,6 @@ func ListenAndServe(addr string, config *tls.Config) {

keepaliveDone := make(chan bool)
go func(c net.Conn) {
// 浏览器不会发送 Ping,一定是服务器端发 Ping,浏览器会自动回应 Pong(但在 DevTools 里是不显示Ping/Pong的)
// according to https://tools.ietf.org/html/rfc6455#section-5.5.2, Web Browsers will not send Ping frame,
// backend server should send Ping frame to keep connection alive, and Web Browsers will auto reply Pong frame when receive Ping frame. But in Chrome DevTools, Ping/Pong frame is not shown.
ticker := time.NewTicker(DurationOfPing)
Expand All @@ -172,7 +172,8 @@ func ListenAndServe(addr string, config *tls.Config) {
log.Debug("ticker done", "sid", peer.Sid)
return
case <-ticker.C:
c.Write(generatePingFrame())
// c.Write(generatePingFrame())
pconn.RawWrite(generatePingFrame())
}
}
}(conn)
Expand Down Expand Up @@ -254,6 +255,7 @@ func generatePingFrame() []byte {
tsbuf := make([]byte, 8)
binary.BigEndian.PutUint64(tsbuf, uint64(ts))
pf := ws.MustCompileFrame(ws.NewPingFrame(tsbuf))
log.Debug("PING Payload", "len", len(pf), "bytes", fmt.Sprintf("% X", pf))
return pf
}

Expand All @@ -269,7 +271,8 @@ func handlePongFrame(sid string, r io.Reader, header ws.Header) error {
// calculate the RTT and prints to stdout
appData := int64(binary.BigEndian.Uint64(buf))
now := time.Now().UnixMilli()
log.Inspect("\tPONG Payload", "sid", sid, "len", len(buf), "val", appData, "𝚫", now-appData)
// log.Inspect("\tPONG Payload", "sid", sid, "len", len(buf), "val", appData, "𝚫", now-appData)
log.Debug("[PONG]", "sid", sid, "len", len(buf), "buf", fmt.Sprintf("% X", buf), "val", appData, "𝚫", now-appData)
return nil
}

Expand Down