Skip to content

Commit

Permalink
Refactor the tso service
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Aug 20, 2020
1 parent a0eba9b commit a94b3da
Show file tree
Hide file tree
Showing 90 changed files with 520 additions and 299 deletions.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/errs"
"go.uber.org/zap"
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"syscall"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/autoscaling"
"github.com/tikv/pd/pkg/dashboard"
"github.com/tikv/pd/pkg/logutil"
Expand Down
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/pingcap-incubator/tidb-dashboard v0.0.0-20200807020752-01f0abe88e93
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9
github.com/pingcap/errors v0.11.5-0.20200729012136-4e113ddee29e
github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d
github.com/pingcap/kvproto v0.0.0-20200810113304-6157337686b1
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad
Expand All @@ -54,5 +54,3 @@ require (
)

replace go.etcd.io/bbolt => go.etcd.io/bbolt v1.3.5

replace github.com/pingcap/errors => github.com/pingcap/errors v0.11.5-0.20200812093836-57ec461934ff
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,10 @@ github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12 h1:rfD9v3+ppLPzoQBgZ
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12/go.mod h1:PYMCGwN0JHjoqGr3HrZoD+b8Tgx8bKnArhSq8YVzUMc=
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9 h1:KH4f4Si9XK6/IW50HtoaiLIFHGkapOM6w83za47UYik=
github.com/pingcap/errcode v0.0.0-20180921232412-a1a7271709d9/go.mod h1:4b2X8xSqxIroj/IZ9MX/VGZhAwc11wB9wRIzHvz6SeM=
github.com/pingcap/errors v0.11.5-0.20200812093836-57ec461934ff h1:5MCSOM1ydTR9tXY2IrdWODUrnDdSjb1yluNHDO1sVN4=
github.com/pingcap/errors v0.11.5-0.20200812093836-57ec461934ff/go.mod h1:g4vx//d6VakjJ0mk7iLBlKA8LFavV/sAVINT/1PFxeQ=
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd h1:ay+wAVWHI/Z6vIik13hsK+FT9ZCNSPBElGr0qgiZpjg=
github.com/pingcap/errors v0.11.5-0.20200820035142-66eb5bf1d1cd/go.mod h1:g4vx//d6VakjJ0mk7iLBlKA8LFavV/sAVINT/1PFxeQ=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d h1:F8vp38kTAckN+v8Jlc98uMBvKIzr1a+UhnLyVYn8Q5Q=
github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
Expand Down
2 changes: 1 addition & 1 deletion pkg/apiutil/apiutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"strconv"

"github.com/pingcap/errcode"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/unrolled/render"
)

