ttl: execute scan tasks (#40564)
close #40364
YangKeao authored Jan 18, 2023
1 parent dc30a5b commit f268e29
Showing 20 changed files with 1,244 additions and 838 deletions.
22 changes: 16 additions & 6 deletions ttl/cache/task.go
@@ -47,11 +47,21 @@ const insertIntoTTLTask = `INSERT LOW_PRIORITY INTO mysql.tidb_ttl_task SET
expire_time = %?,
created_time = %?`

// SelectFromTTLTaskWithID returns an SQL statement to get all tasks of the specified job in mysql.tidb_ttl_task
func SelectFromTTLTaskWithID(jobID string) (string, []interface{}) {
// SelectFromTTLTaskWithJobID returns an SQL statement to get all tasks of the specified job in mysql.tidb_ttl_task
func SelectFromTTLTaskWithJobID(jobID string) (string, []interface{}) {
return selectFromTTLTask + " WHERE job_id = %?", []interface{}{jobID}
}

// SelectFromTTLTaskWithID returns an SQL statement to get all tasks of the specified job and scanID in mysql.tidb_ttl_task
func SelectFromTTLTaskWithID(jobID string, scanID int64) (string, []interface{}) {
return selectFromTTLTask + " WHERE job_id = %? AND scan_id = %?", []interface{}{jobID, scanID}
}

// PeekWaitingTTLTask returns an SQL statement to get `limit` waiting ttl task
func PeekWaitingTTLTask(limit int, hbExpire time.Time) (string, []interface{}) {
return selectFromTTLTask + " WHERE status = 'waiting' OR owner_hb_time < %? ORDER BY created_time ASC LIMIT %?", []interface{}{hbExpire.Format("2006-01-02 15:04:05"), limit}
}

// InsertIntoTTLTask returns an SQL statement to insert a ttl task into mysql.tidb_ttl_task
func InsertIntoTTLTask(sctx sessionctx.Context, jobID string, tableID int64, scanID int, scanRangeStart []types.Datum, scanRangeEnd []types.Datum, expireTime time.Time, createdTime time.Time) (string, []interface{}, error) {
rangeStart, err := codec.EncodeKey(sctx.GetSessionVars().StmtCtx, []byte{}, scanRangeStart...)
@@ -115,8 +125,8 @@ func RowToTTLTask(sctx sessionctx.Context, row chunk.Row) (*TTLTask, error) {
}
if !row.IsNull(3) {
scanRangeStartBuf := row.GetBytes(3)
// it's still possible to be nil even this column is not NULL
if scanRangeStartBuf != nil {
// it's still possible to be empty even this column is not NULL
if len(scanRangeStartBuf) > 0 {
task.ScanRangeStart, err = codec.Decode(scanRangeStartBuf, len(scanRangeStartBuf))
if err != nil {
return nil, err
@@ -125,8 +135,8 @@ func RowToTTLTask(sctx sessionctx.Context, row chunk.Row) (*TTLTask, error) {
}
if !row.IsNull(4) {
scanRangeEndBuf := row.GetBytes(4)
// it's still possible to be nil even this column is not NULL
if scanRangeEndBuf != nil {
// it's still possible to be empty even this column is not NULL
if len(scanRangeEndBuf) > 0 {
task.ScanRangeEnd, err = codec.Decode(scanRangeEndBuf, len(scanRangeEndBuf))
if err != nil {
return nil, err
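A rough usage sketch (not part of this commit) for the new SQL builders: a caller could combine PeekWaitingTTLTask with Session.ExecuteSQL and RowToTTLTask roughly as below. The helper name, the limit of 4, and the two-minute heartbeat window are illustrative assumptions.

import (
	"context"
	"time"

	"github.com/pingcap/tidb/ttl/cache"
	"github.com/pingcap/tidb/ttl/session"
)

// peekWaitingTasks is a hypothetical helper shown only for illustration.
func peekWaitingTasks(ctx context.Context, se session.Session) ([]*cache.TTLTask, error) {
	// Fetch up to 4 tasks that are still waiting, or whose owner heartbeat
	// expired more than two minutes ago.
	sql, args := cache.PeekWaitingTTLTask(4, time.Now().Add(-2*time.Minute))
	rows, err := se.ExecuteSQL(ctx, sql, args...)
	if err != nil {
		return nil, err
	}
	tasks := make([]*cache.TTLTask, 0, len(rows))
	for _, row := range rows {
		// RowToTTLTask only decodes scan_range_start/scan_range_end when the
		// stored bytes are non-empty, per the change above.
		task, err := cache.RowToTTLTask(se, row)
		if err != nil {
			return nil, err
		}
		tasks = append(tasks, task)
	}
	return tasks, nil
}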
2 changes: 1 addition & 1 deletion ttl/cache/task_test.go
@@ -41,7 +41,7 @@ func newTaskGetter(ctx context.Context, t *testing.T, tk *testkit.TestKit) *task
}

func (tg *taskGetter) mustGetTestTask() *cache.TTLTask {
sql, args := cache.SelectFromTTLTaskWithID("test-job")
sql, args := cache.SelectFromTTLTaskWithJobID("test-job")
rs, err := tg.tk.Session().ExecuteInternal(tg.ctx, sql, args...)
require.NoError(tg.t, err)
rows, err := session.GetRows4Test(context.Background(), tg.tk.Session(), rs)
25 changes: 22 additions & 3 deletions ttl/session/session.go
@@ -30,6 +30,16 @@ import (
"github.com/pingcap/tidb/util/sqlexec"
)

// TxnMode represents using optimistic or pessimistic mode in the transaction
type TxnMode int

const (
// TxnModeOptimistic means using the optimistic transaction with "BEGIN OPTIMISTIC"
TxnModeOptimistic TxnMode = iota
// TxnModePessimistic means using the pessimistic transaction with "BEGIN PESSIMISTIC"
TxnModePessimistic
)

// Session is used to execute queries for TTL case
type Session interface {
sessionctx.Context
@@ -38,7 +48,7 @@ type Session interface {
// ExecuteSQL executes the sql
ExecuteSQL(ctx context.Context, sql string, args ...interface{}) ([]chunk.Row, error)
// RunInTxn executes the specified function in a txn
RunInTxn(ctx context.Context, fn func() error) (err error)
RunInTxn(ctx context.Context, fn func() error, mode TxnMode) (err error)
// ResetWithGlobalTimeZone resets the session time zone to global time zone
ResetWithGlobalTimeZone(ctx context.Context) error
// Close closes the session
@@ -94,12 +104,21 @@ func (s *session) ExecuteSQL(ctx context.Context, sql string, args ...interface{
}

// RunInTxn executes the specified function in a txn
func (s *session) RunInTxn(ctx context.Context, fn func() error) (err error) {
func (s *session) RunInTxn(ctx context.Context, fn func() error, txnMode TxnMode) (err error) {
tracer := metrics.PhaseTracerFromCtx(ctx)
defer tracer.EnterPhase(tracer.Phase())

tracer.EnterPhase(metrics.PhaseBeginTxn)
if _, err = s.ExecuteSQL(ctx, "BEGIN OPTIMISTIC"); err != nil {
sql := "BEGIN "
switch txnMode {
case TxnModeOptimistic:
sql += "OPTIMISTIC"
case TxnModePessimistic:
sql += "PESSIMISTIC"
default:
return errors.New("unknown transaction mode")
}
if _, err = s.ExecuteSQL(ctx, sql); err != nil {
return err
}
tracer.EnterPhase(metrics.PhaseOther)
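A minimal sketch of the extended RunInTxn signature (assuming a ttl/session.Session named se; the SQL statement and job ID are illustrative): callers now choose the transaction mode explicitly, for example pessimistic mode when several owners may update the same rows.

import (
	"context"

	"github.com/pingcap/tidb/ttl/session"
)

// removeTasksOfJob is a hypothetical caller shown only for illustration.
func removeTasksOfJob(ctx context.Context, se session.Session, jobID string) error {
	// TxnModePessimistic makes RunInTxn issue "BEGIN PESSIMISTIC", so
	// conflicting writers wait on locks instead of failing at commit time.
	return se.RunInTxn(ctx, func() error {
		_, err := se.ExecuteSQL(ctx, "DELETE FROM mysql.tidb_ttl_task WHERE job_id = %?", jobID)
		return err
	}, session.TxnModePessimistic)
}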
6 changes: 3 additions & 3 deletions ttl/session/session_test.go
@@ -36,20 +36,20 @@ func TestSessionRunInTxn(t *testing.T) {
require.NoError(t, se.RunInTxn(context.TODO(), func() error {
tk.MustExec("insert into t values (1, 10)")
return nil
}))
}, session.TxnModeOptimistic))
tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10"))

err := se.RunInTxn(context.TODO(), func() error {
tk.MustExec("insert into t values (2, 20)")
return errors.New("mockErr")
})
}, session.TxnModeOptimistic)
require.EqualError(t, err, "mockErr")
tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10"))

require.NoError(t, se.RunInTxn(context.TODO(), func() error {
tk.MustExec("insert into t values (3, 30)")
return nil
}))
}, session.TxnModeOptimistic))
tk2.MustQuery("select * from t order by id asc").Check(testkit.Rows("1 10", "3 30"))
}

7 changes: 4 additions & 3 deletions ttl/ttlworker/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
"job_manager.go",
"scan.go",
"session.go",
"task_manager.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/ttl/ttlworker",
@@ -27,7 +28,6 @@ go_library(
"//types",
"//util",
"//util/chunk",
"//util/hack",
"//util/logutil",
"//util/sqlexec",
"//util/timeutil",
@@ -46,16 +46,17 @@ go_test(
name = "ttlworker_test",
srcs = [
"del_test.go",
"job_integration_test.go",
"job_manager_integration_test.go",
"job_manager_test.go",
"job_test.go",
"scan_test.go",
"session_test.go",
"task_manager_integration_test.go",
"task_manager_test.go",
],
embed = [":ttlworker"],
flaky = True,
deps = [
"//domain",
"//infoschema",
"//kv",
"//parser/ast",
10 changes: 10 additions & 0 deletions ttl/ttlworker/config.go
@@ -30,6 +30,9 @@ const resizeWorkersInterval = 30 * time.Second
const splitScanCount = 64
const ttlJobTimeout = 6 * time.Hour

const taskManagerLoopTickerInterval = time.Minute
const ttlTaskHeartBeatTickerInterval = time.Minute

func getUpdateInfoSchemaCacheInterval() time.Duration {
failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
@@ -50,3 +53,10 @@ func getResizeWorkersInterval() time.Duration {
})
return resizeWorkersInterval
}

func getTaskManagerLoopTickerInterval() time.Duration {
failpoint.Inject("task-manager-loop-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
})
return taskManagerLoopTickerInterval
}
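In tests, the new failpoint can shorten the task manager loop; the following is a sketch under stated assumptions (the test name and the one-millisecond value are made up, and it only takes effect in a failpoint-enabled build).

import (
	"fmt"
	"testing"
	"time"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestShortTaskManagerLoopInterval(t *testing.T) {
	fpName := "github.com/pingcap/tidb/ttl/ttlworker/task-manager-loop-interval"
	// The injected integer is interpreted as nanoseconds by time.Duration(val.(int)).
	require.NoError(t, failpoint.Enable(fpName, fmt.Sprintf("return(%d)", int(time.Millisecond))))
	defer func() {
		require.NoError(t, failpoint.Disable(fpName))
	}()
	// ... start a task manager here; its loop now ticks every millisecond ...
}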
157 changes: 5 additions & 152 deletions ttl/ttlworker/job.go
@@ -16,14 +16,12 @@ package ttlworker

import (
"context"
"encoding/json"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
@@ -44,7 +42,6 @@ const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status
current_job_status = NULL,
current_job_status_update_time = NULL
WHERE table_id = %? AND current_job_id = %?`
const updateJobStateTemplate = "UPDATE mysql.tidb_ttl_table_status SET current_job_state = %? WHERE table_id = %? AND current_job_id = %? AND current_job_owner_id = %?"
const removeTaskForJobTemplate = "DELETE FROM mysql.tidb_ttl_task WHERE job_id = %?"

func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) (string, []interface{}) {
Expand All @@ -55,10 +52,6 @@ func finishJobSQL(tableID int64, finishTime time.Time, summary string, jobID str
return finishJobTemplate, []interface{}{finishTime.Format(timeFormat), summary, tableID, jobID}
}

func updateJobState(tableID int64, currentJobID string, currentJobState string, currentJobOwnerID string) (string, []interface{}) {
return updateJobStateTemplate, []interface{}{currentJobState, tableID, currentJobID, currentJobOwnerID}
}

func removeTaskForJob(jobID string) (string, []interface{}) {
return removeTaskForJobTemplate, []interface{}{jobID}
}
@@ -67,78 +60,23 @@ type ttlJob struct {
id string
ownerID string

ctx context.Context
cancel func()

createTime time.Time

tbl *cache.PhysicalTable

tasks []*ttlScanTask
taskIter int
finishedScanTaskCounter int
scanTaskErr error

// status is the only field which should be protected by a mutex, as `Cancel` may be called at any time, and will
// change the status
statusMutex sync.Mutex
status cache.JobStatus

statistics *ttlStatistics
}

// changeStatus updates the state of this job
func (job *ttlJob) changeStatus(ctx context.Context, se session.Session, status cache.JobStatus) error {
job.statusMutex.Lock()
oldStatus := job.status
job.status = status
job.statusMutex.Unlock()

sql, args := updateJobCurrentStatusSQL(job.tbl.ID, oldStatus, status, job.id)
_, err := se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}

return nil
}

func (job *ttlJob) updateState(ctx context.Context, se session.Session) error {
summary, err := job.summary()
if err != nil {
logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err))
}
sql, args := updateJobState(job.tbl.ID, job.id, summary, job.ownerID)
_, err = se.ExecuteSQL(ctx, sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}

return nil
}

// peekScanTask returns the next scan task, but doesn't promote the iterator
func (job *ttlJob) peekScanTask() *ttlScanTask {
return job.tasks[job.taskIter]
}

// nextScanTask promotes the iterator
func (job *ttlJob) nextScanTask() {
job.taskIter += 1
}

// finish turns current job into last job, and update the error message and statistics summary
func (job *ttlJob) finish(se session.Session, now time.Time) {
summary, err := job.summary()
if err != nil {
logutil.Logger(job.ctx).Warn("fail to generate summary for ttl job", zap.Error(err))
}

func (job *ttlJob) finish(se session.Session, now time.Time, summary string) {
// at this time, the job.ctx may have been canceled (to cancel this job)
// even when it's canceled, we'll need to update the states, so use another context
err = se.RunInTxn(context.TODO(), func() error {
err := se.RunInTxn(context.TODO(), func() error {
sql, args := finishJobSQL(job.tbl.ID, now, summary, job.id)
_, err = se.ExecuteSQL(context.TODO(), sql, args...)
_, err := se.ExecuteSQL(context.TODO(), sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}
@@ -150,94 +88,9 @@ func (job *ttlJob) finish(se session.Session, now time.Time) {
}

return nil
})
}, session.TxnModeOptimistic)

if err != nil {
logutil.Logger(job.ctx).Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id))
logutil.BgLogger().Error("fail to finish a ttl job", zap.Error(err), zap.Int64("tableID", job.tbl.ID), zap.String("jobID", job.id))
}
}

// AllSpawned returns whether all scan tasks have been dumped out
// **This function will be called concurrently, in many workers' goroutine**
func (job *ttlJob) AllSpawned() bool {
return job.taskIter == len(job.tasks) && len(job.tasks) != 0
}

// Timeout will return whether the job has timeout, if it is, it will be killed
func (job *ttlJob) Timeout(ctx context.Context, se session.Session, now time.Time) bool {
if !job.createTime.Add(ttlJobTimeout).Before(now) {
return false
}

err := job.changeStatus(ctx, se, cache.JobStatusTimeout)
if err != nil {
logutil.BgLogger().Info("fail to update status of ttl job", zap.String("jobID", job.id), zap.Error(err))
}

return true
}

// Finished returns whether the job is finished
func (job *ttlJob) Finished() bool {
job.statusMutex.Lock()
defer job.statusMutex.Unlock()
// in three condition, a job is considered finished:
// 1. It's cancelled manually
// 2. All scan tasks have been finished, and all selected rows succeed or in error state
// 3. The job is created one hour ago. It's a timeout.
return job.status == cache.JobStatusCancelled || (job.AllSpawned() && job.finishedScanTaskCounter == len(job.tasks) && job.statistics.TotalRows.Load() == job.statistics.ErrorRows.Load()+job.statistics.SuccessRows.Load())
}

// Cancel cancels the job context
func (job *ttlJob) Cancel(ctx context.Context, se session.Session) error {
if job.cancel != nil {
job.cancel()
}
// TODO: wait until all tasks have been finished
return job.changeStatus(ctx, se, cache.JobStatusCancelled)
}

func findJobWithTableID(jobs []*ttlJob, id int64) *ttlJob {
for _, j := range jobs {
if j.tbl.ID == id {
return j
}
}

return nil
}

type ttlSummary struct {
TotalRows uint64 `json:"total_rows"`
SuccessRows uint64 `json:"success_rows"`
ErrorRows uint64 `json:"error_rows"`

TotalScanTask int `json:"total_scan_task"`
ScheduledScanTask int `json:"scheduled_scan_task"`
FinishedScanTask int `json:"finished_scan_task"`

ScanTaskErr string `json:"scan_task_err,omitempty"`
}

func (job *ttlJob) summary() (string, error) {
summary := &ttlSummary{
TotalRows: job.statistics.TotalRows.Load(),
SuccessRows: job.statistics.SuccessRows.Load(),
ErrorRows: job.statistics.ErrorRows.Load(),

TotalScanTask: len(job.tasks),
ScheduledScanTask: job.taskIter,
FinishedScanTask: job.finishedScanTaskCounter,
}

if job.scanTaskErr != nil {
summary.ScanTaskErr = job.scanTaskErr.Error()
}

summaryJSON, err := json.Marshal(summary)
if err != nil {
return "", err
}

return string(hack.String(summaryJSON)), nil
}
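Because finish now takes the summary string from its caller (summary generation was moved out of job.go by this commit), a call site could look roughly like the in-package sketch below; the helper name and the hand-built JSON are assumptions, not the commit's actual code.

import (
	"fmt"
	"time"

	"github.com/pingcap/tidb/ttl/session"
)

// summarizeAndFinish is a hypothetical in-package helper, shown only for illustration.
func summarizeAndFinish(se session.Session, job *ttlJob, now time.Time) {
	// Build a small JSON summary from the job statistics and let finish persist
	// it inside an optimistic transaction, as shown above.
	summary := fmt.Sprintf(
		`{"total_rows":%d,"success_rows":%d,"error_rows":%d}`,
		job.statistics.TotalRows.Load(),
		job.statistics.SuccessRows.Load(),
		job.statistics.ErrorRows.Load(),
	)
	job.finish(se, now, summary)
}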
