Skip to content

Commit

Permalink
ddl: move insert job to table to job submitter (#56542)
Browse files Browse the repository at this point in the history
ref #54436
  • Loading branch information
D3Hunter authored Oct 11, 2024
1 parent a56674c commit 1946f92
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 99 deletions.
100 changes: 1 addition & 99 deletions pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
package ddl

import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"runtime"
"slices"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -658,102 +656,6 @@ func (d *ddl) getTableByTxn(r autoid.Requirement, schemaID, tableID int64) (*mod
return dbInfo, tbl, err
}

const (
addDDLJobSQL = "insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) values"
updateDDLJobSQL = "update mysql.tidb_ddl_job set job_meta = %s where job_id = %d"
)

func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobWs ...*JobWrapper) error {
failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr"))
}
})
if len(jobWs) == 0 {
return nil
}
var sql bytes.Buffer
sql.WriteString(addDDLJobSQL)
for i, jobW := range jobWs {
// TODO remove this check when all job type pass args in this way.
if jobW.JobArgs != nil {
jobW.FillArgs(jobW.JobArgs)
}
injectModifyJobArgFailPoint(jobWs)
b, err := jobW.Encode(true)
if err != nil {
return err
}
if i != 0 {
sql.WriteString(",")
}
fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", jobW.ID, jobW.MayNeedReorg(),
strconv.Quote(job2SchemaIDs(jobW)), strconv.Quote(job2TableIDs(jobW)),
util.WrapKey2String(b), jobW.Type, jobW.Started())
}
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
_, err := se.Execute(ctx, sql.String(), "insert_job")
logutil.DDLLogger().Debug("add job to mysql.tidb_ddl_job table", zap.String("sql", sql.String()))
return errors.Trace(err)
}

func makeStringForIDs(ids []int64) string {
set := make(map[int64]struct{}, len(ids))
for _, id := range ids {
set[id] = struct{}{}
}

s := make([]string, 0, len(set))
for id := range set {
s = append(s, strconv.FormatInt(id, 10))
}
slices.Sort(s)
return strings.Join(s, ",")
}

func job2SchemaIDs(jobW *JobWrapper) string {
switch jobW.Type {
case model.ActionRenameTables:
var ids []int64
arg := jobW.JobArgs.(*model.RenameTablesArgs)
ids = make([]int64, 0, len(arg.RenameTableInfos)*2)
for _, info := range arg.RenameTableInfos {
ids = append(ids, info.OldSchemaID, info.NewSchemaID)
}
return makeStringForIDs(ids)
case model.ActionRenameTable:
oldSchemaID := jobW.JobArgs.(*model.RenameTableArgs).OldSchemaID
ids := []int64{oldSchemaID, jobW.SchemaID}
return makeStringForIDs(ids)
case model.ActionExchangeTablePartition:
args := jobW.JobArgs.(*model.ExchangeTablePartitionArgs)
return makeStringForIDs([]int64{jobW.SchemaID, args.PTSchemaID})
default:
return strconv.FormatInt(jobW.SchemaID, 10)
}
}

func job2TableIDs(jobW *JobWrapper) string {
switch jobW.Type {
case model.ActionRenameTables:
var ids []int64
arg := jobW.JobArgs.(*model.RenameTablesArgs)
ids = make([]int64, 0, len(arg.RenameTableInfos))
for _, info := range arg.RenameTableInfos {
ids = append(ids, info.TableID)
}
return makeStringForIDs(ids)
case model.ActionExchangeTablePartition:
args := jobW.JobArgs.(*model.ExchangeTablePartitionArgs)
return makeStringForIDs([]int64{jobW.TableID, args.PTTableID})
case model.ActionTruncateTable:
newTableID := jobW.JobArgs.(*model.TruncateTableArgs).NewTableID
return strconv.FormatInt(jobW.TableID, 10) + "," + strconv.FormatInt(newTableID, 10)
default:
return strconv.FormatInt(jobW.TableID, 10)
}
}