Expand Down
46 changes: 24 additions & 22 deletions pkg/autoscaling/calculation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,16 @@ var (

func calculate(rc *cluster.RaftCluster, strategy *Strategy) []*Plan {
var plans []*Plan
var afterQuota uint64
if tikvPlans, tikvAfterQuota := getPlans(rc, strategy, TiKV); tikvPlans != nil {
if tikvPlans := getPlans(rc, strategy, TiKV); tikvPlans != nil {
plans = append(plans, tikvPlans...)
afterQuota += tikvAfterQuota
}
if tidbPlans, tidbAfterQuota := getPlans(rc, strategy, TiDB); tidbPlans != nil {
if tidbPlans := getPlans(rc, strategy, TiDB); tidbPlans != nil {
plans = append(plans, tidbPlans...)
afterQuota += tidbAfterQuota
}
if exceedMaxCPUQuota(strategy, afterQuota) {
return nil
}
return plans
}

func getPlans(rc *cluster.RaftCluster, strategy *Strategy, component ComponentType) ([]*Plan, uint64) {
func getPlans(rc *cluster.RaftCluster, strategy *Strategy, component ComponentType) []*Plan {
var instances []instance
if component == TiKV {
instances = filterTiKVInstances(rc)
Expand All @@ -67,7 +61,7 @@ func getPlans(rc *cluster.RaftCluster, strategy *Strategy, component ComponentTy
}

if len(instances) == 0 {
return nil, 0
return nil
}

totalCPUUseTime := getTotalCPUUseTime(component, instances, MetricsTimeDuration)
Expand All @@ -83,9 +77,9 @@ func getPlans(rc *cluster.RaftCluster, strategy *Strategy, component ComponentTy
}
if usage < minThreshold {
scaleInQuota := (totalCPUTime*minThreshold - totalCPUUseTime) / MetricsTimeDuration.Seconds()
return calculateScaleInPlan(rc, strategy, component, scaleInQuota, instances), 0
return calculateScaleInPlan(rc, strategy, component, scaleInQuota, instances)
}
return nil, 0
return nil
}

func filterTiKVInstances(informer core.StoreSetInformer) []instance {
Expand Down Expand Up @@ -141,28 +135,31 @@ func getResourcesByComponent(strategy *Strategy, component ComponentType) []*Res
return resources
}

func calculateScaleOutPlan(rc *cluster.RaftCluster, strategy *Strategy, component ComponentType, scaleOutQuota float64, currentQuota uint64, instances []instance) ([]*Plan, uint64) {
func calculateScaleOutPlan(rc *cluster.RaftCluster, strategy *Strategy, component ComponentType, scaleOutQuota float64, currentQuota uint64, instances []instance) []*Plan {
groups := getScaledGroupsByComponent(rc, component, instances)
group := findBestGroupToScaleOut(rc, strategy, scaleOutQuota, groups, component)

resCPU := float64(getCPUByResourceType(strategy, group.ResourceType))
resCount := getCountByResourceType(strategy, group.ResourceType)
scaleOutCount := typeutil.MinUint64(uint64(math.Ceil(scaleOutQuota/resCPU)), MaxScaleOutStep)
increasedQuota := getCPUByResourceType(strategy, group.ResourceType) * scaleOutCount
afterQuota := currentQuota + increasedQuota

// A new group created
if len(groups) == 0 {
return []*Plan{&group}, afterQuota
if group.Count+scaleOutCount <= resCount {
group.Count += scaleOutCount
return []*Plan{&group}
}
return nil
}

// update the existed group
for i, g := range groups {
if g.ResourceType == group.ResourceType {
if g.ResourceType == group.ResourceType && group.Count+scaleOutCount <= resCount {
group.Count += scaleOutCount
groups[i] = &group
}
}
return groups, afterQuota
return groups
}

func calculateScaleInPlan(rc *cluster.RaftCluster, strategy *Strategy, component ComponentType, scaleInQuota float64, instances []instance) []*Plan {
Expand All @@ -186,10 +183,6 @@ func calculateScaleInPlan(rc *cluster.RaftCluster, strategy *Strategy, component
return groups
}

func exceedMaxCPUQuota(strategy *Strategy, totalCPUQuota uint64) bool {
return totalCPUQuota > strategy.MaxCPUQuota
}

func getCPUByResourceType(strategy *Strategy, resourceType string) uint64 {
for _, res := range strategy.Resources {
if res.ResourceType == resourceType {
Expand All @@ -199,6 +192,15 @@ func getCPUByResourceType(strategy *Strategy, resourceType string) uint64 {
return 0
}

func getCountByResourceType(strategy *Strategy, resourceType string) uint64 {
for _, res := range strategy.Resources {
if res.ResourceType == resourceType {
return res.Count
}
}
return 0
}

// TODO: get the scaled groups
func getScaledGroupsByComponent(rc *cluster.RaftCluster, component ComponentType, healthyInstances []instance) []*Plan {
switch component {
Expand Down
2 changes: 1 addition & 1 deletion pkg/autoscaling/datasource/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pkg/errors"
promClient "github.com/prometheus/client_golang/api"
promAPI "github.com/prometheus/client_golang/api/prometheus/v1"
promModel "github.com/prometheus/common/model"
Expand Down
14 changes: 5 additions & 9 deletions pkg/autoscaling/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,15 @@ import (

// Strategy within a HTTP request provides rules and resources to help make decision for auto scaling.
type Strategy struct {
// The basic unit of MaxCPUQuota is milli-core.
MaxCPUQuota uint64 `json:"max_cpu_quota"`
Rules []*Rule `json:"rules"`
Resources []*Resource `json:"resources"`
Rules []*Rule `json:"rules"`
Resources []*Resource `json:"resources"`
}

// Rule is a set of constraints for a kind of component.
type Rule struct {
Component string `json:"component"`
CPURule *CPURule `json:"cpu_rule,omitempty"`
StorageRule *StorageRule `json:"storage_rule,omitempty"`
ScaleOutIntervalSeconds uint64 `json:"scale_out_interval_seconds"`
ScaleInIntervalSeconds uint64 `json:"scale_in_interval_seconds"`
Component string `json:"component"`
CPURule *CPURule `json:"cpu_rule,omitempty"`
StorageRule *StorageRule `json:"storage_rule,omitempty"`
}

// CPURule is the constraints about CPU.
Expand Down
2 changes: 1 addition & 1 deletion pkg/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"bytes"
"encoding/binary"

"github.com/pkg/errors"
"github.com/pingcap/errors"
)

var (
Expand Down
24 changes: 24 additions & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,27 @@ var (
ErrReadHTTPBody = errors.Normalize("read HTTP body failed", errors.RFCCodeText("PD:apiutil:ErrReadHTTPBody"))
ErrWriteHTTPBody = errors.Normalize("write HTTP body failed", errors.RFCCodeText("PD:apiutil:ErrWriteHTTPBody"))
)

// cluster errors
var (
ErrPersistStore = errors.Normalize("failed to persist store", errors.RFCCodeText("PD:cluster:ErrPersistStore"))
ErrDeleteRegion = errors.Normalize("failed to delete region from storage", errors.RFCCodeText("PD:cluster:ErrDeleteRegion"))
ErrSaveRegion = errors.Normalize("failed to save region from storage", errors.RFCCodeText("PD:cluster:ErrSaveRegion"))
ErrBuryStore = errors.Normalize("failed to bury store", errors.RFCCodeText("PD:cluster:ErrBuryStore"))
ErrDeleteStore = errors.Normalize("failed to delete store", errors.RFCCodeText("PD:cluster:ErrDeleteStore"))
ErrPersistClusterVersion = errors.Normalize("persist cluster version meet error", errors.RFCCodeText("PD:cluster:ErrPersistClusterVersion"))
ErrGetMembers = errors.Normalize("get members failed", errors.RFCCodeText("PD:cluster:ErrGetMembers"))
// TODO: ErrNewHTTPRequest may not be suitable to put in cluster category
ErrNewHTTPRequest = errors.Normalize("new HTTP request failed", errors.RFCCodeText("PD:cluster:ErrNewHTTPRequest"))
)

// metricutil errors
var (
ErrPushGateway = errors.Normalize("push metrics to gateway failed", errors.RFCCodeText("PD:metricutil:ErrPushGateway"))
)

// etcdutil errors
var (
ErrLoadValue = errors.Normalize("load value from etcd failed", errors.RFCCodeText("PD:etcdutil:ErrLoadValue"))
ErrGetCluster = errors.Normalize("get cluster from remote peer failed", errors.RFCCodeText("PD:etcdutil:ErrGetCluster"))
)
7 changes: 4 additions & 3 deletions pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/errs"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/pkg/types"
Expand Down Expand Up @@ -61,7 +62,7 @@ func CheckClusterID(localClusterID types.ID, um types.URLsMap, tlsConfig *tls.Co
trp.CloseIdleConnections()
if gerr != nil {
// Do not return error, because other members may be not ready.
log.Error("failed to get cluster from remote", zap.Error(gerr))
log.Error("failed to get cluster from remote", errs.ZapError(errs.ErrGetCluster, gerr))
continue
}

Expand Down Expand Up @@ -105,7 +106,7 @@ func EtcdKVGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clie
start := time.Now()
resp, err := clientv3.NewKV(c).Get(ctx, key, opts...)
if err != nil {
log.Error("load from etcd meet error", zap.Error(err))
log.Error("load from etcd meet error", errs.ZapError(errs.ErrLoadValue, err))
}
if cost := time.Since(start); cost > DefaultSlowRequestTime {
log.Warn("kv gets too slow", zap.String("request-key", key), zap.Duration("cost", cost), zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion pkg/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"crypto/tls"
"net/url"

"github.com/pkg/errors"
"github.com/pingcap/errors"
"go.etcd.io/etcd/pkg/transport"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand Down
2 changes: 1 addition & 1 deletion pkg/logutil/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"sync"

"github.com/coreos/pkg/capnslog"
"github.com/pingcap/errors"
zaplog "github.com/pingcap/log"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.etcd.io/etcd/raft"
"go.uber.org/zap"
Expand Down
4 changes: 2 additions & 2 deletions pkg/metricutil/metricutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/typeutil"
"go.uber.org/zap"
)

const zeroDuration = time.Duration(0)
Expand Down Expand Up @@ -68,7 +68,7 @@ func prometheusPushClient(job, addr string, interval time.Duration) {
for {
err := pusher.Push()
if err != nil {
log.Error("could not push metrics to Prometheus Pushgateway", zap.Error(err))
log.Error("could not push metrics to Prometheus Pushgateway", errs.ZapError(errs.ErrPushGateway, err))
}

time.Sleep(interval)
Expand Down
2 changes: 1 addition & 1 deletion pkg/typeutil/convension.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package typeutil
import (
"encoding/binary"

"github.com/pkg/errors"
"github.com/pingcap/errors"
)

// BytesToUint64 converts a byte slice to uint64.
Expand Down
2 changes: 1 addition & 1 deletion pkg/typeutil/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"strconv"
"time"

"github.com/pkg/errors"
"github.com/pingcap/errors"
)

// Duration is a wrapper of time.Duration for TOML and JSON.
Expand Down
2 changes: 1 addition & 1 deletion pkg/typeutil/size.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"strconv"

"github.com/docker/go-units"
"github.com/pkg/errors"
"github.com/pingcap/errors"
)

// ByteSize is a retype uint64 for TOML and JSON.
Expand Down
2 changes: 1 addition & 1 deletion pkg/typeutil/string_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"strconv"
"strings"

"github.com/pkg/errors"
"github.com/pingcap/errors"
)

// StringSlice is more friendly to json encode/decode
Expand Down
2 changes: 1 addition & 1 deletion plugin/scheduler_example/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"sync"

"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/apiutil"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
Expand Down
2 changes: 1 addition & 1 deletion server/api/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
package api

import (
"errors"
"net/http"

"github.com/gorilla/mux"
"github.com/pingcap/errcode"
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/apiutil"
"github.com/tikv/pd/server"
"github.com/unrolled/render"
Expand Down
2 changes: 1 addition & 1 deletion server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"strings"

"github.com/pingcap/errcode"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/apiutil"
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/server"
Expand Down
2 changes: 1 addition & 1 deletion server/api/diagnose.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"net/http"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pkg/errors"
"github.com/tikv/pd/server"
"github.com/unrolled/render"
)
Expand Down
2 changes: 1 addition & 1 deletion server/api/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"regexp"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pkg/errors"
"github.com/tikv/pd/server"
"github.com/unrolled/render"
)
Expand Down
Loading

0 comments on commit a94b3da

Please sign in to comment.