Skip to content

Commit

Permalink
feat: add metrics chart (#366)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 authored Aug 26, 2024
1 parent 7aff42f commit 4363773
Show file tree
Hide file tree
Showing 11 changed files with 556 additions and 244 deletions.
4 changes: 2 additions & 2 deletions cf-workers/ws/.prettierrc → .prettierrc
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"printWidth": 140,
"singleQuote": true,
"semi": true,
"useTabs": true
}
"useTabs": false
}
52 changes: 51 additions & 1 deletion internal/cmgr/cmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ const (
ConnectionTypeClosed = "closed"
)

type QueryNodeMetricsReq struct {
TimeRange string `json:"time_range"` // 15min/30min/1h/6h/12h/24h
Num int `json:"num"` // number of nodes to query
}

// connection manager interface/
// TODO support closed connection
type Cmgr interface {
Expand All @@ -34,17 +39,21 @@ type Cmgr interface {

// Start starts the connection manager.
Start(ctx context.Context, errCH chan error)

QueryNodeMetrics(ctx context.Context, req *QueryNodeMetricsReq) ([]metric_reader.NodeMetrics, error)
}

type cmgrImpl struct {
lock sync.RWMutex
cfg *Config
l *zap.SugaredLogger
mr metric_reader.Reader

// k: relay label, v: connection list
activeConnectionsMap map[string][]conn.RelayConn
closedConnectionsMap map[string][]conn.RelayConn

mr metric_reader.Reader
ms []*metric_reader.NodeMetrics // TODO gc this
}

func NewCmgr(cfg *Config) Cmgr {
Expand Down Expand Up @@ -171,6 +180,12 @@ func (cm *cmgrImpl) Start(ctx context.Context, errCH chan error) {
cm.l.Infof("Start Cmgr sync interval=%d", cm.cfg.SyncInterval)
ticker := time.NewTicker(time.Second * time.Duration(cm.cfg.SyncInterval))
defer ticker.Stop()
// sync once at the beginning
if err := cm.syncOnce(ctx); err != nil {
cm.l.Errorf("meet non retry error: %s ,exit now", err)
errCH <- err
return
}

for {
select {
Expand All @@ -185,3 +200,38 @@ func (cm *cmgrImpl) Start(ctx context.Context, errCH chan error) {
}
}
}

func (cm *cmgrImpl) QueryNodeMetrics(ctx context.Context, req *QueryNodeMetricsReq) ([]metric_reader.NodeMetrics, error) {
cm.lock.RLock()
defer cm.lock.RUnlock()

var startTime time.Time
switch req.TimeRange {
case "15min":
startTime = time.Now().Add(-15 * time.Minute)
case "30min":
startTime = time.Now().Add(-30 * time.Minute)
case "1h":
startTime = time.Now().Add(-1 * time.Hour)
case "6h":
startTime = time.Now().Add(-6 * time.Hour)
case "12h":
startTime = time.Now().Add(-12 * time.Hour)
case "24h":
startTime = time.Now().Add(-24 * time.Hour)
default:
// default to 15min
startTime = time.Now().Add(-15 * time.Minute)
}

res := []metric_reader.NodeMetrics{}
for _, metrics := range cm.ms {
if metrics.SyncTime.After(startTime) {
res = append(res, *metrics)
}
if req.Num > 0 && len(res) >= req.Num {
break
}
}
return res, nil
}
7 changes: 4 additions & 3 deletions internal/cmgr/metric_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ func (cm *cmgrImpl) syncOnce(ctx context.Context) error {
// todo: opt lock
cm.lock.Lock()

shorCommit := constant.GitRevision
shortCommit := constant.GitRevision
if len(constant.GitRevision) > 7 {
shorCommit = constant.GitRevision[:7]
shortCommit = constant.GitRevision[:7]
}
req := syncReq{
Stats: []StatsPerRule{},
Version: VersionInfo{Version: constant.Version, ShortCommit: shorCommit},
Version: VersionInfo{Version: constant.Version, ShortCommit: shortCommit},
}

if cm.cfg.NeedMetrics() {
Expand All @@ -50,6 +50,7 @@ func (cm *cmgrImpl) syncOnce(ctx context.Context) error {
cm.l.Errorf("read metrics failed: %v", err)
} else {
req.Node = *metrics
cm.ms = append(cm.ms, metrics)
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/relay/conf/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (r *Config) Adjust() error {
zap.S().Debugf("label is empty, set default label:%s", r.Label)
}
if len(r.Remotes) == 0 && len(r.TCPRemotes) != 0 {
zap.S().Warnf("tcp remotes is deprecated, use remotes instead")
zap.S().Warnf("tcp remotes is deprecated, please use remotes instead")
r.Remotes = r.TCPRemotes
}

Expand Down
18 changes: 18 additions & 0 deletions internal/web/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"strconv"

"github.com/Ehco1996/ehco/internal/cmgr"
"github.com/Ehco1996/ehco/internal/constant"
"github.com/labstack/echo/v4"
"go.uber.org/zap"
Expand Down Expand Up @@ -120,3 +121,20 @@ func (s *Server) ListRules(c echo.Context) error {
"Configs": s.cfg.RelayConfigs,
})
}

func (s *Server) GetNodeMetrics(c echo.Context) error {
req := &cmgr.QueryNodeMetricsReq{TimeRange: c.QueryParam("time_range")}
num := c.QueryParam("num")
if num != "" {
n, err := strconv.Atoi(num)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
req.Num = n
}
metrics, err := s.connMgr.QueryNodeMetrics(c.Request().Context(), req)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, err.Error())
}
return c.JSON(http.StatusOK, metrics)
}
1 change: 1 addition & 0 deletions internal/web/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func NewServer(
api.GET("/config/", s.CurrentConfig)
api.POST("/config/reload/", s.HandleReload)
api.GET("/health_check/", s.HandleHealthCheck)
api.GET("/node_metrics/", s.GetNodeMetrics)
return s, nil
}

Expand Down
Loading

0 comments on commit 4363773

Please sign in to comment.