From dabfc227891a6d137d5f4fd6f9eb2d0bdeee7207 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 17 Jan 2023 21:22:48 +0800 Subject: [PATCH 1/2] ddl: fix the batch check for unique index --- ddl/index.go | 10 +++++- ddl/index_merge_tmp.go | 64 ++++++++++++++++++++++++++---------- ddl/index_merge_tmp_test.go | 65 +++++++++++++++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 18 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index bf4d5768514f3..49c6ca888cc4b 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1669,10 +1669,18 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC return nil }) logSlowOperations(time.Since(oprStartTime), "AddIndexBackfillDataInTxn", 3000) - + failpoint.Inject("mockDMLExecution", func(val failpoint.Value) { + //nolint:forcetypeassert + if val.(bool) && MockDMLExecution != nil { + MockDMLExecution() + } + }) return } +// MockDMLExecution is only used for test. +var MockDMLExecution func() + func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { if reorgInfo.mergingTmpIdx { logutil.BgLogger().Info("[ddl] start to merge temp index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index 012afdf055cda..ce9f3fd5c47f4 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -48,25 +48,12 @@ func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(txn kv.Transaction, idxR return errors.Trace(err) } - // 1. unique-key/primary-key is duplicate and the handle is equal, skip it. - // 2. unique-key/primary-key is duplicate and the handle is not equal, return duplicate error. - // 3. non-unique-key is duplicate, skip it. for i, key := range w.originIdxKeys { if val, found := batchVals[string(key)]; found { - if idxRecords[i].distinct && !bytes.Equal(val, idxRecords[i].vals) { - return kv.ErrKeyExists - } - if !idxRecords[i].delete { - idxRecords[i].skip = true - } else { - // Prevent deleting an unexpected index KV. - hdInVal, err := tablecodec.DecodeHandleInUniqueIndexValue(val, w.table.Meta().IsCommonHandle) - if err != nil { - return errors.Trace(err) - } - if !idxRecords[i].handle.Equal(hdInVal) { - idxRecords[i].skip = true - } + // Found a value in the original index key. + err := checkTempIndexKey(txn, idxRecords[i], val, w.table) + if err != nil { + return errors.Trace(err) } } else if idxRecords[i].distinct { // The keys in w.batchCheckKeys also maybe duplicate, @@ -77,6 +64,49 @@ func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(txn kv.Transaction, idxR return nil } +func checkTempIndexKey(txn kv.Transaction, tmpRec *temporaryIndexRecord, originIdxVal []byte, tblInfo table.Table) error { + if !tmpRec.delete { + if tmpRec.distinct && !bytes.Equal(originIdxVal, tmpRec.vals) { + return kv.ErrKeyExists + } + // The key has been found in the original index, skip merging it. + tmpRec.skip = true + return nil + } + // Delete operation. + distinct := tablecodec.IndexKVIsUnique(originIdxVal) + if !distinct { + // For non-distinct key, it is consist of a null value and the handle. + // Same as the non-unique indexes, replay the delete operation on non-distinct keys. + return nil + } + // For distinct index key values, prevent deleting an unexpected index KV in original index. + hdInVal, err := tablecodec.DecodeHandleInUniqueIndexValue(originIdxVal, tblInfo.Meta().IsCommonHandle) + if err != nil { + return errors.Trace(err) + } + if !tmpRec.handle.Equal(hdInVal) { + // The inequality means multiple modifications happened in the same key. + // We use the handle in origin index value to check if the row exists. + rowKey := tablecodec.EncodeRecordKey(tblInfo.RecordPrefix(), hdInVal) + _, err := txn.Get(context.Background(), rowKey) + if err != nil { + if kv.IsErrNotFound(err) { + // The row is deleted, so we can merge the delete operation to the origin index. + tmpRec.skip = false + return nil + } + // Unexpected errors. + return errors.Trace(err) + } + // Don't delete the index key if the row exists. + tmpRec.skip = true + return nil + + } + return nil +} + // temporaryIndexRecord is the record information of an index. type temporaryIndexRecord struct { vals []byte diff --git a/ddl/index_merge_tmp_test.go b/ddl/index_merge_tmp_test.go index 7ec41b786dd67..b637a55d2925f 100644 --- a/ddl/index_merge_tmp_test.go +++ b/ddl/index_merge_tmp_test.go @@ -18,6 +18,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/ddl/ingest" "github.com/pingcap/tidb/domain" @@ -405,6 +406,70 @@ func TestAddIndexMergeDeleteUniqueOnWriteOnly(t *testing.T) { tk.MustExec("admin check table t;") } +func TestAddIndexMergeDeleteNullUnique(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, a int default 0);") + tk.MustExec("insert into t values (1, 1), (2, null);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + ddl.MockDMLExecution = func() { + _, err := tk1.Exec("delete from t where id = 2;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustQuery("select count(1) from t;").Check(testkit.Rows("1")) + tk.MustExec("admin check table t;") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) +} + +func TestAddIndexMergeDoubleDelete(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(id int primary key, a int default 0);") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + + d := dom.DDL() + originalCallback := d.GetHook() + defer d.SetHook(originalCallback) + callback := &ddl.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if t.Failed() { + return + } + switch job.SchemaState { + case model.StateWriteOnly: + _, err := tk1.Exec("insert into t values (1, 1);") + assert.NoError(t, err) + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + + ddl.MockDMLExecution = func() { + _, err := tk1.Exec("delete from t where id = 1;") + assert.NoError(t, err) + _, err = tk1.Exec("insert into t values (2, 1);") + assert.NoError(t, err) + _, err = tk1.Exec("delete from t where id = 2;") + assert.NoError(t, err) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockDMLExecution", "1*return(true)->return(false)")) + tk.MustExec("alter table t add unique index idx(a);") + tk.MustQuery("select count(1) from t;").Check(testkit.Rows("0")) + tk.MustExec("admin check table t;") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockDMLExecution")) +} + func TestAddIndexMergeConflictWithPessimistic(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) From ea07d83f9c05c093a193bb459b4062ddef525ab6 Mon Sep 17 00:00:00 2001 From: tangenta Date: Tue, 17 Jan 2023 21:25:36 +0800 Subject: [PATCH 2/2] remove blank line --- ddl/index_merge_tmp.go | 1 - 1 file changed, 1 deletion(-) diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index ce9f3fd5c47f4..302bb6a50a620 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -102,7 +102,6 @@ func checkTempIndexKey(txn kv.Transaction, tmpRec *temporaryIndexRecord, originI // Don't delete the index key if the row exists. tmpRec.skip = true return nil - } return nil }