func updateDDLJob2Table(
ctx context.Context,
se *sess.Session,
Expand All @@ -764,7 +666,7 @@ func updateDDLJob2Table(
if err != nil {
return err
}
sql := fmt.Sprintf(updateDDLJobSQL, util.WrapKey2String(b), job.ID)
sql := fmt.Sprintf("update mysql.tidb_ddl_job set job_meta = %s where job_id = %d", util.WrapKey2String(b), job.ID)
_, err = se.Execute(ctx, sql, "update_job")
return errors.Trace(err)
}
Expand Down
94 changes: 94 additions & 0 deletions pkg/ddl/job_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package ddl

import (
"bytes"
"context"
"fmt"
"math"
"slices"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -677,6 +680,97 @@ func lockGlobalIDKey(ctx context.Context, ddlSe *sess.Session, txn kv.Transactio
return forUpdateTs, err
}

func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobWs ...*JobWrapper) error {
failpoint.Inject("mockAddBatchDDLJobsErr", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(errors.Errorf("mockAddBatchDDLJobsErr"))
}
})
if len(jobWs) == 0 {
return nil
}
var sql bytes.Buffer
sql.WriteString("insert into mysql.tidb_ddl_job(job_id, reorg, schema_ids, table_ids, job_meta, type, processing) values")
for i, jobW := range jobWs {
// TODO remove this check when all job type pass args in this way.
if jobW.JobArgs != nil {
jobW.FillArgs(jobW.JobArgs)
}
injectModifyJobArgFailPoint(jobWs)
b, err := jobW.Encode(true)
if err != nil {
return err
}
if i != 0 {
sql.WriteString(",")
}
fmt.Fprintf(&sql, "(%d, %t, %s, %s, %s, %d, %t)", jobW.ID, jobW.MayNeedReorg(),
strconv.Quote(job2SchemaIDs(jobW)), strconv.Quote(job2TableIDs(jobW)),
ddlutil.WrapKey2String(b), jobW.Type, jobW.Started())
}
se.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
_, err := se.Execute(ctx, sql.String(), "insert_job")
logutil.DDLLogger().Debug("add job to mysql.tidb_ddl_job table", zap.String("sql", sql.String()))
return errors.Trace(err)
}

func makeStringForIDs(ids []int64) string {
set := make(map[int64]struct{}, len(ids))
for _, id := range ids {
set[id] = struct{}{}
}

s := make([]string, 0, len(set))
for id := range set {
s = append(s, strconv.FormatInt(id, 10))
}
slices.Sort(s)
return strings.Join(s, ",")
}

func job2SchemaIDs(jobW *JobWrapper) string {
switch jobW.Type {
case model.ActionRenameTables:
var ids []int64
arg := jobW.JobArgs.(*model.RenameTablesArgs)
ids = make([]int64, 0, len(arg.RenameTableInfos)*2)
for _, info := range arg.RenameTableInfos {
ids = append(ids, info.OldSchemaID, info.NewSchemaID)
}
return makeStringForIDs(ids)
case model.ActionRenameTable:
oldSchemaID := jobW.JobArgs.(*model.RenameTableArgs).OldSchemaID
ids := []int64{oldSchemaID, jobW.SchemaID}
return makeStringForIDs(ids)
case model.ActionExchangeTablePartition:
args := jobW.JobArgs.(*model.ExchangeTablePartitionArgs)
return makeStringForIDs([]int64{jobW.SchemaID, args.PTSchemaID})
default:
return strconv.FormatInt(jobW.SchemaID, 10)
}
}

func job2TableIDs(jobW *JobWrapper) string {
switch jobW.Type {
case model.ActionRenameTables:
var ids []int64
arg := jobW.JobArgs.(*model.RenameTablesArgs)
ids = make([]int64, 0, len(arg.RenameTableInfos))
for _, info := range arg.RenameTableInfos {
ids = append(ids, info.TableID)
}
return makeStringForIDs(ids)
case model.ActionExchangeTablePartition:
args := jobW.JobArgs.(*model.ExchangeTablePartitionArgs)
return makeStringForIDs([]int64{jobW.TableID, args.PTTableID})
case model.ActionTruncateTable:
newTableID := jobW.JobArgs.(*model.TruncateTableArgs).NewTableID
return strconv.FormatInt(jobW.TableID, 10) + "," + strconv.FormatInt(newTableID, 10)
default:
return strconv.FormatInt(jobW.TableID, 10)
}
}

// TODO this failpoint is only checking how job scheduler handle
// corrupted job args, we should test it there by UT, not here.
func injectModifyJobArgFailPoint(jobWs []*JobWrapper) {
Expand Down

0 comments on commit 1946f92

Please sign in to comment.