Skip to content

Commit

Permalink
ddl: mark the writes from delete-only and drop them on merge (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta authored Dec 10, 2022
1 parent 41ac035 commit 4f93081
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 3 deletions.
6 changes: 6 additions & 0 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,9 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
job.Args = []interface{}{indexInfo.ID, false /*if exists*/, getPartitionIDs(tbl.Meta())}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", tblInfo.State)
}
Expand Down Expand Up @@ -960,6 +963,9 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
// Finish this job.
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
ingest.LitBackCtxMgr.Unregister(job.ID)
}
job.Args[0] = indexInfo.ID
} else {
// the partition ids were append by convertAddIdxJob2RollbackJob, it is weird, but for the compatibility,
Expand Down
5 changes: 3 additions & 2 deletions ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
unique := false
length := len(rawValue)
keyVer := rawValue[length-1]
if keyVer == tables.TempIndexKeyTypeMerge {
// The kv is written in the merging state. It has been written to the origin index, we can skip it.
if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete {
// For 'm' version kvs, they are double-written.
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
return true, nil
}
rawValue = rawValue[:length-1]
Expand Down
45 changes: 45 additions & 0 deletions ddl/index_merge_tmp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,48 @@ func TestCreateUniqueIndexKeyExist(t *testing.T) {
tk.MustExec("admin check table t")
tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "1 7", "2 7", "5 7", "8 8", "10 10"))
}

func TestAddIndexMergeIndexUpdateOnDeleteOnly(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec(`CREATE TABLE t (a DATE NULL DEFAULT '1619-01-18', b BOOL NULL DEFAULT '0') CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_bin';`)
tk.MustExec(`INSERT INTO t SET b = '1';`)

updateSQLs := []string{
"UPDATE t SET a = '9432-05-10', b = '0';",
"UPDATE t SET a = '9432-05-10', b = '1';",
}

// Force onCreateIndex use the txn-merge process.
ingest.LitInitialized = false
tk.MustExec("set @@global.tidb_ddl_enable_fast_reorg = 1;")
tk.MustExec("set @@global.tidb_enable_mutation_checker = 1;")
tk.MustExec("set @@global.tidb_txn_assertion_level = 'STRICT';")

var checkErrs []error
originHook := dom.DDL().GetHook()
callback := &ddl.TestDDLCallback{
Do: dom,
}
onJobUpdatedBefore := func(job *model.Job) {
if job.SchemaState == model.StateDeleteOnly {
for _, sql := range updateSQLs {
_, err := tk2.Exec(sql)
if err != nil {
checkErrs = append(checkErrs, err)
}
}
}
}
callback.OnJobUpdatedExported.Store(&onJobUpdatedBefore)
dom.DDL().SetHook(callback)
tk.MustExec("alter table t add index idx(b);")
dom.DDL().SetHook(originHook)
for _, err := range checkErrs {
require.NoError(t, err)
}
tk.MustExec("admin check table t;")
}
7 changes: 6 additions & 1 deletion table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
)
if !opt.FromBackFill {
key, tempKey, keyVer = GenTempIdxKeyByState(c.idxInfo, key)
if keyVer == TempIndexKeyTypeBackfill {
if keyVer == TempIndexKeyTypeBackfill || keyVer == TempIndexKeyTypeDelete {
key, tempKey = tempKey, nil
keyIsTempIdxKey = true
}
Expand Down Expand Up @@ -347,6 +347,8 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed
const (
// TempIndexKeyTypeNone means the key is not a temporary index key.
TempIndexKeyTypeNone byte = 0
// TempIndexKeyTypeDelete indicates this value is written in the delete-only stage.
TempIndexKeyTypeDelete byte = 'd'
// TempIndexKeyTypeBackfill indicates this value is written in the backfill stage.
TempIndexKeyTypeBackfill byte = 'b'
// TempIndexKeyTypeMerge indicates this value is written in the merge stage.
Expand All @@ -363,6 +365,9 @@ func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tem
case model.BackfillStateRunning:
// Write to the temporary index.
tablecodec.IndexKey2TempIndexKey(indexInfo.ID, indexKey)
if indexInfo.State == model.StateDeleteOnly {
return nil, indexKey, TempIndexKeyTypeDelete
}
return nil, indexKey, TempIndexKeyTypeBackfill
case model.BackfillStateReadyToMerge, model.BackfillStateMerging:
// Double write
Expand Down

0 comments on commit 4f93081

Please sign in to comment.