Skip to content

Commit

Permalink
ttl: auto delete old rows in mysql.tidb_ttl_job_history (#40808)
Browse files Browse the repository at this point in the history
close #40807
  • Loading branch information
lcwangchao committed Jan 29, 2023
1 parent b5be9f6 commit db435e1
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 8 deletions.
2 changes: 1 addition & 1 deletion ttl/ttlworker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const ttlJobTimeout = 6 * time.Hour

const taskManagerLoopTickerInterval = time.Minute
const ttlTaskHeartBeatTickerInterval = time.Minute
const ttlTaskGCInterval = time.Hour
const ttlGCInterval = time.Hour

func getUpdateInfoSchemaCacheInterval() time.Duration {
failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration {
Expand Down
24 changes: 17 additions & 7 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ const taskGCTemplate = `DELETE task FROM
ON task.job_id = job.current_job_id
WHERE job.table_id IS NULL`

const ttlJobHistoryGCTemplate = `DELETE FROM mysql.tidb_ttl_job_history WHERE create_time < CURDATE() - INTERVAL 90 DAY`

const timeFormat = "2006-01-02 15:04:05"

func insertNewTableIntoStatusSQL(tableID int64, parentTableID int64) (string, []interface{}) {
Expand Down Expand Up @@ -143,7 +145,7 @@ func (m *JobManager) jobLoop() error {
infoSchemaCacheUpdateTicker := time.Tick(m.infoSchemaCache.GetInterval())
tableStatusCacheUpdateTicker := time.Tick(m.tableStatusCache.GetInterval())
resizeWorkersTicker := time.Tick(getResizeWorkersInterval())
taskGC := time.Tick(jobManagerLoopTickerInterval)
gcTicker := time.Tick(ttlGCInterval)

scheduleJobTicker := time.Tick(jobManagerLoopTickerInterval)
jobCheckTicker := time.Tick(jobManagerLoopTickerInterval)
Expand Down Expand Up @@ -175,12 +177,9 @@ func (m *JobManager) jobLoop() error {
if err != nil {
logutil.Logger(m.ctx).Warn("fail to update table status cache", zap.Error(err))
}
case <-taskGC:
taskGCCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
_, err = se.ExecuteSQL(taskGCCtx, taskGCTemplate)
if err != nil {
logutil.Logger(m.ctx).Warn("fail to gc redundant scan task", zap.Error(err))
}
case <-gcTicker:
gcCtx, cancel := context.WithTimeout(m.ctx, ttlInternalSQLTimeout)
DoGC(gcCtx, se)
cancel()
// Job Schedule loop:
case <-updateJobHeartBeatTicker:
Expand Down Expand Up @@ -777,3 +776,14 @@ func summarizeTaskResult(tasks []*cache.TTLTask) (*TTLSummary, error) {
summary.SummaryText = string(buf)
return summary, nil
}

// DoGC deletes some old TTL job histories and redundant scan tasks
func DoGC(ctx context.Context, se session.Session) {
if _, err := se.ExecuteSQL(ctx, taskGCTemplate); err != nil {
logutil.Logger(ctx).Warn("fail to gc redundant scan task", zap.Error(err))
}

if _, err := se.ExecuteSQL(ctx, ttlJobHistoryGCTemplate); err != nil {
logutil.Logger(ctx).Warn("fail to gc ttl job history", zap.Error(err))
}
}
84 changes: 84 additions & 0 deletions ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,3 +490,87 @@ func waitAndStopTTLManager(t *testing.T, dom *domain.Domain) {
continue
}
}

func TestGCScanTasks(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
addTableStatusRecord := func(tableID, parentTableID, curJobID int64) {
tk.MustExec("INSERT INTO mysql.tidb_ttl_table_status (table_id,parent_table_id) VALUES (?, ?)", tableID, parentTableID)
if curJobID == 0 {
return
}

tk.MustExec(`UPDATE mysql.tidb_ttl_table_status
SET current_job_id = ?,
current_job_owner_id = '12345',
current_job_start_time = NOW(),
current_job_status = 'running',
current_job_status_update_time = NOW(),
current_job_ttl_expire = NOW(),
current_job_owner_hb_time = NOW()
WHERE table_id = ?`, curJobID, tableID)
}

addScanTaskRecord := func(jobID, tableID, scanID int64) {
tk.MustExec(`INSERT INTO mysql.tidb_ttl_task SET
job_id = ?,
table_id = ?,
scan_id = ?,
expire_time = NOW(),
created_time = NOW()`, jobID, tableID, scanID)
}

addTableStatusRecord(1, 1, 1)
addScanTaskRecord(1, 1, 1)
addScanTaskRecord(1, 1, 2)
addScanTaskRecord(2, 1, 1)
addScanTaskRecord(2, 1, 2)
addScanTaskRecord(3, 2, 1)
addScanTaskRecord(3, 2, 2)

se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
ttlworker.DoGC(context.TODO(), se)
tk.MustQuery("select job_id, scan_id from mysql.tidb_ttl_task order by job_id, scan_id asc").Check(testkit.Rows("1 1", "1 2"))
}

func TestGCTTLHistory(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
addHistory := func(jobID, createdBeforeDays int) {
tk.MustExec(fmt.Sprintf(`INSERT INTO mysql.tidb_ttl_job_history (
job_id,
table_id,
parent_table_id,
table_schema,
table_name,
partition_name,
create_time,
finish_time,
ttl_expire,
summary_text,
expired_rows,
deleted_rows,
error_delete_rows,
status
)
VALUES
(
%d, 1, 1, 'test', 't1', '',
CURDATE() - INTERVAL %d DAY,
CURDATE() - INTERVAL %d DAY + INTERVAL 1 HOUR,
CURDATE() - INTERVAL %d DAY,
"", 100, 100, 0, "finished"
)`, jobID, createdBeforeDays, createdBeforeDays, createdBeforeDays))
}

addHistory(1, 1)
addHistory(2, 30)
addHistory(3, 60)
addHistory(4, 89)
addHistory(5, 90)
addHistory(6, 91)
addHistory(7, 100)
se := session.NewSession(tk.Session(), tk.Session(), func(_ session.Session) {})
ttlworker.DoGC(context.TODO(), se)
tk.MustQuery("select job_id from mysql.tidb_ttl_job_history order by job_id asc").Check(testkit.Rows("1", "2", "3", "4", "5"))
}

0 comments on commit db435e1

Please sign in to comment.