Skip to content

Commit

Permalink
give the cancel reason
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <[email protected]>
  • Loading branch information
bufferflies committed Jun 26, 2023
1 parent 4d26c8b commit 5719061
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 35 deletions.
4 changes: 2 additions & 2 deletions pkg/schedule/checker/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() {
suite.NotNil(ops)
suite.True(oc.AddOperator(ops...))
for _, op := range ops {
oc.RemoveOperator(op)
oc.RemoveOperator(op, operator.ExceedStoreLimit)
}
}
regions[2] = regions[2].Clone(
Expand All @@ -498,7 +498,7 @@ func (suite *mergeCheckerTestSuite) TestStoreLimitWithMerge() {
suite.NotNil(ops)
suite.True(oc.AddOperator(ops...))
for _, op := range ops {
oc.RemoveOperator(op)
oc.RemoveOperator(op, operator.ExceedStoreLimit)
}
}
{
Expand Down
25 changes: 24 additions & 1 deletion pkg/schedule/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,26 @@ const (
// OperatorExpireTime is the duration that when an operator is not started
// after it, the operator will be considered expired.
OperatorExpireTime = 3 * time.Second
CancelReason = "cancel-reason"
)

// CancelReasonType is the reason attached to an operator when it is canceled
// or removed. Its string form is what gets stored in the operator's
// additional infos under the CancelReason key.
type CancelReasonType string

// The known cancel reasons. They are typed constants rather than variables so
// that no other package can reassign them at runtime.
const (
	// RegionNotFound means the operator's region could not be found in the cluster.
	RegionNotFound CancelReasonType = "region not found"
	// EpochNotMatch means the region epoch differs from what the operator expects.
	EpochNotMatch CancelReasonType = "epoch not match"
	// AlreadyExist means the region already has an operator that the new one
	// does not outrank.
	AlreadyExist CancelReasonType = "already exist"
	// AdminStop means the operator was removed by an explicit request.
	AdminStop CancelReasonType = "admin stop"
	// NotInRunningState means the operator was found in an unexpected status
	// while being dispatched.
	NotInRunningState CancelReasonType = "not in running state"
	// Succeed is the reason passed when a finished operator is removed.
	Succeed CancelReasonType = "succeed"
	// Timeout is the reason passed when a timed-out operator is removed.
	Timeout CancelReasonType = "timeout"
	// Expired means the operator expired before it could be added.
	Expired CancelReasonType = "expired"
	// NotInCreateStatus means an operator being added was not in the created status.
	NotInCreateStatus CancelReasonType = "not in create status"
	// StaleStatus means the in-progress check of the current step failed.
	StaleStatus CancelReasonType = "stale status"

	// ExceedStoreLimit means running the operator would exceed the store limit.
	ExceedStoreLimit CancelReasonType = "exceed store limit"
	// ExceedWaitLimit means the waiting queue for this kind of operator is full.
	ExceedWaitLimit CancelReasonType = "exceed wait limit"
	// Unknown is the fallback used when the caller supplies no reason.
	Unknown CancelReasonType = "unknown"
)

// Operator contains execution steps generated by scheduler.
Expand Down Expand Up @@ -227,7 +247,10 @@ func (o *Operator) CheckSuccess() bool {
}

// Cancel marks the operator canceled.
func (o *Operator) Cancel() bool {
// Cancel records why the operator is being canceled and then tries to move it
// to the CANCELED status. Only the first reason is kept: if a cancel reason is
// already present in AdditionalInfos it is left untouched. The return value is
// whatever the status transition reports.
func (o *Operator) Cancel(reason CancelReasonType) bool {
	_, alreadyRecorded := o.AdditionalInfos[CancelReason]
	if !alreadyRecorded {
		// Keep the earliest reason so later cancel attempts cannot overwrite it.
		o.AdditionalInfos[CancelReason] = string(reason)
	}
	canceled := o.status.To(CANCELED)
	return canceled
}

Expand Down
80 changes: 54 additions & 26 deletions pkg/schedule/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS
if op.ContainNonWitnessStep() {
recordOpStepWithTTL(op.RegionID())
}
if oc.RemoveOperator(op) {
if oc.RemoveOperator(op, Succeed) {
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-success").Inc()
oc.PromoteWaitingOperator()
}
Expand All @@ -134,7 +134,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS
oc.pushFastOperator(op)
}
case TIMEOUT:
if oc.RemoveOperator(op) {
if oc.RemoveOperator(op, Timeout) {
operatorCounter.WithLabelValues(op.Desc(), "promote-timeout").Inc()
oc.PromoteWaitingOperator()
}
Expand All @@ -150,7 +150,7 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS
failpoint.Inject("unexpectedOperator", func() {
panic(op)
})
_ = op.Cancel()
_ = op.Cancel(NotInRunningState)
oc.buryOperator(op)
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-unexpected").Inc()
oc.PromoteWaitingOperator()
Expand All @@ -162,7 +162,8 @@ func (oc *Controller) Dispatch(region *core.RegionInfo, source string, recordOpS
func (oc *Controller) checkStaleOperator(op *Operator, step OpStep, region *core.RegionInfo) bool {
err := step.CheckInProgress(oc.cluster, oc.config, region)
if err != nil {
if oc.RemoveOperator(op, zap.String("reason", err.Error())) {
log.Info("operator is stale", zap.Uint64("region-id", op.RegionID()), errs.ZapError(err))
if oc.RemoveOperator(op, StaleStatus) {
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-stale").Inc()
oc.PromoteWaitingOperator()
Expand All @@ -177,11 +178,13 @@ func (oc *Controller) checkStaleOperator(op *Operator, step OpStep, region *core
latest := region.GetRegionEpoch()
changes := latest.GetConfVer() - origin.GetConfVer()
if changes > op.ConfVerChanged(region) {
log.Info("operator is stale",
zap.Uint64("region-id", op.RegionID()),
zap.Uint64("diff", changes),
zap.Reflect("latest-epoch", region.GetRegionEpoch()))
if oc.RemoveOperator(
op,
zap.String("reason", "stale operator, confver does not meet expectations"),
zap.Reflect("latest-epoch", region.GetRegionEpoch()),
zap.Uint64("diff", changes),
EpochNotMatch,
) {
operatorCounter.WithLabelValues(op.Desc(), "stale").Inc()
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-stale").Inc()
Expand Down Expand Up @@ -220,7 +223,7 @@ func (oc *Controller) pollNeedDispatchRegion() (r *core.RegionInfo, next bool) {
r = oc.cluster.GetRegion(regionID)
if r == nil {
_ = oc.removeOperatorLocked(op)
if op.Cancel() {
if op.Cancel(RegionNotFound) {
log.Warn("remove operator because region disappeared",
zap.Uint64("region-id", op.RegionID()),
zap.Stringer("operator", op))
Expand Down Expand Up @@ -285,14 +288,14 @@ func (oc *Controller) AddWaitingOperator(ops ...*Operator) int {
}
isMerge = true
}
if !oc.checkAddOperator(false, op) {
_ = op.Cancel()
if pass, reason := oc.checkAddOperator(false, op); !pass {
_ = op.Cancel(reason)
oc.buryOperator(op)
if isMerge {
// A merge operation consists of two operators; cancel both of them.
i++
next := ops[i]
_ = next.Cancel()
_ = next.Cancel(reason)
oc.buryOperator(next)
}
continue
Expand Down Expand Up @@ -327,9 +330,17 @@ func (oc *Controller) AddOperator(ops ...*Operator) bool {
// NOTE: checkAddOperator is called with `isPromoting` set to false.
// This keeps the check logic that was used before the fix for issue #4946,
// although a user may want to add an operator while the waiting queue is busy.
if oc.exceedStoreLimitLocked(ops...) || !oc.checkAddOperator(false, ops...) {

if oc.exceedStoreLimitLocked(ops...) {
for _, op := range ops {
_ = op.Cancel(ExceedStoreLimit)
oc.buryOperator(op)
}
return false
}
if pass, reason := oc.checkAddOperator(false, ops...); !pass {
for _, op := range ops {
_ = op.Cancel()
_ = op.Cancel(reason)
oc.buryOperator(op)
}
return false
Expand All @@ -354,11 +365,20 @@ func (oc *Controller) PromoteWaitingOperator() {
return
}
operatorWaitCounter.WithLabelValues(ops[0].Desc(), "get").Inc()
if oc.exceedStoreLimitLocked(ops...) {
for _, op := range ops {
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-canceled").Inc()
_ = op.Cancel(ExceedStoreLimit)
oc.buryOperator(op)
}
oc.wopStatus.ops[ops[0].Desc()]--
continue
}

if oc.exceedStoreLimitLocked(ops...) || !oc.checkAddOperator(true, ops...) {
if pass, reason := oc.checkAddOperator(true, ops...); !pass {
for _, op := range ops {
operatorWaitCounter.WithLabelValues(op.Desc(), "promote-canceled").Inc()
_ = op.Cancel()
_ = op.Cancel(reason)
oc.buryOperator(op)
}
oc.wopStatus.ops[ops[0].Desc()]--
Expand All @@ -382,14 +402,14 @@ func (oc *Controller) PromoteWaitingOperator() {
// - The region already has a higher priority or same priority
// - Exceed the max number of waiting operators
// - At least one operator is expired.
func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool {
func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) (bool, CancelReasonType) {
for _, op := range ops {
region := oc.cluster.GetRegion(op.RegionID())
if region == nil {
log.Debug("region not found, cancel add operator",
zap.Uint64("region-id", op.RegionID()))
operatorWaitCounter.WithLabelValues(op.Desc(), "not-found").Inc()
return false
return false, RegionNotFound
}
if region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() ||
region.GetRegionEpoch().GetConfVer() != op.RegionEpoch().GetConfVer() {
Expand All @@ -398,14 +418,14 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool
zap.Reflect("old", region.GetRegionEpoch()),
zap.Reflect("new", op.RegionEpoch()))
operatorWaitCounter.WithLabelValues(op.Desc(), "epoch-not-match").Inc()
return false
return false, EpochNotMatch
}
if old := oc.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
log.Debug("already have operator, cancel add operator",
zap.Uint64("region-id", op.RegionID()),
zap.Reflect("old", old))
operatorWaitCounter.WithLabelValues(op.Desc(), "already-have").Inc()
return false
return false, AlreadyExist
}
if op.Status() != CREATED {
log.Error("trying to add operator with unexpected status",
Expand All @@ -416,26 +436,28 @@ func (oc *Controller) checkAddOperator(isPromoting bool, ops ...*Operator) bool
panic(op)
})
operatorWaitCounter.WithLabelValues(op.Desc(), "unexpected-status").Inc()
return false
return false, NotInCreateStatus
}
if !isPromoting && oc.wopStatus.ops[op.Desc()] >= oc.config.GetSchedulerMaxWaitingOperator() {
log.Debug("exceed max return false", zap.Uint64("waiting", oc.wopStatus.ops[op.Desc()]), zap.String("desc", op.Desc()), zap.Uint64("max", oc.config.GetSchedulerMaxWaitingOperator()))
operatorWaitCounter.WithLabelValues(op.Desc(), "exceed-max").Inc()
return false
return false, ExceedWaitLimit
}

if op.SchedulerKind() == OpAdmin || op.IsLeaveJointStateOperator() {
continue
}
}
expired := false
var reason CancelReasonType
for _, op := range ops {
if op.CheckExpired() {
expired = true
reason = Expired
operatorWaitCounter.WithLabelValues(op.Desc(), "expired").Inc()
}
}
return !expired
return !expired, reason
}

func isHigherPriorityOperator(new, old *Operator) bool {
Expand Down Expand Up @@ -521,18 +543,24 @@ func (oc *Controller) ack(op *Operator) {
}

// RemoveOperator removes an operator from the running operators.
func (oc *Controller) RemoveOperator(op *Operator, extraFields ...zap.Field) bool {
func (oc *Controller) RemoveOperator(op *Operator, reasons ...CancelReasonType) bool {
oc.Lock()
removed := oc.removeOperatorLocked(op)
oc.Unlock()
var cancelReason CancelReasonType
if len(reasons) > 0 {
cancelReason = reasons[0]
} else {
cancelReason = Unknown
}
if removed {
if op.Cancel() {
if op.Cancel(cancelReason) {
log.Info("operator removed",
zap.Uint64("region-id", op.RegionID()),
zap.Duration("takes", op.RunningTime()),
zap.Reflect("operator", op))
}
oc.buryOperator(op, extraFields...)
oc.buryOperator(op)
}
return removed
}
Expand Down Expand Up @@ -567,7 +595,7 @@ func (oc *Controller) buryOperator(op *Operator, extraFields ...zap.Field) {
panic(op)
})
operatorCounter.WithLabelValues(op.Desc(), "unexpected").Inc()
_ = op.Cancel()
_ = op.Cancel(Unknown)
}

switch st {
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/operator/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (suite *operatorControllerTestSuite) TestCheckAddUnexpectedStatus() {
// finished op canceled
op := NewTestOperator(1, &metapb.RegionEpoch{}, OpRegion, TransferLeader{ToStore: 2})
suite.True(oc.checkAddOperator(false, op))
suite.True(op.Cancel())
suite.True(op.Cancel(AdminStop))
suite.False(oc.checkAddOperator(false, op))
}
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/scatter/region_scatterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (r *RegionScatterer) scatterRegions(regions map[uint64]*core.RegionInfo, fa
}
failpoint.Inject("scatterHbStreamsDrain", func() {
r.opController.GetHBStreams().Drain(1)
r.opController.RemoveOperator(op)
r.opController.RemoveOperator(op, operator.AdminStop)
})
}
delete(failures, region.GetID())
Expand Down
3 changes: 0 additions & 3 deletions pkg/schedule/schedulers/split_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/gorilla/mux"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/core"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/plan"
Expand Down Expand Up @@ -251,8 +250,6 @@ func (s *splitBucketScheduler) splitBucket(plan *splitBucketPlan) []*operator.Op
return nil
}
splitBucketNewOperatorCounter.Inc()
op.AdditionalInfos["region-start-key"] = core.HexRegionKeyStr(region.GetStartKey())
op.AdditionalInfos["region-end-key"] = core.HexRegionKeyStr(region.GetEndKey())
op.AdditionalInfos["hot-degree"] = strconv.FormatInt(int64(splitBucket.HotDegree), 10)
return []*operator.Operator{op}
}
Expand Down
2 changes: 1 addition & 1 deletion server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func (h *Handler) RemoveOperator(regionID uint64) error {
return ErrOperatorNotFound
}

_ = c.RemoveOperator(op)
_ = c.RemoveOperator(op, operator.AdminStop)
return nil
}

Expand Down

0 comments on commit 5719061

Please sign in to comment.