From 7019104fc80097691835ffc9e98a60ee8167773b Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Wed, 4 Jan 2023 13:39:48 +0800 Subject: [PATCH 1/7] add test Signed-off-by: xiongjiwei --- ddl/BUILD.bazel | 1 + ddl/mv_index_test.go | 58 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) create mode 100644 ddl/mv_index_test.go diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index dce469fe15321..bb7988096e730 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -179,6 +179,7 @@ go_test( "main_test.go", "modify_column_test.go", "multi_schema_change_test.go", + "mv_index_test.go", "options_test.go", "partition_test.go", "placement_policy_ddl_test.go", diff --git a/ddl/mv_index_test.go b/ddl/mv_index_test.go new file mode 100644 index 0000000000000..c74a5d1f9049a --- /dev/null +++ b/ddl/mv_index_test.go @@ -0,0 +1,58 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl_test + +import ( + "fmt" + "testing" + + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/testkit" +) + +func TestMultiValuedIndexOnlineDDL(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + for _, v := range []int{0, 1} { + tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_enable_fast_reorg=%d", v)) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (pk int primary key, a json)") + tk.MustExec("insert into t values (1, '[1,2,3]')") + tk.MustExec("insert into t values (2, '[2,3,4]')") + tk.MustExec("insert into t values (3, '[3,4,5]')") + tk.MustExec("insert into t values (4, '[4,5,6]')") + + internalTK := testkit.NewTestKit(t, store) + internalTK.MustExec("use test") + + hook := &ddl.TestDDLCallback{Do: dom} + n := 5 + hook.OnJobRunBeforeExported = func(job *model.Job) { + internalTK.MustExec(fmt.Sprintf("insert into t values (%d, '[%d, %d, %d]')", n, n, n+1, n+2)) + internalTK.MustExec(fmt.Sprintf("delete from t where pk = %d", n-4)) + internalTK.MustExec(fmt.Sprintf("update t set a = '[%d, %d, %d]' where pk = %d", n-3, n-2, n+100, n-3)) + n++ + } + o := dom.DDL().GetHook() + dom.DDL().SetHook(hook) + + tk.MustExec("alter table t add index idx((cast(a as signed array)))") + tk.MustExec("admin check table t") + dom.DDL().SetHook(o) + } +} From f181e3a2b30dde81b99e6b95dafea3f77035ba20 Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Wed, 4 Jan 2023 18:39:03 +0800 Subject: [PATCH 2/7] support online ddl Signed-off-by: xiongjiwei --- ddl/index.go | 60 ++++++++++++++++++++++++------------------- ddl/mv_index_test.go | 59 +++++++++++++++++++++++------------------- table/index.go | 10 +++++++- table/tables/index.go | 46 ++++++++++++++++++++++++++++++--- 4 files changed, 119 insertions(+), 56 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index f4e5ca8381ace..25c2383f85251 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1216,6 +1216,7 @@ type addIndexWorker struct { // The following attributes are used to reduce memory allocation. idxKeyBufs [][]byte batchCheckKeys []kv.Key + batchCheckValues [][]byte distinctCheckFlags []bool } @@ -1468,6 +1469,7 @@ func (w *addIndexWorker) initBatchCheckBufs(batchCount int) { } w.batchCheckKeys = w.batchCheckKeys[:0] + w.batchCheckValues = w.batchCheckValues[:0] w.distinctCheckFlags = w.distinctCheckFlags[:0] } @@ -1517,16 +1519,28 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i w.initBatchCheckBufs(len(idxRecords)) stmtCtx := w.sessCtx.GetSessionVars().StmtCtx + cnt := 0 for i, record := range idxRecords { - idxKey, distinct, err := w.index.GenIndexKey(stmtCtx, record.vals, record.handle, w.idxKeyBufs[i]) - if err != nil { - return errors.Trace(err) + iter := w.index.GenIndexKVIter(stmtCtx, record.vals, record.handle, idxRecords[i].rsData, false) + for iter.Valid() { + var buf []byte + if cnt < len(w.idxKeyBufs) { + buf = w.idxKeyBufs[cnt] + } + key, val, distinct, err := iter.Next(buf) + if err != nil { + return errors.Trace(err) + } + if cnt < len(w.idxKeyBufs) { + w.idxKeyBufs[cnt] = key + } else { + w.idxKeyBufs = append(w.idxKeyBufs, key) + } + cnt++ + w.batchCheckKeys = append(w.batchCheckKeys, key) + w.batchCheckValues = append(w.batchCheckValues, val) + w.distinctCheckFlags = append(w.distinctCheckFlags, distinct) } - // save the buffer to reduce memory allocations. - w.idxKeyBufs[i] = idxKey - - w.batchCheckKeys = append(w.batchCheckKeys, idxKey) - w.distinctCheckFlags = append(w.distinctCheckFlags, distinct) } batchVals, err := txn.BatchGet(context.Background(), w.batchCheckKeys) @@ -1548,12 +1562,7 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i } else if w.distinctCheckFlags[i] { // The keys in w.batchCheckKeys also maybe duplicate, // so we need to backfill the not found key into `batchVals` map. - needRsData := tables.NeedRestoredData(w.index.Meta().Columns, w.table.Meta().Columns) - val, err := tablecodec.GenIndexValuePortal(stmtCtx, w.table.Meta(), w.index.Meta(), needRsData, w.distinctCheckFlags[i], false, idxRecords[i].vals, idxRecords[i].handle, 0, idxRecords[i].rsData) - if err != nil { - return errors.Trace(err) - } - batchVals[string(key)] = val + batchVals[string(key)] = w.batchCheckValues[i] } } // Constrains is already checked. @@ -1641,19 +1650,18 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC } else { // The lightning environment is ready. vars := w.sessCtx.GetSessionVars() sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs() - key, distinct, err := w.index.GenIndexKey(sCtx, idxRecord.vals, idxRecord.handle, writeBufs.IndexKeyBuf) - if err != nil { - return errors.Trace(err) - } - idxVal, err := w.index.GenIndexValue(sCtx, distinct, idxRecord.vals, idxRecord.handle, idxRecord.rsData) - if err != nil { - return errors.Trace(err) - } - err = w.writerCtx.WriteRow(key, idxVal) - if err != nil { - return errors.Trace(err) + iter := w.index.GenIndexKVIter(sCtx, idxRecord.vals, idxRecord.handle, idxRecord.rsData, false) + for iter.Valid() { + key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf) + if err != nil { + return errors.Trace(err) + } + err = w.writerCtx.WriteRow(key, idxVal) + if err != nil { + return errors.Trace(err) + } + writeBufs.IndexKeyBuf = key } - writeBufs.IndexKeyBuf = key } taskCtx.addedCount++ } diff --git a/ddl/mv_index_test.go b/ddl/mv_index_test.go index c74a5d1f9049a..739e2cecac7aa 100644 --- a/ddl/mv_index_test.go +++ b/ddl/mv_index_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/testkit" ) @@ -28,31 +29,37 @@ func TestMultiValuedIndexOnlineDDL(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - for _, v := range []int{0, 1} { - tk.MustExec(fmt.Sprintf("set @@global.tidb_ddl_enable_fast_reorg=%d", v)) - tk.MustExec("drop table if exists t") - tk.MustExec("create table t (pk int primary key, a json)") - tk.MustExec("insert into t values (1, '[1,2,3]')") - tk.MustExec("insert into t values (2, '[2,3,4]')") - tk.MustExec("insert into t values (3, '[3,4,5]')") - tk.MustExec("insert into t values (4, '[4,5,6]')") - - internalTK := testkit.NewTestKit(t, store) - internalTK.MustExec("use test") - - hook := &ddl.TestDDLCallback{Do: dom} - n := 5 - hook.OnJobRunBeforeExported = func(job *model.Job) { - internalTK.MustExec(fmt.Sprintf("insert into t values (%d, '[%d, %d, %d]')", n, n, n+1, n+2)) - internalTK.MustExec(fmt.Sprintf("delete from t where pk = %d", n-4)) - internalTK.MustExec(fmt.Sprintf("update t set a = '[%d, %d, %d]' where pk = %d", n-3, n-2, n+100, n-3)) - n++ - } - o := dom.DDL().GetHook() - dom.DDL().SetHook(hook) - - tk.MustExec("alter table t add index idx((cast(a as signed array)))") - tk.MustExec("admin check table t") - dom.DDL().SetHook(o) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (pk int primary key, a json)") + tk.MustExec("insert into t values (1, '[1,2,3]')") + tk.MustExec("insert into t values (2, '[2,3,4]')") + tk.MustExec("insert into t values (3, '[3,4,5]')") + tk.MustExec("insert into t values (4, '[4,5,6]')") + + internalTK := testkit.NewTestKit(t, store) + internalTK.MustExec("use test") + + hook := &ddl.TestDDLCallback{Do: dom} + n := 5 + hook.OnJobRunBeforeExported = func(job *model.Job) { + internalTK.MustExec(fmt.Sprintf("insert into t values (%d, '[%d, %d, %d]')", n, n, n+1, n+2)) + internalTK.MustExec(fmt.Sprintf("delete from t where pk = %d", n-4)) + internalTK.MustExec(fmt.Sprintf("update t set a = '[%d, %d, %d]' where pk = %d", n-3, n-2, n+100, n-3)) + n++ } + o := dom.DDL().GetHook() + dom.DDL().SetHook(hook) + + tk.MustExec("alter table t add index idx((cast(a as signed array)))") + tk.MustExec("admin check table t") + dom.DDL().SetHook(o) + + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t (pk int primary key, a json);") + tk.MustExec("insert into t values (1, '[1,2,3]');") + tk.MustExec("insert into t values (2, '[2,3,4]');") + tk.MustExec("insert into t values (3, '[3,4,5]');") + tk.MustExec("insert into t values (4, '[-4,5,6]');") + tk.MustGetErrCode("alter table t add unique index idx((cast(a as signed array)));", errno.ErrDupEntry) + tk.MustGetErrMsg("alter table t add index idx((cast(a as unsigned array)));", "[ddl:8202]Cannot decode index value, because [types:1690]constant -4 overflows bigint") } diff --git a/table/index.go b/table/index.go index f83e2dd35d2f4..22c5db22b80a1 100644 --- a/table/index.go +++ b/table/index.go @@ -69,6 +69,12 @@ func WithCtx(ctx context.Context) CreateIdxOptFunc { } } +// IndexIter is index kvs iter. +type IndexIter interface { + Next(kb []byte) ([]byte, []byte, bool, error) + Valid() bool +} + // Index is the interface for index data on KV store. type Index interface { // Meta returns IndexInfo. @@ -79,9 +85,11 @@ type Index interface { Create(ctx sessionctx.Context, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOptFunc) (kv.Handle, error) // Delete supports delete from statement. Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error + // GenIndexKVIter generate index key and value for multi-valued index, use iterator to reduce the memory allocation. + GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum, untouched bool) IndexIter // Exist supports check index exists or not. Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error) - // GenIndexKey generates an index key. + // GenIndexKey generates an index key. If the index is a multi-valued index, use GenIndexKVIter instead. GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types.Datum, h kv.Handle, buf []byte) (key []byte, distinct bool, err error) // GenIndexValue generates an index value. GenIndexValue(sc *stmtctx.StatementContext, distinct bool, indexedValues []types.Datum, h kv.Handle, restoredData []types.Datum) ([]byte, error) diff --git a/table/tables/index.go b/table/tables/index.go index 265fabf966f7a..212623b5433e6 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -402,6 +402,46 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed return nil } +func (c *index) GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum, untouched bool) table.IndexIter { + indexedValues := c.getIndexedValue(indexedValue) + return &indexGenerator{ + c: c, + sctx: sc, + indexedVals: indexedValues, + h: h, + handleRestoreData: handleRestoreData, + i: 0, + } +} + +type indexGenerator struct { + c *index + sctx *stmtctx.StatementContext + indexedVals [][]types.Datum + h kv.Handle + handleRestoreData []types.Datum + + i int +} + +func (s *indexGenerator) Next(kb []byte) ([]byte, []byte, bool, error) { + val := s.indexedVals[s.i] + key, distinct, err := s.c.GenIndexKey(s.sctx, val, s.h, kb) + if err != nil { + return nil, nil, false, err + } + idxVal, err := s.c.GenIndexValue(s.sctx, distinct, val, s.h, s.handleRestoreData) + if err != nil { + return nil, nil, false, err + } + s.i++ + return key, idxVal, distinct, err +} + +func (s *indexGenerator) Valid() bool { + return s.i < len(s.indexedVals) +} + const ( // TempIndexKeyTypeNone means the key is not a temporary index key. TempIndexKeyTypeNone byte = 0 @@ -534,7 +574,7 @@ func BuildRowcodecColInfoForIndexColumns(idxInfo *model.IndexInfo, tblInfo *mode colInfo = append(colInfo, rowcodec.ColInfo{ ID: col.ID, IsPKHandle: tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.GetFlag()), - Ft: rowcodec.FieldTypeFromModelColumn(col), + Ft: rowcodec.FieldTypeFromModelColumn(col).ArrayType(), }) } return colInfo @@ -545,7 +585,7 @@ func BuildFieldTypesForIndexColumns(idxInfo *model.IndexInfo, tblInfo *model.Tab tps := make([]*types.FieldType, 0, len(idxInfo.Columns)) for _, idxCol := range idxInfo.Columns { col := tblInfo.Columns[idxCol.Offset] - tps = append(tps, rowcodec.FieldTypeFromModelColumn(col)) + tps = append(tps, rowcodec.FieldTypeFromModelColumn(col).ArrayType()) } return tps } @@ -560,7 +600,7 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo * col := tblInfo.Columns[idxCol.Offset] colInfo = append(colInfo, rowcodec.ColInfo{ ID: col.ID, - Ft: rowcodec.FieldTypeFromModelColumn(col), + Ft: rowcodec.FieldTypeFromModelColumn(col).ArrayType(), }) } } From 5f0ee8ce3191926d2a1a5232305c334f9c6342e0 Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Thu, 5 Jan 2023 10:50:53 +0800 Subject: [PATCH 3/7] add realtikv test Signed-off-by: xiongjiwei --- ddl/mv_index_test.go | 20 +++++++++------ .../addindextest/integration_test.go | 25 +++++++++++++++++++ 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/ddl/mv_index_test.go b/ddl/mv_index_test.go index 739e2cecac7aa..964211ad76740 100644 --- a/ddl/mv_index_test.go +++ b/ddl/mv_index_test.go @@ -16,6 +16,7 @@ package ddl_test import ( "fmt" + "strings" "testing" "github.com/pingcap/tidb/ddl" @@ -30,21 +31,26 @@ func TestMultiValuedIndexOnlineDDL(t *testing.T) { tk.MustExec("use test") tk.MustExec("drop table if exists t") - tk.MustExec("create table t (pk int primary key, a json)") - tk.MustExec("insert into t values (1, '[1,2,3]')") - tk.MustExec("insert into t values (2, '[2,3,4]')") - tk.MustExec("insert into t values (3, '[3,4,5]')") - tk.MustExec("insert into t values (4, '[4,5,6]')") + tk.MustExec("create table t (pk int primary key, a json) partition by hash(pk) partitions 32;") + var sb strings.Builder + sb.WriteString("insert into t values ") + for i := 0; i < 100; i++ { + sb.WriteString(fmt.Sprintf("(%d, '[%d, %d, %d]')", i, i+1, i+2, i+3)) + if i != 99 { + sb.WriteString(",") + } + } + tk.MustExec(sb.String()) internalTK := testkit.NewTestKit(t, store) internalTK.MustExec("use test") hook := &ddl.TestDDLCallback{Do: dom} - n := 5 + n := 100 hook.OnJobRunBeforeExported = func(job *model.Job) { internalTK.MustExec(fmt.Sprintf("insert into t values (%d, '[%d, %d, %d]')", n, n, n+1, n+2)) internalTK.MustExec(fmt.Sprintf("delete from t where pk = %d", n-4)) - internalTK.MustExec(fmt.Sprintf("update t set a = '[%d, %d, %d]' where pk = %d", n-3, n-2, n+100, n-3)) + internalTK.MustExec(fmt.Sprintf("update t set a = '[%d, %d, %d]' where pk = %d", n-3, n-2, n+1000, n-3)) n++ } o := dom.DDL().GetHook() diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index 352dc83a1d1a2..b7bcf1aa0f2e8 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -137,6 +137,31 @@ func TestAddIndexIngestWriterCountOnPartitionTable(t *testing.T) { require.True(t, strings.Contains(jobTp, "ingest"), jobTp) } +func TestIngestMVIndexOnPartitionTable(t *testing.T) { + store := realtikvtest.CreateMockStoreAndSetup(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("drop database if exists addindexlit;") + tk.MustExec("create database addindexlit;") + tk.MustExec("use addindexlit;") + tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`) + + tk.MustExec("create table t (pk int primary key, a json) partition by hash(pk) partitions 32;") + var sb strings.Builder + sb.WriteString("insert into t values ") + for i := 0; i < 100; i++ { + sb.WriteString(fmt.Sprintf("(%d, '[%d, %d, %d]')", i, i+1, i+2, i+3)) + if i != 99 { + sb.WriteString(",") + } + } + tk.MustExec(sb.String()) + tk.MustExec("alter table t add index idx((cast(a as signed array)));") + rows := tk.MustQuery("admin show ddl jobs 1;").Rows() + require.Len(t, rows, 1) + jobTp := rows[0][3].(string) + require.True(t, strings.Contains(jobTp, "ingest"), jobTp) +} + func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) { store := realtikvtest.CreateMockStoreAndSetup(t) tk := testkit.NewTestKit(t, store) From bdb0b00515b95e31ea88e1e96a61d35d515a8c45 Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Thu, 5 Jan 2023 14:23:10 +0800 Subject: [PATCH 4/7] revert Signed-off-by: xiongjiwei --- table/tables/index.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/table/tables/index.go b/table/tables/index.go index 212623b5433e6..4e7ad545fd4ed 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -574,7 +574,7 @@ func BuildRowcodecColInfoForIndexColumns(idxInfo *model.IndexInfo, tblInfo *mode colInfo = append(colInfo, rowcodec.ColInfo{ ID: col.ID, IsPKHandle: tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.GetFlag()), - Ft: rowcodec.FieldTypeFromModelColumn(col).ArrayType(), + Ft: rowcodec.FieldTypeFromModelColumn(col), }) } return colInfo @@ -585,7 +585,7 @@ func BuildFieldTypesForIndexColumns(idxInfo *model.IndexInfo, tblInfo *model.Tab tps := make([]*types.FieldType, 0, len(idxInfo.Columns)) for _, idxCol := range idxInfo.Columns { col := tblInfo.Columns[idxCol.Offset] - tps = append(tps, rowcodec.FieldTypeFromModelColumn(col).ArrayType()) + tps = append(tps, rowcodec.FieldTypeFromModelColumn(col)) } return tps } @@ -600,7 +600,7 @@ func TryAppendCommonHandleRowcodecColInfos(colInfo []rowcodec.ColInfo, tblInfo * col := tblInfo.Columns[idxCol.Offset] colInfo = append(colInfo, rowcodec.ColInfo{ ID: col.ID, - Ft: rowcodec.FieldTypeFromModelColumn(col).ArrayType(), + Ft: rowcodec.FieldTypeFromModelColumn(col), }) } } From b7e440d359704ab060b53a093ec9fb469cdb383b Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Thu, 5 Jan 2023 14:35:13 +0800 Subject: [PATCH 5/7] add workload for realtikv Signed-off-by: xiongjiwei --- .../addindextest/integration_test.go | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index b7bcf1aa0f2e8..6625af4ed0c44 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -148,7 +148,7 @@ func TestIngestMVIndexOnPartitionTable(t *testing.T) { tk.MustExec("create table t (pk int primary key, a json) partition by hash(pk) partitions 32;") var sb strings.Builder sb.WriteString("insert into t values ") - for i := 0; i < 100; i++ { + for i := 0; i < 10240; i++ { sb.WriteString(fmt.Sprintf("(%d, '[%d, %d, %d]')", i, i+1, i+2, i+3)) if i != 99 { sb.WriteString(",") @@ -160,6 +160,29 @@ func TestIngestMVIndexOnPartitionTable(t *testing.T) { require.Len(t, rows, 1) jobTp := rows[0][3].(string) require.True(t, strings.Contains(jobTp, "ingest"), jobTp) + tk.MustExec("admin check table t") + + tk.MustExec("drop table t") + tk.MustExec("create table t (pk int primary key, a json) partition by hash(pk) partitions 32;") + tk.MustExec(sb.String()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + n := 10240 + internalTK := testkit.NewTestKit(t, store) + internalTK.MustExec("use addindexlit;") + + for i := 0; i < 1024; i++ { + internalTK.MustExec(fmt.Sprintf("insert into t values (%d, '[%d, %d, %d]')", n, n, n+1, n+2)) + internalTK.MustExec(fmt.Sprintf("delete from t where pk = %d", n-10)) + internalTK.MustExec(fmt.Sprintf("update t set a = '[%d, %d, %d]' where pk = %d", n-3, n-2, n+1000, n-5)) + n++ + } + wg.Done() + }() + tk.MustExec("alter table t add index idx((cast(a as signed array)));") + wg.Wait() + tk.MustExec("admin check table t") } func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) { From deb345d2494eff4e17996b38359eab6713cab0df Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Thu, 5 Jan 2023 15:37:00 +0800 Subject: [PATCH 6/7] fix test Signed-off-by: xiongjiwei --- tests/realtikvtest/addindextest/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/realtikvtest/addindextest/integration_test.go b/tests/realtikvtest/addindextest/integration_test.go index 6625af4ed0c44..6843ef6149749 100644 --- a/tests/realtikvtest/addindextest/integration_test.go +++ b/tests/realtikvtest/addindextest/integration_test.go @@ -150,7 +150,7 @@ func TestIngestMVIndexOnPartitionTable(t *testing.T) { sb.WriteString("insert into t values ") for i := 0; i < 10240; i++ { sb.WriteString(fmt.Sprintf("(%d, '[%d, %d, %d]')", i, i+1, i+2, i+3)) - if i != 99 { + if i != 10240-1 { sb.WriteString(",") } } From d51007efa470f763f62ec7956bc9c4865636d270 Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Fri, 6 Jan 2023 13:36:05 +0800 Subject: [PATCH 7/7] remove unused parameter Signed-off-by: xiongjiwei --- ddl/index.go | 4 ++-- table/index.go | 2 +- table/tables/index.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 25c2383f85251..512c856faa8ef 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -1521,7 +1521,7 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i stmtCtx := w.sessCtx.GetSessionVars().StmtCtx cnt := 0 for i, record := range idxRecords { - iter := w.index.GenIndexKVIter(stmtCtx, record.vals, record.handle, idxRecords[i].rsData, false) + iter := w.index.GenIndexKVIter(stmtCtx, record.vals, record.handle, idxRecords[i].rsData) for iter.Valid() { var buf []byte if cnt < len(w.idxKeyBufs) { @@ -1650,7 +1650,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC } else { // The lightning environment is ready. vars := w.sessCtx.GetSessionVars() sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs() - iter := w.index.GenIndexKVIter(sCtx, idxRecord.vals, idxRecord.handle, idxRecord.rsData, false) + iter := w.index.GenIndexKVIter(sCtx, idxRecord.vals, idxRecord.handle, idxRecord.rsData) for iter.Valid() { key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf) if err != nil { diff --git a/table/index.go b/table/index.go index 22c5db22b80a1..a33b9c05f0049 100644 --- a/table/index.go +++ b/table/index.go @@ -86,7 +86,7 @@ type Index interface { // Delete supports delete from statement. Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error // GenIndexKVIter generate index key and value for multi-valued index, use iterator to reduce the memory allocation. - GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum, untouched bool) IndexIter + GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) IndexIter // Exist supports check index exists or not. Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error) // GenIndexKey generates an index key. If the index is a multi-valued index, use GenIndexKVIter instead. diff --git a/table/tables/index.go b/table/tables/index.go index 4e7ad545fd4ed..29e3964959aa9 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -402,7 +402,7 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed return nil } -func (c *index) GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum, untouched bool) table.IndexIter { +func (c *index) GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) table.IndexIter { indexedValues := c.getIndexedValue(indexedValue) return &indexGenerator{ c: c,