Skip to content

Commit

Permalink
metrics, config: remove pushing metrics (#431)
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored Dec 28, 2023
1 parent 0bae3ad commit d7c02ee
Show file tree
Hide file tree
Showing 8 changed files with 13 additions and 189 deletions.
6 changes: 0 additions & 6 deletions conf/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,6 @@ graceful-close-conn-timeout = 15

# require-backend-tls = false

[metrics]

# WARNING: know what you are doing, these two are for debugging.
# metrics-addr = ""
# metrics-interval = 0

[advance]

# ignore-wrong-namespace = true
6 changes: 0 additions & 6 deletions lib/config/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,9 @@ type Config struct {
Advance Advance `yaml:"advance,omitempty" toml:"advance,omitempty" json:"advance,omitempty"`
Workdir string `yaml:"workdir,omitempty" toml:"workdir,omitempty" json:"workdir,omitempty"`
Security Security `yaml:"security,omitempty" toml:"security,omitempty" json:"security,omitempty"`
Metrics Metrics `yaml:"metrics,omitempty" toml:"metrics,omitempty" json:"metrics,omitempty"`
Log Log `yaml:"log,omitempty" toml:"log,omitempty" json:"log,omitempty"`
}

type Metrics struct {
MetricsAddr string `toml:"metrics-addr" json:"metrics-addr"`
MetricsInterval uint `toml:"metrics-interval" json:"metrics-interval"`
}

type KeepAlive struct {
Enabled bool `yaml:"enabled,omitempty" toml:"enabled,omitempty" json:"enabled,omitempty"`
// Idle, Cnt, and Intvl works only when the connection is idle. User packets will interrupt keep-alive.
Expand Down
4 changes: 0 additions & 4 deletions lib/config/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ var testProxyConfig = Config{
API: API{
Addr: "0.0.0.0:3080",
},
Metrics: Metrics{
MetricsAddr: "127.0.0.1:9021",
MetricsInterval: 15,
},
Log: Log{
Encoder: "tidb",
LogOnline: LogOnline{
Expand Down
16 changes: 8 additions & 8 deletions pkg/manager/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,24 +69,24 @@ func TestConfigReload(t *testing.T) {
},
{
name: "override empty fields",
precfg: `metrics.metrics-addr = ""`,
precfg: `api.addr = ""`,
precheck: func(c *config.Config) bool {
return c.Metrics.MetricsAddr == ""
return c.API.Addr == ""
},
postcfg: `metrics.metrics-addr = "gg"`,
postcfg: `api.addr = "0.0.0.0:3080"`,
postcheck: func(c *config.Config) bool {
return c.Metrics.MetricsAddr == "gg"
return c.API.Addr == "0.0.0.0:3080"
},
},
{
name: "override non-empty fields",
precfg: `metrics.metrics-addr = "ee"`,
precfg: `api.addr = "0.0.0.0:3080"`,
precheck: func(c *config.Config) bool {
return c.Metrics.MetricsAddr == "ee"
return c.API.Addr == "0.0.0.0:3080"
},
postcfg: `metrics.metrics-addr = "gg"`,
postcfg: `api.addr = "0.0.0.0:3081"`,
postcheck: func(c *config.Config) bool {
return c.Metrics.MetricsAddr == "gg"
return c.API.Addr == "0.0.0.0:3081"
},
},
{
Expand Down
79 changes: 1 addition & 78 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,14 @@ package metrics

import (
"context"
"fmt"
"net"
"os"
"runtime"
"sync"
"time"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/lib/util/systimemon"
"github.com/pingcap/tiproxy/lib/util/waitgroup"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/push"
dto "github.com/prometheus/client_model/go"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -53,12 +48,11 @@ func NewMetricsManager() *MetricsManager {
var registerOnce = &sync.Once{}

// Init registers metrics and pushes metrics to prometheus.
func (mm *MetricsManager) Init(ctx context.Context, logger *zap.Logger, proxyAddr string, cfg config.Metrics, cfgch <-chan *config.Config) {
func (mm *MetricsManager) Init(ctx context.Context, logger *zap.Logger) {
mm.logger = logger
registerOnce.Do(registerProxyMetrics)
ctx, mm.cancel = context.WithCancel(ctx)
mm.setupMonitor(ctx)
mm.pushMetric(ctx, proxyAddr, cfg, cfgch)
}

// Close stops all goroutines.
Expand Down Expand Up @@ -89,65 +83,6 @@ func (mm *MetricsManager) setupMonitor(ctx context.Context) {
})
}

// pushMetric pushes metrics in background.
func (mm *MetricsManager) pushMetric(ctx context.Context, proxyAddr string, cfg config.Metrics, cfgch <-chan *config.Config) {
mm.wg.Run(func() {
proxyInstance := instanceName(proxyAddr)
addr := cfg.MetricsAddr
interval := time.Duration(cfg.MetricsInterval) * time.Second
pusher := mm.buildPusher(addr, interval, proxyInstance)

for ctx.Err() == nil {
select {
case newCfg := <-cfgch:
if newCfg == nil {
return
}
interval = time.Duration(newCfg.Metrics.MetricsInterval) * time.Second
if addr != newCfg.Metrics.MetricsAddr {
addr = newCfg.Metrics.MetricsAddr
pusher = mm.buildPusher(addr, interval, proxyInstance)
}
default:
}

// Wait until the config is legal.
if interval == 0 || pusher == nil {
select {
case <-time.After(time.Second):
continue
case <-ctx.Done():
return
}
}

if err := pusher.Push(); err != nil {
mm.logger.Error("could not push metrics to prometheus pushgateway", zap.Error(err))
}
select {
case <-time.After(interval):
case <-ctx.Done():
return
}
}
})
}

func (mm *MetricsManager) buildPusher(addr string, interval time.Duration, proxyInstance string) *push.Pusher {
var pusher *push.Pusher
if len(addr) > 0 {
// Create a new pusher when the address changes.
mm.logger.Info("start prometheus push client", zap.String("server addr", addr), zap.Stringer("interval", interval))
pusher = push.New(addr, "tiproxy")
pusher = pusher.Gatherer(prometheus.DefaultGatherer)
pusher = pusher.Grouping("instance", proxyInstance)
} else {
mm.logger.Info("disable prometheus push client")
pusher = nil
}
return pusher
}

// registerProxyMetrics registers metrics.
func registerProxyMetrics() {
prometheus.DefaultRegisterer.Unregister(collectors.NewGoCollector())
Expand All @@ -173,18 +108,6 @@ func registerProxyMetrics() {
prometheus.MustRegister(MigrateDurationHistogram)
}

func instanceName(proxyAddr string) string {
hostname, err := os.Hostname()
if err != nil {
return "unknown"
}
_, port, err := net.SplitHostPort(proxyAddr)
if err != nil {
return "unknown"
}
return fmt.Sprintf("%s_%s", hostname, port)
}

// ReadCounter reads the value from the counter. It is only used for testing.
func ReadCounter(counter prometheus.Counter) (int, error) {
var metric dto.Metric
Expand Down
87 changes: 2 additions & 85 deletions pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,97 +5,14 @@ package metrics

import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/lib/util/logger"
"github.com/stretchr/testify/require"
)

// Test that the metrics are pushed or not pushed with different configurations.
func TestPushMetrics(t *testing.T) {
proxyAddr := "0.0.0.0:6000"
labelName := fmt.Sprintf("%s_%s_maxprocs", ModuleProxy, LabelServer)
bodyCh1, bodyCh2 := make(chan string), make(chan string)
pgwOK1, pgwOK2 := setupServer(t, bodyCh1), setupServer(t, bodyCh2)
func TestStartMetricsManager(t *testing.T) {
log, _ := logger.CreateLoggerForTest(t)

tests := []struct {
metricsAddr string
metricsInterval uint
pushedCh chan string
}{
{
metricsAddr: pgwOK1.URL,
metricsInterval: 1,
pushedCh: bodyCh1,
},
{
metricsAddr: pgwOK1.URL,
metricsInterval: 0,
pushedCh: nil,
},
{
metricsAddr: pgwOK2.URL,
metricsInterval: 1,
pushedCh: bodyCh2,
},
{
metricsAddr: "",
metricsInterval: 1,
pushedCh: nil,
},
}
mm := NewMetricsManager()
cfgCh := make(chan *config.Config, 1)
mm.Init(context.Background(), log, proxyAddr, config.Metrics{}, cfgCh)
for _, tt := range tests {
cfgCh <- &config.Config{
Metrics: config.Metrics{
MetricsAddr: tt.metricsAddr,
MetricsInterval: tt.metricsInterval,
},
}
if tt.pushedCh != nil {
select {
case body := <-tt.pushedCh:
require.Contains(t, body, labelName)
case <-time.After(2 * time.Second):
t.Fatal("not pushed")
}
} else {
select {
case <-bodyCh1:
t.Fatal("pushed 1")
case <-bodyCh2:
t.Fatal("pushed 2")
case <-time.After(2 * time.Second):
}
}
}
mm.Init(context.Background(), log)
mm.Close()
}

func setupServer(t *testing.T, bodyCh chan string) *httptest.Server {
hostname, err := os.Hostname()
require.NoError(t, err)
expectedPath := fmt.Sprintf("/metrics/job/tiproxy/instance/%s_6000", hostname)
server := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
bodyCh <- string(body)
require.Equal(t, expectedPath, r.URL.EscapedPath())
w.Header().Set("Content-Type", `text/plain; charset=utf-8`)
w.WriteHeader(http.StatusOK)
}),
)
t.Cleanup(server.Close)
return server
}
2 changes: 1 addition & 1 deletion pkg/server/api/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ max-backups = 3
doHTTP(t, http.MethodGet, "/api/admin/config?format=json", nil, func(t *testing.T, r *http.Response) {
all, err := io.ReadAll(r.Body)
require.NoError(t, err)
require.Equal(t, `{"proxy":{"addr":"0.0.0.0:6000","pd-addrs":"127.0.0.1:2379","frontend-keepalive":{"enabled":true},"backend-healthy-keepalive":{"enabled":true,"idle":60000000000,"cnt":5,"intvl":3000000000,"timeout":15000000000},"backend-unhealthy-keepalive":{"enabled":true,"idle":10000000000,"cnt":5,"intvl":1000000000,"timeout":5000000000},"graceful-close-conn-timeout":15},"api":{"addr":"0.0.0.0:3080"},"advance":{"ignore-wrong-namespace":true},"security":{"server-tls":{"min-tls-version":"1.1"},"server-http-tls":{"min-tls-version":"1.1"},"cluster-tls":{"min-tls-version":"1.1"},"sql-tls":{"min-tls-version":"1.1"}},"metrics":{"metrics-addr":"","metrics-interval":0},"log":{"encoder":"tidb","level":"info","log-file":{"max-size":300,"max-days":3,"max-backups":3}}}`,
require.Equal(t, `{"proxy":{"addr":"0.0.0.0:6000","pd-addrs":"127.0.0.1:2379","frontend-keepalive":{"enabled":true},"backend-healthy-keepalive":{"enabled":true,"idle":60000000000,"cnt":5,"intvl":3000000000,"timeout":15000000000},"backend-unhealthy-keepalive":{"enabled":true,"idle":10000000000,"cnt":5,"intvl":1000000000,"timeout":5000000000},"graceful-close-conn-timeout":15},"api":{"addr":"0.0.0.0:3080"},"advance":{"ignore-wrong-namespace":true},"security":{"server-tls":{"min-tls-version":"1.1"},"server-http-tls":{"min-tls-version":"1.1"},"cluster-tls":{"min-tls-version":"1.1"},"sql-tls":{"min-tls-version":"1.1"}},"log":{"encoder":"tidb","level":"info","log-file":{"max-size":300,"max-days":3,"max-backups":3}}}`,
string(regexp.MustCompile(`"workdir":"[^"]+",`).ReplaceAll(all, nil)))
require.Equal(t, http.StatusOK, r.StatusCode)
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewServer(ctx context.Context, sctx *sctx.Context) (srv *Server, err error)
cfg := srv.ConfigManager.GetConfig()

// setup metrics
srv.MetricsManager.Init(ctx, lg.Named("metrics"), cfg.Proxy.Addr, cfg.Metrics, srv.ConfigManager.WatchConfig())
srv.MetricsManager.Init(ctx, lg.Named("metrics"))
metrics.ServerEventCounter.WithLabelValues(metrics.EventStart).Inc()

// setup certs
Expand Down

0 comments on commit d7c02ee

Please sign in to comment.