Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics, balance: add metrics for load balance #556

Merged
merged 2 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions lib/config/getter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright 2024 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

package config

// ConfigGetter is implemented by any type that can hand out a *Config.
type ConfigGetter interface {
	// GetConfig returns a pointer to a Config.
	GetConfig() *Config
}
9 changes: 5 additions & 4 deletions pkg/balance/factor/factor_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (fbb *FactorBasedBalance) BackendToRoute(backends []policy.BackendCtx) poli
// BackendsToBalance returns the busiest/unhealthy backend and the idlest backend.
// balanceCount: the count of connections to migrate in this round. 0 indicates no need to balance.
// reason: the debug information to be logged.
func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (from, to policy.BackendCtx, balanceCount int, reason []zap.Field) {
func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (from, to policy.BackendCtx, balanceCount int, reason string, logFields []zap.Field) {
if len(backends) <= 1 {
return
}
Expand Down Expand Up @@ -226,16 +226,17 @@ func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (
// backend1 factor scores: 1, 0, 1
// backend2 factor scores: 0, 1, 0
// Balancing the third factor may make the second factor unbalanced, although it's in the same order with the first factor.
return nil, nil, 0, nil
return
}
leftBitNum -= bitNum
}
reason = factor.Name()
fields := []zap.Field{
zap.String("factor", factor.Name()),
zap.String("factor", reason),
zap.Uint64("from_score", maxScore),
zap.Uint64("to_score", minScore),
}
return busiestBackend.BackendCtx, idlestBackend.BackendCtx, balanceCount, fields
return busiestBackend.BackendCtx, idlestBackend.BackendCtx, balanceCount, reason, fields
}

