Skip to content

Commit

Permalink
metrics, balance: add metrics for load balance (#556)
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored Jun 13, 2024
1 parent de4a59b commit de5e73a
Show file tree
Hide file tree
Showing 29 changed files with 482 additions and 154 deletions.
8 changes: 8 additions & 0 deletions lib/config/getter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright 2024 PingCAP, Inc.
// SPDX-License-Identifier: Apache-2.0

package config

type ConfigGetter interface {
GetConfig() *Config
}
9 changes: 5 additions & 4 deletions pkg/balance/factor/factor_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (fbb *FactorBasedBalance) BackendToRoute(backends []policy.BackendCtx) poli
// BackendsToBalance returns the busiest/unhealthy backend and the idlest backend.
// balanceCount: the count of connections to migrate in this round. 0 indicates no need to balance.
// reason: the debug information to be logged.
func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (from, to policy.BackendCtx, balanceCount int, reason []zap.Field) {
func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (from, to policy.BackendCtx, balanceCount int, reason string, logFields []zap.Field) {
if len(backends) <= 1 {
return
}
Expand Down Expand Up @@ -226,16 +226,17 @@ func (fbb *FactorBasedBalance) BackendsToBalance(backends []policy.BackendCtx) (
// backend1 factor scores: 1, 0, 1
// backend2 factor scores: 0, 1, 0
// Balancing the third factor may make the second factor unbalanced, although it's in the same order with the first factor.
return nil, nil, 0, nil
return
}
leftBitNum -= bitNum
}
reason = factor.Name()
fields := []zap.Field{
zap.String("factor", factor.Name()),
zap.String("factor", reason),
zap.Uint64("from_score", maxScore),
zap.Uint64("to_score", minScore),
}
return busiestBackend.BackendCtx, idlestBackend.BackendCtx, balanceCount, fields
return busiestBackend.BackendCtx, idlestBackend.BackendCtx, balanceCount, reason, fields
}

func (fbb *FactorBasedBalance) SetConfig(cfg *config.Config) {
Expand Down
7 changes: 4 additions & 3 deletions pkg/balance/factor/factor_balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,12 @@ func TestBalanceWithOneFactor(t *testing.T) {
}
}
backends := createBackends(len(test.scores))
from, to, count, _ := fm.BackendsToBalance(backends)
from, to, count, reason, _ := fm.BackendsToBalance(backends)
require.Equal(t, test.count, count, "test index %d", tIdx)
if test.count > 0 {
require.Equal(t, backends[test.fromIdx], from, "test index %d", tIdx)
require.Equal(t, backends[test.toIdx], to, "test index %d", tIdx)
require.Equal(t, "mock", reason, "test index %d", tIdx)
} else {
require.Nil(t, from, "test index %d", tIdx)
require.Nil(t, to, "test index %d", tIdx)
Expand Down Expand Up @@ -215,7 +216,7 @@ func TestBalanceWith2Factors(t *testing.T) {
}
}
backends := createBackends(len(test.scores1))
from, to, count, _ := fm.BackendsToBalance(backends)
from, to, count, _, _ := fm.BackendsToBalance(backends)
require.Equal(t, test.count, count, "test index %d", tIdx)
if test.count > 0 {
require.Equal(t, backends[test.fromIdx], from, "test index %d", tIdx)
Expand Down Expand Up @@ -266,7 +267,7 @@ func TestBalanceWith3Factors(t *testing.T) {
}(factorIdx, factor)
}
backends := createBackends(len(test.scores))
from, to, count, _ := fm.BackendsToBalance(backends)
from, to, count, _, _ := fm.BackendsToBalance(backends)
require.Equal(t, test.count, count, "test index %d", tIdx)
if test.count > 0 {
require.Equal(t, backends[test.fromIdx], from, "test index %d", tIdx)
Expand Down
15 changes: 3 additions & 12 deletions pkg/balance/factor/factor_location.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,14 @@ package factor
import "github.com/pingcap/tiproxy/lib/config"

const (
// locationLabelName indicates the label name that location-based balance should be based on.
// We use `zone` because the follower read in TiDB also uses `zone` to decide location.
locationLabelName = "zone"
// balanceCount4Location indicates how many connections to balance per second.
balanceCount4Location = 1
)

var _ Factor = (*FactorLabel)(nil)

type FactorLocation struct {
// The location of this tiproxy instance.
selfLocation string
bitNum int
bitNum int
}

func NewFactorLocation() *FactorLocation {
Expand All @@ -32,13 +27,12 @@ func (fl *FactorLocation) Name() string {
}

func (fl *FactorLocation) UpdateScore(backends []scoredBackend) {
if len(fl.selfLocation) == 0 || len(backends) <= 1 {
if len(backends) <= 1 {
return
}
for i := 0; i < len(backends); i++ {
score := 1
backendLabels := backends[i].GetBackendInfo().Labels
if backendLabels != nil && backendLabels[locationLabelName] == fl.selfLocation {
if backends[i].Local() {
score = 0
}
backends[i].addScore(score, fl.bitNum)
Expand All @@ -54,9 +48,6 @@ func (fl *FactorLocation) BalanceCount(from, to scoredBackend) int {
}

func (fl *FactorLocation) SetConfig(cfg *config.Config) {
if cfg.Labels != nil {
fl.selfLocation = cfg.Labels[locationLabelName]
}
}

func (fl *FactorLocation) Close() {
Expand Down
105 changes: 9 additions & 96 deletions pkg/balance/factor/factor_location_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,120 +6,33 @@ package factor
import (
"testing"

"github.com/pingcap/tiproxy/lib/config"
"github.com/pingcap/tiproxy/pkg/balance/observer"
"github.com/stretchr/testify/require"
)

func TestFactorLocationOneBackend(t *testing.T) {
func TestFactorLocationScore(t *testing.T) {
tests := []struct {
selfLocation string
backendLocation string
expectedScore uint64
}{
{},
{
selfLocation: "az1",
expectedScore: 1,
},
{
backendLocation: "az1",
},
{
selfLocation: "az1",
backendLocation: "az2",
expectedScore: 1,
},
{
selfLocation: "az1",
backendLocation: "az1",
},
}

factor := NewFactorLocation()
for i, test := range tests {
var backendLabels map[string]string
if test.backendLocation != "" {
backendLabels = map[string]string{
locationLabelName: test.backendLocation,
}
}
backendCtx := &mockBackend{
BackendInfo: observer.BackendInfo{Labels: backendLabels},
}
// Create 2 backends so that UpdateScore won't skip calculating scores.
backends := []scoredBackend{
{
BackendCtx: backendCtx,
},
{
BackendCtx: backendCtx,
},
}
var selfLabels map[string]string
if test.selfLocation != "" {
selfLabels = map[string]string{
locationLabelName: test.selfLocation,
}
}
factor.SetConfig(&config.Config{
Labels: selfLabels,
})
factor.UpdateScore(backends)
for _, backend := range backends {
require.Equal(t, test.expectedScore, backend.score(), "test idx: %d", i)
}
}
}

func TestFactorLocationMultiBackends(t *testing.T) {
tests := []struct {
labels map[string]string
local bool
expectedScore uint64
}{
{
local: false,
expectedScore: 1,
},
{
labels: map[string]string{
locationLabelName: "az1",
},
expectedScore: 0,
},
{
labels: map[string]string{
"z": "az1",
},
expectedScore: 1,
},
{
labels: map[string]string{
locationLabelName: "az2",
"z": "az1",
},
expectedScore: 1,
},
{
labels: map[string]string{
locationLabelName: "az1",
"z": "az2",
},
local: true,
expectedScore: 0,
},
}

factor := NewFactorLocation()
backends := make([]scoredBackend, 0, len(tests))
for _, test := range tests {
backend := scoredBackend{
backends = append(backends, scoredBackend{
BackendCtx: &mockBackend{
BackendInfo: observer.BackendInfo{Labels: test.labels},
local: test.local,
},
}
backends = append(backends, backend)
})
}
factor := NewFactorLocation()
factor.SetConfig(&config.Config{
Labels: map[string]string{locationLabelName: "az1"},
})
factor.UpdateScore(backends)
for i, test := range tests {
require.Equal(t, test.expectedScore, backends[i].score(), "test idx: %d", i)
Expand Down
5 changes: 5 additions & 0 deletions pkg/balance/factor/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type mockBackend struct {
connScore int
connCount int
healthy bool
local bool
}

func newMockBackend(healthy bool, connScore int) *mockBackend {
Expand Down Expand Up @@ -53,6 +54,10 @@ func (mb *mockBackend) GetBackendInfo() observer.BackendInfo {
return mb.BackendInfo
}

func (mb *mockBackend) Local() bool {
return mb.local
}

var _ Factor = (*mockFactor)(nil)

type mockFactor struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/balance/metricsreader/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,7 @@ func (mb *mockBackend) Addr() string {
func (mb *mockBackend) GetBackendInfo() observer.BackendInfo {
return mb.BackendInfo
}

func (mb *mockBackend) Local() bool {
return true
}
31 changes: 30 additions & 1 deletion pkg/balance/observer/backend_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,17 @@

package observer

import "fmt"
import (
"fmt"

"github.com/pingcap/tiproxy/lib/config"
)

var (
// locationLabelName indicates the label name that decides the location of TiProxy and backends.
// We use `zone` because the follower read in TiDB also uses `zone` to decide location.
locationLabelName = "zone"
)

type BackendHealth struct {
BackendInfo
Expand All @@ -12,6 +22,25 @@ type BackendHealth struct {
PingErr error
// The backend version that returned to the client during handshake.
ServerVersion string
// Whether the backend in the same zone with TiProxy. If TiProxy location is undefined, take all backends as local.
Local bool
}

func (bh *BackendHealth) setLocal(cfg *config.Config) {
if cfg.Labels == nil {
bh.Local = true
return
}
selfLocation, ok := cfg.Labels[locationLabelName]
if !ok || len(selfLocation) == 0 {
bh.Local = true
return
}
if bh.Labels != nil && bh.Labels[locationLabelName] == selfLocation {
bh.Local = true
return
}
bh.Local = false
}

func (bh *BackendHealth) Equals(health BackendHealth) bool {
Expand Down
8 changes: 6 additions & 2 deletions pkg/balance/observer/backend_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ type DefaultBackendObserver struct {
refreshChan chan struct{}
fetcher BackendFetcher
hc HealthCheck
cfgGetter config.ConfigGetter
cancelFunc context.CancelFunc
logger *zap.Logger
healthCheckConfig *config.HealthCheck
wgp *waitgroup.WaitGroupPool
}

// NewDefaultBackendObserver creates a BackendObserver.
func NewDefaultBackendObserver(logger *zap.Logger, config *config.HealthCheck,
backendFetcher BackendFetcher, hc HealthCheck) *DefaultBackendObserver {
func NewDefaultBackendObserver(logger *zap.Logger, config *config.HealthCheck, backendFetcher BackendFetcher, hc HealthCheck,
cfgGetter config.ConfigGetter) *DefaultBackendObserver {
config.Check()
bo := &DefaultBackendObserver{
logger: logger,
Expand All @@ -60,6 +61,7 @@ func NewDefaultBackendObserver(logger *zap.Logger, config *config.HealthCheck,
fetcher: backendFetcher,
subscribers: make(map[string]chan HealthResult),
curBackends: make(map[string]*BackendHealth),
cfgGetter: cfgGetter,
}
return bo
}
Expand Down Expand Up @@ -126,13 +128,15 @@ func (bo *DefaultBackendObserver) checkHealth(ctx context.Context, backends map[

// Each goroutine checks one backend.
var lock sync.Mutex
cfg := bo.cfgGetter.GetConfig()
for addr, info := range backends {
func(addr string, info *BackendInfo) {
bo.wgp.RunWithRecover(func() {
if ctx.Err() != nil {
return
}
health := bo.hc.Check(ctx, addr, info)
health.setLocal(cfg)
lock.Lock()
curBackendHealth[addr] = health
lock.Unlock()
Expand Down
Loading

0 comments on commit de5e73a

Please sign in to comment.