Skip to content

Commit

Permalink
Merge branch 'master' into add_ready_to_resp
Browse files Browse the repository at this point in the history
  • Loading branch information
HuSharp authored Aug 24, 2023
2 parents 36e8caf + fbd386a commit 1005871
Show file tree
Hide file tree
Showing 37 changed files with 1,236 additions and 228 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,11 @@ error = '''
leader is nil
'''

["PD:server:ErrRateLimitExceeded"]
error = '''
rate limit exceeded
'''

["PD:server:ErrServerNotStarted"]
error = '''
server not started
Expand Down
8 changes: 4 additions & 4 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -7091,14 +7091,14 @@
"steppedLine": false,
"targets": [
{
"expr": "-sum(delta(pd_scheduler_balance_leader{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store-out\", instance=\"$instance\", type=\"move-leader\"}[1m])) by (store)",
"expr": "-sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-leader-scheduler\"}[1m])) by (source)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
"refId": "A"
},
{
"expr": "sum(delta(pd_scheduler_balance_leader{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store-in\", instance=\"$instance\", type=\"move-leader\"}[1m])) by (store)",
"expr": "sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-leader-scheduler\"}[1m])) by (target)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
Expand Down Expand Up @@ -7195,14 +7195,14 @@
"steppedLine": false,
"targets": [
{
"expr": "-sum(delta(pd_scheduler_balance_region{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store-out\", instance=\"$instance\", type=\"move-peer\"}[1m])) by (store)",
"expr": "-sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-region-scheduler\"}[1m])) by (source)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
"refId": "A"
},
{
"expr": "sum(delta(pd_scheduler_balance_region{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", store=~\"$store-in\", instance=\"$instance\", type=\"move-peer\"}[1m])) by (store)",
"expr": "sum(delta(pd_scheduler_balance_direction{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\",instance=\"$instance\",type=\"balance-region-scheduler\"}[1m])) by (target)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "store-{{store}}",
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ var (
ErrCancelStartEtcd = errors.Normalize("etcd start canceled", errors.RFCCodeText("PD:server:ErrCancelStartEtcd"))
ErrConfigItem = errors.Normalize("cannot set invalid configuration", errors.RFCCodeText("PD:server:ErrConfiguration"))
ErrServerNotStarted = errors.Normalize("server not started", errors.RFCCodeText("PD:server:ErrServerNotStarted"))
ErrRateLimitExceeded = errors.Normalize("rate limit exceeded", errors.RFCCodeText("PD:server:ErrRateLimitExceeded"))
)

// logutil errors
Expand Down
199 changes: 198 additions & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package apis

import (
"net/http"
"strconv"
"sync"
"time"

"github.com/gin-contrib/cors"
"github.com/gin-contrib/gzip"
Expand All @@ -25,6 +27,7 @@ import (
"github.com/joho/godotenv"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/unrolled/render"
Expand Down Expand Up @@ -77,7 +80,7 @@ func NewService(srv *scheserver.Service) *Service {
apiHandlerEngine.Use(cors.Default())
apiHandlerEngine.Use(gzip.Gzip(gzip.DefaultCompression))
apiHandlerEngine.Use(func(c *gin.Context) {
c.Set(multiservicesapi.ServiceContextKey, srv)
c.Set(multiservicesapi.ServiceContextKey, srv.Server)
c.Next()
})
apiHandlerEngine.Use(multiservicesapi.ServiceRedirector())
Expand All @@ -90,5 +93,199 @@ func NewService(srv *scheserver.Service) *Service {
root: root,
rd: createIndentRender(),
}
s.RegisterOperatorsRouter()
s.RegisterSchedulersRouter()
s.RegisterCheckersRouter()
return s
}

// RegisterSchedulersRouter registers the router of the schedulers handler.
func (s *Service) RegisterSchedulersRouter() {
router := s.root.Group("schedulers")
router.GET("", getSchedulers)
}

// RegisterCheckersRouter registers the router of the checkers handler.
func (s *Service) RegisterCheckersRouter() {
router := s.root.Group("checkers")
router.GET("/:name", getCheckerByName)
}

// RegisterOperatorsRouter registers the router of the operators handler.
func (s *Service) RegisterOperatorsRouter() {
router := s.root.Group("operators")
router.GET("", getOperators)
router.GET("/:id", getOperatorByID)
}

// @Tags operators
// @Summary Get an operator by ID.
// @Param region_id path int true "A Region's Id"
// @Produce json
// @Success 200 {object} operator.OpWithStatus
// @Failure 400 {string} string "The input is invalid."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators/{id} [GET]
func getOperatorByID(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
id := c.Param("id")

regionID, err := strconv.ParseUint(id, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}

opController := svr.GetCoordinator().GetOperatorController()
if opController == nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

c.JSON(http.StatusOK, opController.GetOperatorStatus(regionID))
}

// @Tags operators
// @Summary List operators.
// @Param kind query string false "Specify the operator kind." Enums(admin, leader, region, waiting)
// @Produce json
// @Success 200 {array} operator.Operator
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /operators [GET]
func getOperators(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
var (
results []*operator.Operator
ops []*operator.Operator
err error
)

opController := svr.GetCoordinator().GetOperatorController()
if opController == nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
kinds := c.QueryArray("kind")
if len(kinds) == 0 {
results = opController.GetOperators()
} else {
for _, kind := range kinds {
switch kind {
case "admin":
ops = opController.GetOperatorsOfKind(operator.OpAdmin)
case "leader":
ops = opController.GetOperatorsOfKind(operator.OpLeader)
case "region":
ops = opController.GetOperatorsOfKind(operator.OpRegion)
case "waiting":
ops = opController.GetWaitingOperators()
}
results = append(results, ops...)
}
}

c.JSON(http.StatusOK, results)
}

// @Tags checkers
// @Summary Get checker by name
// @Param name path string true "The name of the checker."
// @Produce json
// @Success 200 {string} string "The checker's status."
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /checkers/{name} [get]
func getCheckerByName(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
name := c.Param("name")
co := svr.GetCoordinator()
isPaused, err := co.IsCheckerPaused(name)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
output := map[string]bool{
"paused": isPaused,
}
c.JSON(http.StatusOK, output)
}

type schedulerPausedPeriod struct {
Name string `json:"name"`
PausedAt time.Time `json:"paused_at"`
ResumeAt time.Time `json:"resume_at"`
}

// @Tags schedulers
// @Summary List all created schedulers by status.
// @Produce json
// @Success 200 {array} string
// @Failure 500 {string} string "PD server failed to proceed the request."
// @Router /schedulers [get]
func getSchedulers(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
co := svr.GetCoordinator()
sc := co.GetSchedulersController()
schedulers := sc.GetSchedulerNames()

status := c.Query("status")
_, needTS := c.GetQuery("timestamp")
switch status {
case "paused":
var pausedSchedulers []string
pausedPeriods := []schedulerPausedPeriod{}
for _, scheduler := range schedulers {
paused, err := sc.IsSchedulerPaused(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

if paused {
if needTS {
s := schedulerPausedPeriod{
Name: scheduler,
PausedAt: time.Time{},
ResumeAt: time.Time{},
}
pausedAt, err := sc.GetPausedSchedulerDelayAt(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
s.PausedAt = time.Unix(pausedAt, 0)
resumeAt, err := sc.GetPausedSchedulerDelayUntil(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
s.ResumeAt = time.Unix(resumeAt, 0)
pausedPeriods = append(pausedPeriods, s)
} else {
pausedSchedulers = append(pausedSchedulers, scheduler)
}
}
}
if needTS {
c.JSON(http.StatusOK, pausedPeriods)
} else {
c.JSON(http.StatusOK, pausedSchedulers)
}
return
case "disabled":
var disabledSchedulers []string
for _, scheduler := range schedulers {
disabled, err := sc.IsSchedulerDisabled(scheduler)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

if disabled {
disabledSchedulers = append(disabledSchedulers, scheduler)
}
}
c.JSON(http.StatusOK, disabledSchedulers)
default:
c.JSON(http.StatusOK, schedulers)
}
}
5 changes: 3 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConf

// TODO: implement the following methods

// UpdateRegionsLabelLevelStats updates the region label level stats.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {}
// UpdateRegionsLabelLevelStats updates the status of the region label level by types.
func (c *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
}

// AllocID allocates a new ID.
func (c *Cluster) AllocID() (uint64, error) { return 0, nil }
10 changes: 10 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,11 +606,21 @@ func (o *PersistConfig) GetRegionMaxKeys() uint64 {
return o.GetStoreConfig().GetRegionMaxKeys()
}

// IsSynced returns true if the cluster config is synced.
func (o *PersistConfig) IsSynced() bool {
return o.GetStoreConfig().IsSynced()
}

// IsEnableRegionBucket return true if the region bucket is enabled.
func (o *PersistConfig) IsEnableRegionBucket() bool {
return o.GetStoreConfig().IsEnableRegionBucket()
}

// IsRaftKV2 returns the whether the cluster use `raft-kv2` engine.
func (o *PersistConfig) IsRaftKV2() bool {
return o.GetStoreConfig().IsRaftKV2()
}

// TODO: implement the following methods

// AddSchedulerCfg adds the scheduler configurations.
Expand Down
7 changes: 6 additions & 1 deletion pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,11 @@ func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
return &s.cfg.Security.TLSConfig
}

// GetCoordinator returns the coordinator.
func (s *Server) GetCoordinator() *schedule.Coordinator {
return s.coordinator
}

func (s *Server) initClient() error {
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
Expand Down Expand Up @@ -501,6 +506,7 @@ func (s *Server) startServer() (err error) {
if err != nil {
return err
}
s.service = &Service{Server: s}
tlsConfig, err := s.cfg.Security.ToTLSConfig()
if err != nil {
return err
Expand Down Expand Up @@ -543,7 +549,6 @@ func (s *Server) startServer() (err error) {
log.Error("failed to register the service", zap.String("service-name", utils.SchedulingServiceName), errs.ZapError(err))
return err
}

atomic.StoreInt64(&s.isRunning, 1)
return nil
}
Expand Down
Loading

0 comments on commit 1005871

Please sign in to comment.