Skip to content

Commit

Permalink
*: update init DDL table
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala committed Dec 8, 2022
1 parent 6e1cc7a commit f0c9342
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 85 deletions.
78 changes: 44 additions & 34 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,25 +101,6 @@ const (
MaxGlobalID = MaxInt48 - 1000
)

// DDLTableVersion indicates the DDL table version type.
type DDLTableVersion int

const (
// InitDDLTableVersion is the original version.
InitDDLTableVersion DDLTableVersion = 0
// BaseDDLTableVersion is for support concurrent DDL, it added tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history.
BaseDDLTableVersion DDLTableVersion = 1
// MDLTableVersion is for support MDL tables.
MDLTableVersion DDLTableVersion = 2
// BackfillTableVersion is for support distributed reorg stage, it added tidb_ddl_backfill, tidb_ddl_backfill_history.
BackfillTableVersion DDLTableVersion = 3
)

// Bytes returns the byte slice.
func (ver DDLTableVersion) Bytes() []byte {
return []byte(strconv.Itoa(int(ver)))
}

var (
// ErrDBExists is the error for db exists.
ErrDBExists = dbterror.ClassMeta.NewStd(mysql.ErrDBCreateExists)
Expand Down Expand Up @@ -575,6 +556,18 @@ func (m *Meta) CreateTableOrView(dbID int64, tableInfo *model.TableInfo) error {
return m.txn.HSet(dbKey, tableKey, data)
}

// SetDDLTables write a key into storage.
func (m *Meta) SetDDLTables() error {
err := m.txn.Set(mDDLTableVersion, []byte("1"))
return errors.Trace(err)
}

// SetMDLTables write a key into storage.
func (m *Meta) SetMDLTables() error {
err := m.txn.Set(mDDLTableVersion, []byte("2"))
return errors.Trace(err)
}

// CreateMySQLDatabaseIfNotExists creates mysql schema and return its DB ID.
func (m *Meta) CreateMySQLDatabaseIfNotExists() (int64, error) {
id, err := m.GetSystemDBID()
Expand Down Expand Up @@ -611,26 +604,22 @@ func (m *Meta) GetSystemDBID() (int64, error) {
return 0, nil
}

// SetDDLTables write a key into storage.
func (m *Meta) SetDDLTables(ddlTableVersion DDLTableVersion) error {
err := m.txn.Set(mDDLTableVersion, ddlTableVersion.Bytes())
return errors.Trace(err)
}

// CheckDDLTableVersion check if the tables related to concurrent DDL exists.
func (m *Meta) CheckDDLTableVersion() (DDLTableVersion, error) {
// CheckDDLTableExists check if the tables related to concurrent DDL exists.
func (m *Meta) CheckDDLTableExists() (bool, error) {
v, err := m.txn.Get(mDDLTableVersion)
if err != nil {
return -1, errors.Trace(err)
}
if string(v) == "" {
return InitDDLTableVersion, nil
return false, errors.Trace(err)
}
ver, err := strconv.Atoi(string(v))
return len(v) != 0, nil
}

// CheckMDLTableExists check if the tables related to concurrent DDL exists.
func (m *Meta) CheckMDLTableExists() (bool, error) {
v, err := m.txn.Get(mDDLTableVersion)
if err != nil {
return -1, errors.Trace(err)
return false, errors.Trace(err)
}
return DDLTableVersion(ver), nil
return bytes.Equal(v, []byte("2")), nil
}

// SetConcurrentDDL set the concurrent DDL flag.
Expand Down Expand Up @@ -965,6 +954,27 @@ func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) {
return tableInfo, errors.Trace(err)
}

// CheckTableExists checks if the table is existed with dbID and tableID.
func (m *Meta) CheckTableExists(dbID int64, tableID int64) (bool, error) {
// Check if db exists.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
return false, errors.Trace(err)
}

// Check if table exists.
tableKey := m.tableKey(tableID)
v, err := m.txn.HGet(dbKey, tableKey)
if err != nil {
return false, errors.Trace(err)
}
if v != nil {
return true, nil
}

return false, nil
}

// DDL job structure
// DDLJobList: list jobs
// DDLJobHistory: hash
Expand Down
27 changes: 12 additions & 15 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,16 @@ func TestMeta(t *testing.T) {
table, err := m.GetTable(1, 1)
require.NoError(t, err)
require.Equal(t, tbInfo, table)
tblExist, err := m.CheckTableExists(1, 1)
require.NoError(t, err)
require.Equal(t, true, tblExist)

table, err = m.GetTable(1, 2)
require.NoError(t, err)
require.Nil(t, table)
tblExist, err = m.CheckTableExists(1, 2)
require.NoError(t, err)
require.Equal(t, false, tblExist)

tbInfo2 := &model.TableInfo{
ID: 2,
Expand Down Expand Up @@ -842,25 +848,16 @@ func TestDDLTable(t *testing.T) {

m := meta.NewMeta(txn)

ver, err := m.CheckDDLTableVersion()
exists, err := m.CheckDDLTableExists()
require.NoError(t, err)
require.Equal(t, meta.InitDDLTableVersion, ver)
require.False(t, exists)

err = m.SetDDLTables(meta.BaseDDLTableVersion)
require.NoError(t, err)
ver, err = m.CheckDDLTableVersion()
require.NoError(t, err)
require.Equal(t, meta.BaseDDLTableVersion, ver)
err = m.SetDDLTables(meta.MDLTableVersion)
err = m.SetDDLTables()
require.NoError(t, err)
ver, err = m.CheckDDLTableVersion()
require.NoError(t, err)
require.Equal(t, meta.MDLTableVersion, ver)
err = m.SetDDLTables(meta.BackfillTableVersion)
require.NoError(t, err)
ver, err = m.CheckDDLTableVersion()

exists, err = m.CheckDDLTableExists()
require.NoError(t, err)
require.Equal(t, meta.BackfillTableVersion, ver)
require.True(t, exists)

err = m.SetConcurrentDDL(true)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ const (

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version108
var currentBootstrapVersion int64 = version107

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down
6 changes: 1 addition & 5 deletions session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,7 @@ func TestBootstrapWithError(t *testing.T) {
se.txn.init()
se.mu.values = make(map[fmt.Stringer]interface{})
se.SetValue(sessionctx.Initing, true)
err := InitDDLJobTables(store, meta.BaseDDLTableVersion)
require.NoError(t, err)
err = InitMDLTable(store)
require.NoError(t, err)
err = InitDDLJobTables(store, meta.BackfillTableVersion)
err := InitDDLJobTables(store)
require.NoError(t, err)
dom, err := domap.Get(store)
require.NoError(t, err)
Expand Down
56 changes: 27 additions & 29 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ import (
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/topsql/stmtstats"
"github.com/pingcap/tipb/go-binlog"
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -2778,17 +2777,17 @@ func loadCollationParameter(ctx context.Context, se *session) (bool, error) {

var (
errResultIsEmpty = dbterror.ClassExecutor.NewStd(errno.ErrResultIsEmpty)
// BaseDDLJobTableVer is a list of tables definitions used in concurrent DDL.
BaseDDLJobTableVer = []struct {
// DDLJobTables is a list of tables definitions used in concurrent DDL.
DDLJobTables = []struct {
SQL string
id int64
}{
{ddl.JobTableSQL, ddl.JobTableID},
{ddl.ReorgTableSQL, ddl.ReorgTableID},
{ddl.HistoryTableSQL, ddl.HistoryTableID},
}
// BackfillTableVer is a list of tables definitions used in dist reorg DDL.
BackfillTableVer = []struct {
// BackfillTables is a list of tables definitions used in dist reorg DDL.
BackfillTables = []struct {
SQL string
id int64
}{
Expand All @@ -2812,29 +2811,33 @@ func splitAndScatterTable(store kv.Storage, tableIDs []int64) {
}
}

// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history, or tidb_ddl_backfill and tidb_ddl_backfill_history.
func InitDDLJobTables(store kv.Storage, targetVer meta.DDLTableVersion) error {
targetTables := BaseDDLJobTableVer
if targetVer == meta.BackfillTableVersion {
targetTables = BackfillTableVer
}
// InitDDLJobTables is to create tidb_ddl_job, tidb_ddl_reorg and tidb_ddl_history.
func InitDDLJobTables(store kv.Storage) error {
return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
tableVer, err := t.CheckDDLTableVersion()
if err != nil || tableVer >= targetVer {
exists, err := t.CheckDDLTableExists()
if err != nil {
return errors.Trace(err)
}
dbID, err := t.CreateMySQLDatabaseIfNotExists()
if err != nil {
return err
}
tableIDs := make([]int64, 0, len(targetTables))
p := parser.New()
for _, tbl := range targetTables {
logutil.BgLogger().Info("init DDL job tables",
zap.Int64("tbl id", tbl.id), zap.ByteString("tbl version", tableVer.Bytes()),
zap.ByteString("target tbl version", targetVer.Bytes()), zap.String("sql", tbl.SQL))
tables := append(DDLJobTables, BackfillTables...)
if exists {
tblExist, err := t.CheckTableExists(dbID, BackfillTables[0].id)
if err != nil || tblExist {
return errors.Trace(err)
}
tables = BackfillTables
}
tableIDs := make([]int64, 0, len(tables))
for _, tbl := range tables {
tableIDs = append(tableIDs, tbl.id)
}
splitAndScatterTable(store, tableIDs)
p := parser.New()
for _, tbl := range tables {
stmt, err := p.ParseOneStmt(tbl.SQL, "", "")
if err != nil {
return errors.Trace(err)
Expand All @@ -2851,17 +2854,16 @@ func InitDDLJobTables(store kv.Storage, targetVer meta.DDLTableVersion) error {
return errors.Trace(err)
}
}
splitAndScatterTable(store, tableIDs)
return t.SetDDLTables(targetVer)
return t.SetDDLTables()
})
}

// InitMDLTable is to create tidb_mdl_info, which is used for metadata lock.
func InitMDLTable(store kv.Storage) error {
return kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
tableVer, err := t.CheckDDLTableVersion()
if err != nil || tableVer >= meta.MDLTableVersion {
exists, err := t.CheckMDLTableExists()
if err != nil || exists {
return errors.Trace(err)
}
dbID, err := t.CreateMySQLDatabaseIfNotExists()
Expand All @@ -2886,7 +2888,7 @@ func InitMDLTable(store kv.Storage) error {
return errors.Trace(err)
}

return t.SetDDLTables(meta.MDLTableVersion)
return t.SetMDLTables()
})
}

Expand Down Expand Up @@ -2964,18 +2966,14 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
return nil, err
}
}
err := InitDDLJobTables(store, meta.BaseDDLTableVersion)
err := InitDDLJobTables(store)
if err != nil {
return nil, err
}
err = InitMDLTable(store)
if err != nil {
return nil, err
}
err = InitDDLJobTables(store, meta.BackfillTableVersion)
if err != nil {
return nil, err
}
ver := getStoreBootstrapVersion(store)
if ver == notBootstrapped {
runInBootstrapSession(store, bootstrap)
Expand Down
2 changes: 1 addition & 1 deletion session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestInitMetaTable(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
for _, sql := range session.BaseDDLJobTableVer {
for _, sql := range session.DDLJobTables {
tk.MustExec(sql.SQL)
}

Expand Down

0 comments on commit f0c9342

Please sign in to comment.