Skip to content

Commit

Permalink
dxf: fix the task num count incorrectly (#57127)
Browse files Browse the repository at this point in the history
close #57172
  • Loading branch information
fzzf678 authored Nov 7, 2024
1 parent 120b36b commit 559f218
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 9 deletions.
4 changes: 2 additions & 2 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/backoff"
disttaskutil "github.com/pingcap/tidb/pkg/util/disttask"
Expand Down Expand Up @@ -170,6 +169,8 @@ func (s *BaseScheduler) scheduleTask() {
if errors.Cause(err) == storage.ErrTaskNotFound {
// this can happen when the task is reverted/succeeded, but before
// we reach here, the cleanup routine has moved it to history.
s.logger.Debug("task not found, might be reverted/succeed/failed", zap.Int64("task_id", s.GetTask().ID),
zap.String("task_key", s.GetTask().Key))
return
}
s.logger.Error("refresh task failed", zap.Error(err))
Expand Down Expand Up @@ -399,7 +400,6 @@ func (s *BaseScheduler) onRunning() error {

func (s *BaseScheduler) onFinished() {
task := s.GetTask()
metrics.UpdateMetricsForFinishTask(task)
s.logger.Debug("schedule task, task is finished", zap.Stringer("state", task.State))
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/disttask/framework/scheduler/scheduler_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,7 @@ func (sm *Manager) startSchedulers(schedulableTasks []*proto.TaskBase) error {
zap.Int64("task-id", task.ID), zap.Stringer("state", task.State))
}

metrics.DistTaskGauge.WithLabelValues(task.Type.String(), metrics.SchedulingStatus).Inc()
metrics.UpdateMetricsForScheduleTask(task.ID, task.Type)
metrics.UpdateMetricsForScheduleTask(task)
sm.startScheduler(task, allocateSlots, reservedExecID)
}
return nil
Expand Down Expand Up @@ -437,6 +436,7 @@ func (sm *Manager) cleanupFinishedTasks(tasks []*proto.Task) error {
// if task doesn't register cleanup function, mark it as cleaned.
cleanedTasks = append(cleanedTasks, task)
}
metrics.UpdateMetricsForFinishTask(task)
}
if firstErr != nil {
sm.logger.Warn("cleanup routine failed", zap.Error(errors.Trace(firstErr)))
Expand Down
9 changes: 5 additions & 4 deletions pkg/metrics/disttask.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ func UpdateMetricsForAddTask(task *proto.TaskBase) {
}

// UpdateMetricsForScheduleTask updates metrics when a task moves from waiting to scheduling
func UpdateMetricsForScheduleTask(id int64, taskType proto.TaskType) {
DistTaskGauge.WithLabelValues(taskType.String(), WaitingStatus).Dec()
DistTaskStartTimeGauge.DeleteLabelValues(taskType.String(), WaitingStatus, fmt.Sprint(id))
DistTaskStartTimeGauge.WithLabelValues(taskType.String(), SchedulingStatus, fmt.Sprint(id)).SetToCurrentTime()
func UpdateMetricsForScheduleTask(task *proto.TaskBase) {
DistTaskGauge.WithLabelValues(task.Type.String(), WaitingStatus).Dec()
DistTaskGauge.WithLabelValues(task.Type.String(), SchedulingStatus).Inc()
DistTaskStartTimeGauge.DeleteLabelValues(task.Type.String(), WaitingStatus, fmt.Sprint(task.ID))
DistTaskStartTimeGauge.WithLabelValues(task.Type.String(), SchedulingStatus, fmt.Sprint(task.ID)).SetToCurrentTime()
}

// UpdateMetricsForRunTask updates metrics when a task starts running
Expand Down
2 changes: 1 addition & 1 deletion pkg/metrics/grafana/tidb.json
Original file line number Diff line number Diff line change
Expand Up @@ -16487,7 +16487,7 @@
"targets": [
{
"exemplar": true,
"expr": "sum(tidb_disttask_subtasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=~\"pending|running|reverting|revert_pending|paused\"}) by (exec_id)",
"expr": "sum(tidb_disttask_subtasks{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", status=~\"pending|running|failed|canceled|paused\"}) by (exec_id)",
"interval": "",
"legendFormat": "",
"queryType": "randomWalk",
Expand Down

0 comments on commit 559f218

Please sign in to comment.