diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index dce469fe15321..bb7988096e730 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -179,6 +179,7 @@ go_test( "main_test.go", "modify_column_test.go", "multi_schema_change_test.go", + "mv_index_test.go", "options_test.go", "partition_test.go", "placement_policy_ddl_test.go", diff --git a/ddl/index.go b/ddl/index.go index f4e5ca8381ace..512c856faa8ef 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1216,6 +1216,7 @@ type addIndexWorker struct { // The following attributes are used to reduce memory allocation. idxKeyBufs [][]byte batchCheckKeys []kv.Key + batchCheckValues [][]byte distinctCheckFlags []bool } @@ -1468,6 +1469,7 @@ func (w *addIndexWorker) initBatchCheckBufs(batchCount int) { } w.batchCheckKeys = w.batchCheckKeys[:0] + w.batchCheckValues = w.batchCheckValues[:0] w.distinctCheckFlags = w.distinctCheckFlags[:0] } @@ -1517,16 +1519,28 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i w.initBatchCheckBufs(len(idxRecords)) stmtCtx := w.sessCtx.GetSessionVars().StmtCtx + cnt := 0 for i, record := range idxRecords { - idxKey, distinct, err := w.index.GenIndexKey(stmtCtx, record.vals, record.handle, w.idxKeyBufs[i]) - if err != nil { - return errors.Trace(err) + iter := w.index.GenIndexKVIter(stmtCtx, record.vals, record.handle, idxRecords[i].rsData) + for iter.Valid() { + var buf []byte + if cnt < len(w.idxKeyBufs) { + buf = w.idxKeyBufs[cnt] + } + key, val, distinct, err := iter.Next(buf) + if err != nil { + return errors.Trace(err) + } + if cnt < len(w.idxKeyBufs) { + w.idxKeyBufs[cnt] = key + } else { + w.idxKeyBufs = append(w.idxKeyBufs, key) + } + cnt++ + w.batchCheckKeys = append(w.batchCheckKeys, key) + w.batchCheckValues = append(w.batchCheckValues, val) + w.distinctCheckFlags = append(w.distinctCheckFlags, distinct) } - // save the buffer to reduce memory allocations. - w.idxKeyBufs[i] = idxKey - - w.batchCheckKeys = append(w.batchCheckKeys, idxKey) - w.distinctCheckFlags = append(w.distinctCheckFlags, distinct) } batchVals, err := txn.BatchGet(context.Background(), w.batchCheckKeys) @@ -1548,12 +1562,7 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i } else if w.distinctCheckFlags[i] { // The keys in w.batchCheckKeys also maybe duplicate, // so we need to backfill the not found key into `batchVals` map. - needRsData := tables.NeedRestoredData(w.index.Meta().Columns, w.table.Meta().Columns) - val, err := tablecodec.GenIndexValuePortal(stmtCtx, w.table.Meta(), w.index.Meta(), needRsData, w.distinctCheckFlags[i], false, idxRecords[i].vals, idxRecords[i].handle, 0, idxRecords[i].rsData) - if err != nil { - return errors.Trace(err) - } - batchVals[string(key)] = val + batchVals[string(key)] = w.batchCheckValues[i] } } // Constrains is already checked. @@ -1641,19 +1650,18 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC } else { // The lightning environment is ready. vars := w.sessCtx.GetSessionVars() sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs() - key, distinct, err := w.index.GenIndexKey(sCtx, idxRecord.vals, idxRecord.handle, writeBufs.IndexKeyBuf) - if err != nil { - return errors.Trace(err) - } - idxVal, err := w.index.GenIndexValue(sCtx, distinct, idxRecord.vals, idxRecord.handle, idxRecord.rsData) - if err != nil { - return errors.Trace(err) - } - err = w.writerCtx.WriteRow(key, idxVal) - if err != nil { - return errors.Trace(err) + iter := w.index.GenIndexKVIter(sCtx, idxRecord.vals, idxRecord.handle, idxRecord.rsData) + for iter.Valid() { + key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf) + if err != nil { + return errors.Trace(err) + } + err = w.writerCtx.WriteRow(key, idxVal) + if err != nil { + return errors.Trace(err) + } + writeBufs.IndexKeyBuf = key } - writeBufs.IndexKeyBuf = key } taskCtx.addedCount++ } diff --git a/ddl/mv_index_test.go b/ddl/mv_index_test.go new file mode 100644 index 0000000000000..964211ad76740 --- /dev/null +++ b/ddl/mv_index_test.go @@ -0,0 +1,71 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl_test + +import ( + "fmt" + "strings" + "testing" + + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/testkit" +) + +func TestMultiValuedIndexOnlineDDL(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (pk int primary key, a json) partition by hash(pk) partitions 32;") + var sb strings.Builder + sb.WriteString("insert into t values ") + for i := 0; i < 100; i++ { + sb.WriteString(fmt.Sprintf("(%d, '[%d, %d, %d]')", i, i+1, i+2, i+3)) + if i != 99 { + sb.WriteString(",") + } + } + tk.MustExec(sb.String()) + + internalTK := testkit.NewTestKit(t, store) + internalTK.MustExec("use test") + + hook := &ddl.TestDDLCallback{Do: dom} + n := 100 + hook.OnJobRunBeforeExported = func(job *model.Job) { + internalTK.MustExec(fmt.Sprintf("insert into t values (%d, '[%d, %d, %d]')", n, n, n+1, n+2)) + internalTK.MustExec(fmt.Sprintf("delete from t where pk = %d", n-4)) + internalTK.MustExec(fmt.Sprintf("update t set a = '[%d, %d, %d]' where pk = %d", n-3, n-2, n+1000, n-3)) + n++ + } + o := dom.DDL().GetHook() + dom.DDL().SetHook(hook) + + tk.MustExec("alter table t add index idx((cast(a as signed array)))") + tk.MustExec("admin check table t") + dom.DDL().SetHook(o) + + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (pk int primary key, a json);") + tk.MustExec("insert into t values (1, '[1,2,3]');") + tk.MustExec("insert into t values (2, '[2,3,4]');") + tk.MustExec("insert into t values (3, '[3,4,5]');") + tk.MustExec("insert into t values (4, '[-4,5,6]');") + tk.MustGetErrCode("alter table t add unique index idx((cast(a as signed array)));", errno.ErrDupEntry) + tk.MustGetErrMsg("alter table t add index idx((cast(a as unsigned array)));", "[ddl:8202]Cannot decode index value, because [types:1690]constant -4 overflows bigint") +} diff --git a/table/index.go b/table/index.go index f83e2dd35d2f4..a33b9c05f0049 100644 --- a/table/index.go +++ b/table/index.go @@ -69,6 +69,12 @@ func WithCtx(ctx context.Context) CreateIdxOptFunc { } } +// IndexIter is index kvs iter. +type IndexIter interface { + Next(kb []byte) ([]byte, []byte, bool, error) + Valid() bool +} + // Index is the interface for index data on KV store. type Index interface { // Meta returns IndexInfo. @@ -79,9 +85,11 @@ type Index interface { Create(ctx sessionctx.Context, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOptFunc) (kv.Handle, error) // Delete supports delete from statement. Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error + // GenIndexKVIter generate index key and value for multi-valued index, use iterator to reduce the memory allocation. + GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) IndexIter // Exist supports check index exists or not. Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error) - // GenIndexKey generates an index key. + // GenIndexKey generates an index key. If the index is a multi-valued index, use GenIndexKVIter instead. GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types.Datum, h kv.Handle, buf []byte) (key []byte, distinct bool, err error) // GenIndexValue generates an index value. GenIndexValue(sc *stmtctx.StatementContext, distinct bool, indexedValues []types.Datum, h kv.Handle, restoredData []types.Datum) ([]byte, error) diff --git a/table/tables/index.go b/table/tables/index.go index 265fabf966f7a..29e3964959aa9 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -402,6 +402,46 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed return nil } +func (c *index) GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) table.IndexIter { + indexedValues := c.getIndexedValue(indexedValue) + return &indexGenerator{ + c: c, + sctx: sc, + indexedVals: indexedValues, + h: h, + handleRestoreData: handleRestoreData, + i: 0, + } +} + +type indexGenerator struct { + c *index + sctx *stmtctx.StatementContext + indexedVals [][]types.Datum + h kv.Handle + handleRestoreData []types.Datum + + i int +} + +func (s *indexGenerator) Next(kb []byte) ([]byte, []byte, bool, error) { + val := s.indexedVals[s.i] + key, distinct, err := s.c.GenIndexKey(s.sctx, val, s.h, kb) + if err != nil { + return nil, nil, false, err + } + idxVal, err := s.c.GenIndexValue(s.sctx, distinct, val, s.h, s.handleRestoreData) + if err != nil { + return nil, nil, false, err + } + s.i++ + return key, idxVal, distinct, err +} + +func (s *indexGenerator) Valid() bool { + return s.i < len(s.indexedVals) +} + const ( // TempIndexKeyTypeNone means the key is not a temporary index key. TempIndexKeyTypeNone byte = 0 diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index 352dc83a1d1a2..6843ef6149749 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -137,6 +137,54 @@ func TestAddIndexIngestWriterCountOnPartitionTable(t *testing.T) { require.True(t, strings.Contains(jobTp, "ingest"), jobTp) } +func TestIngestMVIndexOnPartitionTable(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + + tk.MustExec("create table t (pk int primary key, a json) partition by hash(pk) partitions 32;") + var sb strings.Builder + sb.WriteString("insert into t values ") + for i := 0; i < 10240; i++ { + sb.WriteString(fmt.Sprintf("(%d, '[%d, %d, %d]')", i, i+1, i+2, i+3)) + if i != 10240-1 { + sb.WriteString(",") + } + } + tk.MustExec(sb.String()) + tk.MustExec("alter table t add index idx((cast(a as signed array)));") + rows := tk.MustQuery("admin show ddl jobs 1;").Rows() + require.Len(t, rows, 1) + jobTp := rows[0][3].(string) + require.True(t, strings.Contains(jobTp, "ingest"), jobTp) + tk.MustExec("admin check table t") + + tk.MustExec("drop table t") + tk.MustExec("create table t (pk int primary key, a json) partition by hash(pk) partitions 32;") + tk.MustExec(sb.String()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + n := 10240 + internalTK := testkit.NewTestKit(t, store) + internalTK.MustExec("use addindexlit;") + + for i := 0; i < 1024; i++ { + internalTK.MustExec(fmt.Sprintf("insert into t values (%d, '[%d, %d, %d]')", n, n, n+1, n+2)) + internalTK.MustExec(fmt.Sprintf("delete from t where pk = %d", n-10)) + internalTK.MustExec(fmt.Sprintf("update t set a = '[%d, %d, %d]' where pk = %d", n-3, n-2, n+1000, n-5)) + n++ + } + wg.Done() + }() + tk.MustExec("alter table t add index idx((cast(a as signed array)));") + wg.Wait() + tk.MustExec("admin check table t") +} + func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store)