Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: add table mysql.ttl_job_history to store ttl job histories #40655

Merged
merged 16 commits into from
Jan 29, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion executor/infoschema_cluster_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func TestTableStorageStats(t *testing.T) {
"test 2",
))
rows := tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()
result := 45
result := 46
require.Len(t, rows, result)

// More tests about the privileges.
Expand Down
26 changes: 25 additions & 1 deletion session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,27 @@ const (
created_time timestamp NOT NULL,
primary key(job_id, scan_id),
key(created_time));`

// CreateTTLJobHistory is a table that stores ttl job's history
CreateTTLJobHistory = `CREATE TABLE IF NOT EXISTS mysql.tidb_ttl_job_history (
job_id varchar(64) PRIMARY KEY,
table_id bigint(64) NOT NULL,
parent_table_id bigint(64) NOT NULL,
table_schema varchar(64) NOT NULL,
table_name varchar(64) NOT NULL,
partition_name varchar(64) DEFAULT NULL,
create_time timestamp NOT NULL,
finish_time timestamp NOT NULL,
ttl_expire timestamp NOT NULL,
summary_text text,
expired_rows bigint(64) DEFAULT NULL,
deleted_rows bigint(64) DEFAULT NULL,
error_delete_rows bigint(64) DEFAULT NULL,
status varchar(64) NOT NULL,
key(table_schema, table_name, create_time),
key(parent_table_id, create_time),
key(create_time)
);`
)

// bootstrap initiates system DB for a store.
Expand Down Expand Up @@ -757,7 +778,7 @@ const (
version109 = 109
// version110 sets tidb_enable_gc_aware_memory_track to off when a cluster upgrades from some version lower than v6.5.0.
version110 = 110
// version111 adds the table tidb_ttl_task
// version111 adds the table tidb_ttl_task and tidb_ttl_job_history
version111 = 111
)

Expand Down Expand Up @@ -2239,6 +2260,7 @@ func upgradeToVer111(s Session, ver int64) {
return
}
doReentrantDDL(s, CreateTTLTask)
doReentrantDDL(s, CreateTTLJobHistory)
}

func writeOOMAction(s Session) {
Expand Down Expand Up @@ -2349,6 +2371,8 @@ func doDDLWorks(s Session) {
mustExecute(s, CreateTTLTableStatus)
// Create tidb_ttl_task table
mustExecute(s, CreateTTLTask)
// Create tidb_ttl_job_history table
mustExecute(s, CreateTTLJobHistory)
}

// doBootstrapSQLFile executes SQL commands in a file as the last stage of bootstrap.
Expand Down
10 changes: 6 additions & 4 deletions ttl/cache/ttlstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ const (
// JobStatusWaiting means the job hasn't started
JobStatusWaiting JobStatus = "waiting"
// JobStatusRunning means this job is running
JobStatusRunning = "running"
JobStatusRunning JobStatus = "running"
// JobStatusCancelling means this job is being canceled, but not canceled yet
JobStatusCancelling = "cancelling"
JobStatusCancelling JobStatus = "cancelling"
// JobStatusCancelled means this job has been canceled successfully
JobStatusCancelled = "cancelled"
JobStatusCancelled JobStatus = "cancelled"
// JobStatusTimeout means this job has timeout
JobStatusTimeout = "timeout"
JobStatusTimeout JobStatus = "timeout"
// JobStatusFinished means job has been finished
JobStatusFinished JobStatus = "finished"
)

const selectFromTTLTableStatus = "SELECT LOW_PRIORITY table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status"
Expand Down
61 changes: 58 additions & 3 deletions ttl/ttlworker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,25 @@ const finishJobTemplate = `UPDATE mysql.tidb_ttl_table_status
current_job_status_update_time = NULL
WHERE table_id = %? AND current_job_id = %?`
const removeTaskForJobTemplate = "DELETE FROM mysql.tidb_ttl_task WHERE job_id = %?"
const addJobHistoryTemplate = `INSERT INTO
mysql.tidb_ttl_job_history (
job_id,
table_id,
parent_table_id,
table_schema,
table_name,
partition_name,
create_time,
finish_time,
ttl_expire,
summary_text,
expired_rows,
deleted_rows,
error_delete_rows,
status
)
VALUES
(%?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?, %?)`

func updateJobCurrentStatusSQL(tableID int64, oldStatus cache.JobStatus, newStatus cache.JobStatus, jobID string) (string, []interface{}) {
return updateJobCurrentStatusTemplate, []interface{}{string(newStatus), tableID, string(oldStatus), jobID}
Expand All @@ -56,11 +75,41 @@ func removeTaskForJob(jobID string) (string, []interface{}) {
return removeTaskForJobTemplate, []interface{}{jobID}
}

func addJobHistorySQL(job *ttlJob, finishTime time.Time, summary *TTLSummary) (string, []interface{}) {
status := cache.JobStatusFinished
if job.status == cache.JobStatusTimeout || job.status == cache.JobStatusCancelled {
status = job.status
}

var partitionName interface{}
if job.tbl.Partition.O != "" {
partitionName = job.tbl.Partition.O
}

return addJobHistoryTemplate, []interface{}{
job.id,
job.tbl.ID,
job.tbl.TableInfo.ID,
job.tbl.Schema.O,
job.tbl.Name.O,
partitionName,
job.createTime.Format(timeFormat),
finishTime.Format(timeFormat),
job.ttlExpireTime.Format(timeFormat),
summary.SummaryText,
summary.TotalRows,
summary.SuccessRows,
summary.ErrorRows,
string(status),
}
}

type ttlJob struct {
id string
ownerID string

createTime time.Time
createTime time.Time
ttlExpireTime time.Time

tbl *cache.PhysicalTable

Expand All @@ -71,11 +120,11 @@ type ttlJob struct {
}

// finish turns current job into last job, and update the error message and statistics summary
func (job *ttlJob) finish(se session.Session, now time.Time, summary string) {
func (job *ttlJob) finish(se session.Session, now time.Time, summary *TTLSummary) {
// at this time, the job.ctx may have been canceled (to cancel this job)
// even when it's canceled, we'll need to update the states, so use another context
err := se.RunInTxn(context.TODO(), func() error {
sql, args := finishJobSQL(job.tbl.ID, now, summary, job.id)
sql, args := finishJobSQL(job.tbl.ID, now, summary.SummaryText, job.id)
_, err := se.ExecuteSQL(context.TODO(), sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
Expand All @@ -87,6 +136,12 @@ func (job *ttlJob) finish(se session.Session, now time.Time, summary string) {
return errors.Wrapf(err, "execute sql: %s", sql)
}

sql, args = addJobHistorySQL(job, now, summary)
_, err = se.ExecuteSQL(context.TODO(), sql, args...)
if err != nil {
return errors.Wrapf(err, "execute sql: %s", sql)
}

return nil
}, session.TxnModeOptimistic)

Expand Down
29 changes: 17 additions & 12 deletions ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,17 +611,18 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *
if err != nil {
return nil, err
}
return m.createNewJob(now, table)
return m.createNewJob(expireTime, now, table)
}

func (m *JobManager) createNewJob(now time.Time, table *cache.PhysicalTable) (*ttlJob, error) {
func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *cache.PhysicalTable) (*ttlJob, error) {
id := m.tableStatusCache.Tables[table.ID].CurrentJobID

return &ttlJob{
id: id,
ownerID: m.id,

createTime: now,
createTime: now,
ttlExpireTime: expireTime,
// at least, the info schema cache and table status cache are consistent in table id, so it's safe to get table
// information from schema cache directly
tbl: table,
Expand Down Expand Up @@ -687,7 +688,8 @@ func (m *JobManager) GetCommandCli() client.CommandClient {
return m.cmdCli
}

type ttlSummary struct {
// TTLSummary is the summary for TTL job
type TTLSummary struct {
TotalRows uint64 `json:"total_rows"`
SuccessRows uint64 `json:"success_rows"`
ErrorRows uint64 `json:"error_rows"`
Expand All @@ -697,22 +699,24 @@ type ttlSummary struct {
FinishedScanTask int `json:"finished_scan_task"`

ScanTaskErr string `json:"scan_task_err,omitempty"`
SummaryText string `json:"-"`
}

func summarizeErr(err error) (string, error) {
summary := &ttlSummary{
func summarizeErr(err error) (*TTLSummary, error) {
summary := &TTLSummary{
ScanTaskErr: err.Error(),
}

buf, err := json.Marshal(summary)
if err != nil {
return "", err
return nil, err
}
return string(buf), nil
summary.SummaryText = string(buf)
return summary, nil
}

func summarizeTaskResult(tasks []*cache.TTLTask) (string, error) {
summary := &ttlSummary{}
func summarizeTaskResult(tasks []*cache.TTLTask) (*TTLSummary, error) {
summary := &TTLSummary{}
var allErr error
for _, t := range tasks {
if t.State != nil {
Expand All @@ -738,7 +742,8 @@ func summarizeTaskResult(tasks []*cache.TTLTask) (string, error) {

buf, err := json.Marshal(summary)
if err != nil {
return "", err
return nil, err
}
return string(buf), nil
summary.SummaryText = string(buf)
return summary, nil
}
37 changes: 30 additions & 7 deletions ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package ttlworker_test

import (
"context"
"encoding/json"
"fmt"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -69,7 +71,7 @@ func TestParallelLockNewJob(t *testing.T) {
se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false)
require.NoError(t, err)
job.Finish(se, time.Now(), "")
job.Finish(se, time.Now(), &ttlworker.TTLSummary{})

// lock one table in parallel, only one of them should lock successfully
testTimes := 100
Expand Down Expand Up @@ -103,32 +105,53 @@ func TestParallelLockNewJob(t *testing.T) {
wg.Wait()

require.Equal(t, uint64(1), successCounter.Load())
successJob.Finish(se, time.Now(), "")
successJob.Finish(se, time.Now(), &ttlworker.TTLSummary{})
}
}

func TestFinishJob(t *testing.T) {
timeFormat := "2006-01-02 15:04:05"
store, dom := testkit.CreateMockStoreAndDomain(t)
waitAndStopTTLManager(t, dom)
tk := testkit.NewTestKit(t, store)

sessionFactory := sessionFactory(t, store)

testTable := &cache.PhysicalTable{ID: 2, TableInfo: &model.TableInfo{ID: 1, TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}}
testTable := &cache.PhysicalTable{ID: 2, Schema: model.NewCIStr("db1"), TableInfo: &model.TableInfo{ID: 1, Name: model.NewCIStr("t1"), TTLInfo: &model.TTLInfo{IntervalExprStr: "1", IntervalTimeUnit: int(ast.TimeUnitDay)}}}

tk.MustExec("insert into mysql.tidb_ttl_table_status(table_id) values (2)")

// finish with error
m := ttlworker.NewJobManager("test-id", nil, store, nil)
m.InfoSchemaCache().Tables[testTable.ID] = testTable
se := sessionFactory()
job, err := m.LockNewJob(context.Background(), se, testTable, time.Now(), false)
startTime := time.Now()
job, err := m.LockNewJob(context.Background(), se, testTable, startTime, false)
require.NoError(t, err)

expireTime, err := testTable.EvalExpireTime(context.Background(), se, startTime)
require.NoError(t, err)

summary := `{"total_rows":0,"scan_task_err":"\"'an error message contains both single and double quote'\""}`
job.Finish(se, time.Now(), summary)
tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows(`2 {"total_rows":0,"scan_task_err":"\"'an error message contains both single and double quote'\""}`))
summary := &ttlworker.TTLSummary{
ScanTaskErr: "\"'an error message contains both single and double quote'\"",
TotalRows: 128,
SuccessRows: 120,
ErrorRows: 8,
}
summaryBytes, err := json.Marshal(summary)
summary.SummaryText = string(summaryBytes)

require.NoError(t, err)
endTime := time.Now()
job.Finish(se, endTime, summary)
tk.MustQuery("select table_id, last_job_summary from mysql.tidb_ttl_table_status").Check(testkit.Rows("2 " + summary.SummaryText))
tk.MustQuery("select * from mysql.tidb_ttl_task").Check(testkit.Rows())
expectedRow := []string{
job.ID(), "2", "1", "db1", "t1", "<nil>",
startTime.Format(timeFormat), endTime.Format(timeFormat), expireTime.Format(timeFormat),
summary.SummaryText, "128", "120", "8", "finished",
}
tk.MustQuery("select * from mysql.tidb_ttl_job_history").Check(testkit.Rows(strings.Join(expectedRow, " ")))
}

func TestTTLAutoAnalyze(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (m *JobManager) UpdateHeartBeat(ctx context.Context, se session.Session, no
return m.updateHeartBeat(ctx, se, now)
}

func (j *ttlJob) Finish(se session.Session, now time.Time, summary string) {
func (j *ttlJob) Finish(se session.Session, now time.Time, summary *TTLSummary) {
j.finish(se, now, summary)
}

Expand Down