func (fbb *FactorBasedBalance) SetConfig(cfg *config.Config) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/balance/factor/factor_balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,12 @@ func TestBalanceWithOneFactor(t *testing.T) {
}
}
backends := createBackends(len(test.scores))
from, to, count, _ := fm.BackendsToBalance(backends)
from, to, count, reason, _ := fm.BackendsToBalance(backends)
require.Equal(t, test.count, count, "test index %d", tIdx)
if test.count > 0 {
require.Equal(t, backends[test.fromIdx], from, "test index %d", tIdx)
require.Equal(t, backends[test.toIdx], to, "test index %d", tIdx)
require.Equal(t, "mock", reason, "test index %d", tIdx)
} else {
require.Nil(t, from, "test index %d", tIdx)
require.Nil(t, to, "test index %d", tIdx)
Expand Down Expand Up @@ -215,7 +216,7 @@ func TestBalanceWith2Factors(t *testing.T) {
}
}
backends := createBackends(len(test.scores1))
from, to, count, _ := fm.BackendsToBalance(backends)
from, to, count, _, _ := fm.BackendsToBalance(backends)
require.Equal(t, test.count, count, "test index %d", tIdx)
if test.count > 0 {
require.Equal(t, backends[test.fromIdx], from, "test index %d", tIdx)
Expand Down Expand Up @@ -266,7 +267,7 @@ func TestBalanceWith3Factors(t *testing.T) {
}(factorIdx, factor)
}
backends := createBackends(len(test.scores))
from, to, count, _ := fm.BackendsToBalance(backends)
from, to, count, _, _ := fm.BackendsToBalance(backends)
require.Equal(t, test.count, count, "test index %d", tIdx)
if test.count > 0 {
require.Equal(t, backends[test.fromIdx], from, "test index %d", tIdx)
Expand Down
15 changes: 3 additions & 12 deletions pkg/balance/factor/factor_location.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,14 @@ package factor
import "github.com/pingcap/tiproxy/lib/config"

const (
// locationLabelName indicates the label name that location-based balance should be based on.
// We use `zone` because the follower read in TiDB also uses `zone` to decide location.
locationLabelName = "zone"
// balanceCount4Location indicates how many connections to balance per second.
balanceCount4Location = 1
)

var _ Factor = (*FactorLabel)(nil)

type FactorLocation struct {
// The location of this tiproxy instance.
selfLocation string
bitNum int
bitNum int
}

func NewFactorLocation() *FactorLocation {
Expand All @@ -32,13 +27,12 @@ func (fl *FactorLocation) Name() string {
}

func (fl *FactorLocation) UpdateScore(backends []scoredBackend) {
if len(fl.selfLocation) == 0 || len(backends) <= 1 {
if len(backends) <= 1 {
return
}
for i := 0; i < len(backends); i++ {
score := 1
backendLabels := backends[i].GetBackendInfo().Labels
if backendLabels != nil && backendLabels[locationLabelName] == fl.selfLocation {
if backends[i].Local() {
score = 0
}
backends[i].addScore(score, fl.bitNum)
Expand All @@ -54,9 +48,6 @@ func (fl *FactorLocation) BalanceCount(from, to scoredBackend) int {
}

func (fl *FactorLocation) SetConfig(cfg *config.Config) {
if cfg.Labels != nil {
fl.selfLocation = cfg.Labels[locationLabelName]
}
}

func (fl *FactorLocation) Close() {
Expand Down
105 changes: 9 additions & 96 deletions pkg/balance/factor/factor_location_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,120 +6,33 @@ package factor
import (
"testing"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/pkg/balance/observer"
"github.com/stretchr/testify/require"
)

func TestFactorLocationOneBackend(t *testing.T) {
func TestFactorLocationScore(t *testing.T) {
tests := []struct {
selfLocation string
backendLocation string
expectedScore uint64
}{
{},
{
selfLocation: "az1",
expectedScore: 1,
},
{
backendLocation: "az1",
},
{
selfLocation: "az1",
backendLocation: "az2",
expectedScore: 1,
},
{
selfLocation: "az1",
backendLocation: "az1",
},
}

factor := NewFactorLocation()
for i, test := range tests {
var backendLabels map[string]string
if test.backendLocation != "" {
backendLabels = map[string]string{
locationLabelName: test.backendLocation,
}
}
backendCtx := &mockBackend{
BackendInfo: observer.BackendInfo{Labels: backendLabels},
}
// Create 2 backends so that UpdateScore won't skip calculating scores.
backends := []scoredBackend{
{
BackendCtx: backendCtx,
},
{
BackendCtx: backendCtx,
},
}
var selfLabels map[string]string
if test.selfLocation != "" {
selfLabels = map[string]string{
locationLabelName: test.selfLocation,
}
}
factor.SetConfig(&config.Config{
Labels: selfLabels,
})
factor.UpdateScore(backends)
for _, backend := range backends {
require.Equal(t, test.expectedScore, backend.score(), "test idx: %d", i)
}
}
}

func TestFactorLocationMultiBackends(t *testing.T) {
tests := []struct {
labels map[string]string
local bool
expectedScore uint64
}{
{
local: false,
expectedScore: 1,
},
{
labels: map[string]string{
locationLabelName: "az1",
},
expectedScore: 0,
},
{
labels: map[string]string{
"z": "az1",
},
expectedScore: 1,
},
{
labels: map[string]string{
locationLabelName: "az2",
"z": "az1",
},
expectedScore: 1,
},
{
labels: map[string]string{
locationLabelName: "az1",
"z": "az2",
},
local: true,
expectedScore: 0,
},
}

factor := NewFactorLocation()
backends := make([]scoredBackend, 0, len(tests))
for _, test := range tests {
backend := scoredBackend{
backends = append(backends, scoredBackend{
BackendCtx: &mockBackend{
BackendInfo: observer.BackendInfo{Labels: test.labels},
local: test.local,
},
}
backends = append(backends, backend)
})
}
factor := NewFactorLocation()
factor.SetConfig(&config.Config{
Labels: map[string]string{locationLabelName: "az1"},
})
factor.UpdateScore(backends)
for i, test := range tests {
require.Equal(t, test.expectedScore, backends[i].score(), "test idx: %d", i)
Expand Down
5 changes: 5 additions & 0 deletions pkg/balance/factor/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type mockBackend struct {
connScore int
connCount int
healthy bool
local bool
}

func newMockBackend(healthy bool, connScore int) *mockBackend {
Expand Down Expand Up @@ -53,6 +54,10 @@ func (mb *mockBackend) GetBackendInfo() observer.BackendInfo {
return mb.BackendInfo
}

// Local reports the value stored in the mock's local field.
func (mb *mockBackend) Local() bool {
	return mb.local
}

var _ Factor = (*mockFactor)(nil)

type mockFactor struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/balance/metricsreader/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,7 @@ func (mb *mockBackend) Addr() string {
func (mb *mockBackend) GetBackendInfo() observer.BackendInfo {
return mb.BackendInfo
}

// Local always reports true for this mock backend.
func (mb *mockBackend) Local() bool {
	return true
}
31 changes: 30 additions & 1 deletion pkg/balance/observer/backend_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,17 @@

package observer

import "fmt"
import (
"fmt"

"github.com/pingcap/tiproxy/lib/config"
)

// locationLabelName is the label key that decides the location of TiProxy and backends.
// We use `zone` because the follower read in TiDB also uses `zone` to decide location.
var locationLabelName = "zone"

type BackendHealth struct {
BackendInfo
Expand All @@ -12,6 +22,25 @@ type BackendHealth struct {
PingErr error
// The backend version that returned to the client during handshake.
ServerVersion string
// Whether the backend is in the same zone as TiProxy. If the TiProxy location is undefined, all backends are treated as local.
Local bool
}

// setLocal stores in bh.Local whether this backend counts as local to TiProxy.
//
// The backend is local when cfg.Labels holds no non-empty value under
// locationLabelName, or when the backend's own label under that key equals
// the value found in cfg.Labels. Otherwise bh.Local is set to false.
func (bh *BackendHealth) setLocal(cfg *config.Config) {
	// Reading from a nil map yields the zero value, so a nil map, a missing key
	// and an empty value all collapse to the empty string here.
	selfLocation := cfg.Labels[locationLabelName]
	if selfLocation == "" {
		bh.Local = true
		return
	}
	// selfLocation is non-empty at this point, so a backend without the label
	// (or without any labels at all) can never match.
	bh.Local = bh.Labels[locationLabelName] == selfLocation
}

func (bh *BackendHealth) Equals(health BackendHealth) bool {
Expand Down
8 changes: 6 additions & 2 deletions pkg/balance/observer/backend_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ type DefaultBackendObserver struct {
refreshChan chan struct{}
fetcher BackendFetcher
hc HealthCheck
cfgGetter config.ConfigGetter
cancelFunc context.CancelFunc
logger *zap.Logger
healthCheckConfig *config.HealthCheck
wgp *waitgroup.WaitGroupPool
}

// NewDefaultBackendObserver creates a BackendObserver.
func NewDefaultBackendObserver(logger *zap.Logger, config *config.HealthCheck,
backendFetcher BackendFetcher, hc HealthCheck) *DefaultBackendObserver {
func NewDefaultBackendObserver(logger *zap.Logger, config *config.HealthCheck, backendFetcher BackendFetcher, hc HealthCheck,
cfgGetter config.ConfigGetter) *DefaultBackendObserver {
config.Check()
bo := &DefaultBackendObserver{
logger: logger,
Expand All @@ -60,6 +61,7 @@ func NewDefaultBackendObserver(logger *zap.Logger, config *config.HealthCheck,
fetcher: backendFetcher,
subscribers: make(map[string]chan HealthResult),
curBackends: make(map[string]*BackendHealth),
cfgGetter: cfgGetter,
}
return bo
}
Expand Down Expand Up @@ -126,13 +128,15 @@ func (bo *DefaultBackendObserver) checkHealth(ctx context.Context, backends map[

// Each goroutine checks one backend.
var lock sync.Mutex
cfg := bo.cfgGetter.GetConfig()
for addr, info := range backends {
func(addr string, info *BackendInfo) {
bo.wgp.RunWithRecover(func() {
if ctx.Err() != nil {
return
}
health := bo.hc.Check(ctx, addr, info)
health.setLocal(cfg)
lock.Lock()
curBackendHealth[addr] = health
lock.Unlock()
Expand Down
Loading