Skip to content

Commit

Permalink
disttask: record endtime for task and subtask (#49872)
Browse files Browse the repository at this point in the history
close #49617
  • Loading branch information
ywqzzy authored Jan 4, 2024
1 parent 3389437 commit 354c186
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 107 deletions.
28 changes: 28 additions & 0 deletions pkg/disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions pkg/disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,18 @@ type TaskManager interface {
GetAllNodes(ctx context.Context) ([]proto.ManagedNode, error)
DeleteDeadNodes(ctx context.Context, nodes []string) error
TransferTasks2History(ctx context.Context, tasks []*proto.Task) error
// CancelTask updates task state to cancelling.
CancelTask(ctx context.Context, taskID int64) error
// FailTask updates task state to Failed and updates task error.
FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error
// RevertedTask updates task state to reverted.
RevertedTask(ctx context.Context, taskID int64) error
// PauseTask updates task state to pausing.
PauseTask(ctx context.Context, taskKey string) (bool, error)
// PausedTask updates task state to paused.
PausedTask(ctx context.Context, taskID int64) error
// SucceedTask updates a task to success state.
SucceedTask(ctx context.Context, taskID int64) error
// SwitchTaskStep switches the task to the next step and add subtasks in one
// transaction. It will change task state too if we're switch from InitStep to
// next step.
Expand All @@ -51,8 +59,6 @@ type TaskManager interface {
// And each subtask of this step must be different, to handle the network
// partition or owner change.
SwitchTaskStepInBatch(ctx context.Context, task *proto.Task, nextState proto.TaskState, nextStep proto.Step, subtasks []*proto.Subtask) error
// SucceedTask updates a task to success state.
SucceedTask(ctx context.Context, taskID int64) error
// GetUsedSlotsOnNodes returns the used slots on nodes that have subtask scheduled.
// subtasks of each task on one node is only accounted once as we don't support
// running them concurrently.
Expand Down
8 changes: 4 additions & 4 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (s *BaseScheduler) onPausing() error {
runningPendingCnt := cntByStates[proto.SubtaskStateRunning] + cntByStates[proto.SubtaskStatePending]
if runningPendingCnt == 0 {
logutil.Logger(s.logCtx).Info("all running subtasks paused, update the task to paused state")
return s.updateTask(proto.TaskStatePaused, nil, RetrySQLTimes)
return s.taskMgr.PausedTask(s.ctx, s.Task.ID)
}
logutil.Logger(s.logCtx).Debug("on pausing state, this task keeps current state", zap.Stringer("state", s.Task.State))
return nil
Expand Down Expand Up @@ -302,7 +302,7 @@ func (s *BaseScheduler) onReverting() error {
if err = s.OnDone(s.ctx, s, s.Task); err != nil {
return errors.Trace(err)
}
return s.updateTask(proto.TaskStateReverted, nil, RetrySQLTimes)
return s.taskMgr.RevertedTask(s.ctx, s.Task.ID)
}
// Wait all subtasks in this step finishes.
s.OnTick(s.ctx, s.Task)
Expand Down Expand Up @@ -641,11 +641,11 @@ func (s *BaseScheduler) handlePlanErr(err error) error {
return err
}
s.Task.Error = err

if err = s.OnDone(s.ctx, s, s.Task); err != nil {
return errors.Trace(err)
}
return s.updateTask(proto.TaskStateFailed, nil, RetrySQLTimes)

return s.taskMgr.FailTask(s.ctx, s.Task.ID, s.Task.State, s.Task.Error)
}

// MockServerInfo exported for scheduler_test.go
Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "storage",
srcs = [
"task_state.go",
"task_table.go",
"util.go",
],
Expand Down
51 changes: 51 additions & 0 deletions pkg/disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,16 @@ func TestTaskTable(t *testing.T) {
require.NoError(t, err)
require.Equal(t, proto.TaskStatePending, task.State)
require.Nil(t, task.Error)
curTime := time.Unix(time.Now().Unix(), 0)
require.NoError(t, gm.FailTask(ctx, id, proto.TaskStatePending, errors.New("test error")))
task, err = gm.GetTaskByID(ctx, id)
require.NoError(t, err)
require.Equal(t, proto.TaskStateFailed, task.State)
require.ErrorContains(t, task.Error, "test error")
endTime, err := storage.GetTaskEndTimeForTest(ctx, gm, id)
require.NoError(t, err)
require.LessOrEqual(t, endTime.Sub(curTime), time.Since(curTime))
require.GreaterOrEqual(t, endTime, curTime)

// succeed a pending task, no effect
id, err = gm.CreateTask(ctx, "key-success", "test", 4, []byte("test"))
Expand All @@ -157,6 +162,44 @@ func TestTaskTable(t *testing.T) {
require.NoError(t, err)
checkTaskStateStep(t, task, proto.TaskStateSucceed, proto.StepDone)
require.GreaterOrEqual(t, task.StateUpdateTime, startTime)

// reverted a pending task, no effect
id, err = gm.CreateTask(ctx, "key-reverted", "test", 4, []byte("test"))
require.NoError(t, err)
require.NoError(t, gm.RevertedTask(ctx, id))
task, err = gm.GetTaskByID(ctx, id)
require.NoError(t, err)
checkTaskStateStep(t, task, proto.TaskStatePending, proto.StepInit)
// reverted a reverting task
task.State = proto.TaskStateReverting
_, err = gm.UpdateTaskAndAddSubTasks(ctx, task, nil, proto.TaskStatePending)
require.NoError(t, err)
task, err = gm.GetTaskByID(ctx, id)
require.NoError(t, err)
require.Equal(t, proto.TaskStateReverting, task.State)
require.NoError(t, gm.RevertedTask(ctx, task.ID))
task, err = gm.GetTaskByID(ctx, id)
require.NoError(t, err)
require.Equal(t, proto.TaskStateReverted, task.State)
// paused a pending task, no effect

id, err = gm.CreateTask(ctx, "key-paused", "test", 4, []byte("test"))
require.NoError(t, err)
require.NoError(t, gm.PausedTask(ctx, id))
task, err = gm.GetTaskByID(ctx, id)
require.NoError(t, err)
checkTaskStateStep(t, task, proto.TaskStatePending, proto.StepInit)
// paused a pausing task
task.State = proto.TaskStatePausing
_, err = gm.UpdateTaskAndAddSubTasks(ctx, task, nil, proto.TaskStatePending)
require.NoError(t, err)
task, err = gm.GetTaskByID(ctx, id)
require.NoError(t, err)
require.Equal(t, proto.TaskStatePausing, task.State)
require.NoError(t, gm.PausedTask(ctx, task.ID))
task, err = gm.GetTaskByID(ctx, id)
require.NoError(t, err)
require.Equal(t, proto.TaskStatePaused, task.State)
}

func checkAfterSwitchStep(t *testing.T, startTime time.Time, task *proto.Task, subtasks []*proto.Subtask, step proto.Step) {
Expand Down Expand Up @@ -543,6 +586,10 @@ func TestSubTaskTable(t *testing.T) {
require.Greater(t, subtask.StartTime, ts)
require.Greater(t, subtask.UpdateTime, ts)

endTime, err := storage.GetSubtaskEndTimeForTest(ctx, sm, subtask.ID)
require.NoError(t, err)
require.Greater(t, endTime, ts)

// test that FinishSubtask does update the update time
testutil.CreateSubTask(t, sm, 4, proto.StepInit, "for_test1", []byte("test"), proto.TaskTypeExample, 11, false)
subtask, err = sm.GetFirstSubtaskInStates(ctx, "for_test1", 4, proto.StepInit, proto.SubtaskStatePending)
Expand All @@ -554,12 +601,16 @@ func TestSubTaskTable(t *testing.T) {
require.NoError(t, err)
require.Greater(t, subtask.StartTime, ts)
require.Greater(t, subtask.UpdateTime, ts)
ts = time.Now()
time.Sleep(time.Second)
require.NoError(t, sm.FinishSubtask(ctx, "for_test1", subtask.ID, []byte{}))
subtask2, err = sm.GetFirstSubtaskInStates(ctx, "for_test1", 4, proto.StepInit, proto.SubtaskStateSucceed)
require.NoError(t, err)
require.Equal(t, subtask2.StartTime, subtask.StartTime)
require.Greater(t, subtask2.UpdateTime, subtask.UpdateTime)
endTime, err = storage.GetSubtaskEndTimeForTest(ctx, sm, subtask.ID)
require.NoError(t, err)
require.Greater(t, endTime, ts)

// test UpdateFailedTaskExecutorIDs and IsTaskExecutorCanceled
canceled, err := sm.IsTaskExecutorCanceled(ctx, "for_test999", 4)
Expand Down
152 changes: 152 additions & 0 deletions pkg/disttask/framework/storage/task_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"context"

"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/sqlexec"
)

// CancelTask requests cancellation of the task identified by taskID.
//
// It runs a single UPDATE through executeSQLWithNewSession that sets the
// state to proto.TaskStateCancelling and refreshes state_update_time. The
// WHERE clause only matches a task that is currently pending or running, so
// a task in any other state is left untouched by this statement.
func (stm *TaskManager) CancelTask(ctx context.Context, taskID int64) error {
	const cancelSQL = `update mysql.tidb_global_task
set state = %?,
state_update_time = CURRENT_TIMESTAMP()
where id = %? and state in (%?, %?)`

	_, execErr := stm.executeSQLWithNewSession(
		ctx,
		cancelSQL,
		proto.TaskStateCancelling,
		taskID,
		proto.TaskStatePending,
		proto.TaskStateRunning,
	)
	return execErr
}

// CancelTaskByKeySession requests cancellation of the task whose task_key
// equals taskKey, executing the statement on the caller-supplied session se
// instead of opening a new one.
//
// The UPDATE sets the state to proto.TaskStateCancelling and refreshes
// state_update_time; it only matches a task that is pending or running.
func (*TaskManager) CancelTaskByKeySession(ctx context.Context, se sessionctx.Context, taskKey string) error {
	const cancelByKeySQL = `update mysql.tidb_global_task
set state = %?,
state_update_time = CURRENT_TIMESTAMP()
where task_key = %? and state in (%?, %?)`

	_, execErr := sqlexec.ExecSQL(
		ctx, se, cancelByKeySQL,
		proto.TaskStateCancelling, taskKey, proto.TaskStatePending, proto.TaskStateRunning,
	)
	return execErr
}

// FailTask implements the scheduler.TaskManager interface.
//
// It marks the task identified by taskID as proto.TaskStateFailed, stores
// taskErr (encoded with serializeErr) in the error column, and stamps both
// state_update_time and end_time with the current timestamp. The update is
// conditional: the WHERE clause only matches while the task is still in
// currentState.
func (stm *TaskManager) FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error {
	const failSQL = `update mysql.tidb_global_task
set state = %?,
error = %?,
state_update_time = CURRENT_TIMESTAMP(),
end_time = CURRENT_TIMESTAMP()
where id = %? and state = %?`

	_, execErr := stm.executeSQLWithNewSession(ctx, failSQL,
		proto.TaskStateFailed, serializeErr(taskErr), taskID, currentState)
	return execErr
}

// RevertedTask implements the scheduler.TaskManager interface.
//
// It moves the task identified by taskID from proto.TaskStateReverting to
// proto.TaskStateReverted and records the current timestamp in both
// state_update_time and end_time. A task that is not in the reverting state
// is not matched by the WHERE clause.
func (stm *TaskManager) RevertedTask(ctx context.Context, taskID int64) error {
	const revertedSQL = `update mysql.tidb_global_task
set state = %?,
state_update_time = CURRENT_TIMESTAMP(),
end_time = CURRENT_TIMESTAMP()
where id = %? and state = %?`

	if _, execErr := stm.executeSQLWithNewSession(ctx, revertedSQL,
		proto.TaskStateReverted, taskID, proto.TaskStateReverting,
	); execErr != nil {
		return execErr
	}
	return nil
}

// PauseTask asks the task whose task_key equals taskKey to pause.
//
// Inside a new session it sets the state to proto.TaskStatePausing and
// refreshes state_update_time, matching only a task that is pending or
// running. The returned bool is true when the statement affected at least
// one row; the returned error is whatever WithNewSession reports.
func (stm *TaskManager) PauseTask(ctx context.Context, taskKey string) (bool, error) {
	var found bool
	sessErr := stm.WithNewSession(func(se sessionctx.Context) error {
		if _, execErr := sqlexec.ExecSQL(ctx, se,
			`update mysql.tidb_global_task
set state = %?,
state_update_time = CURRENT_TIMESTAMP()
where task_key = %? and state in (%?, %?)`,
			proto.TaskStatePausing, taskKey, proto.TaskStatePending, proto.TaskStateRunning,
		); execErr != nil {
			return execErr
		}
		// A non-zero affected-row count means a matching task was updated.
		if se.GetSessionVars().StmtCtx.AffectedRows() != 0 {
			found = true
		}
		return nil
	})
	return found, sessErr
}

// PausedTask moves the task identified by taskID from
// proto.TaskStatePausing to proto.TaskStatePaused.
//
// The statement also writes the current timestamp to state_update_time and
// end_time. A task that is not in the pausing state is not matched by the
// WHERE clause.
func (stm *TaskManager) PausedTask(ctx context.Context, taskID int64) error {
	const pausedSQL = `update mysql.tidb_global_task
set state = %?,
state_update_time = CURRENT_TIMESTAMP(),
end_time = CURRENT_TIMESTAMP()
where id = %? and state = %?`

	_, execErr := stm.executeSQLWithNewSession(
		ctx,
		pausedSQL,
		proto.TaskStatePaused,
		taskID,
		proto.TaskStatePausing,
	)
	return execErr
}

// ResumeTask asks the paused task whose task_key equals taskKey to resume.
//
// Inside a new session it sets the state to proto.TaskStateResuming and
// refreshes state_update_time, matching only a task that is currently
// paused. The returned bool is true when the statement affected at least one
// row; the returned error is whatever WithNewSession reports.
func (stm *TaskManager) ResumeTask(ctx context.Context, taskKey string) (bool, error) {
	var found bool
	resume := func(se sessionctx.Context) error {
		_, execErr := sqlexec.ExecSQL(ctx, se,
			`update mysql.tidb_global_task
set state = %?,
state_update_time = CURRENT_TIMESTAMP()
where task_key = %? and state = %?`,
			proto.TaskStateResuming, taskKey, proto.TaskStatePaused,
		)
		if execErr != nil {
			return execErr
		}
		// A non-zero affected-row count means a paused task was found and updated.
		if se.GetSessionVars().StmtCtx.AffectedRows() != 0 {
			found = true
		}
		return nil
	}
	sessErr := stm.WithNewSession(resume)
	return found, sessErr
}

// SucceedTask moves the task identified by taskID from
// proto.TaskStateRunning to proto.TaskStateSucceed.
//
// In the same statement it sets the step to proto.StepDone and writes the
// current timestamp to state_update_time and end_time. A task that is not
// running is not matched by the WHERE clause.
func (stm *TaskManager) SucceedTask(ctx context.Context, taskID int64) error {
	markSucceed := func(se sessionctx.Context) error {
		_, execErr := sqlexec.ExecSQL(ctx, se, `
update mysql.tidb_global_task
set state = %?,
step = %?,
state_update_time = CURRENT_TIMESTAMP(),
end_time = CURRENT_TIMESTAMP()
where id = %? and state = %?`,
			proto.TaskStateSucceed, proto.StepDone, taskID, proto.TaskStateRunning,
		)
		return execErr
	}
	return stm.WithNewSession(markSucceed)
}
Loading

0 comments on commit 354c186

Please sign in to comment.