Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: support online create multi-valued index #40304

Merged
merged 10 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ go_test(
"main_test.go",
"modify_column_test.go",
"multi_schema_change_test.go",
"mv_index_test.go",
"options_test.go",
"partition_test.go",
"placement_policy_ddl_test.go",
Expand Down
60 changes: 34 additions & 26 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,7 @@ type addIndexWorker struct {
// The following attributes are used to reduce memory allocation.
idxKeyBufs [][]byte
batchCheckKeys []kv.Key
batchCheckValues [][]byte
distinctCheckFlags []bool
}

Expand Down Expand Up @@ -1468,6 +1469,7 @@ func (w *addIndexWorker) initBatchCheckBufs(batchCount int) {
}

w.batchCheckKeys = w.batchCheckKeys[:0]
w.batchCheckValues = w.batchCheckValues[:0]
w.distinctCheckFlags = w.distinctCheckFlags[:0]
}

Expand Down Expand Up @@ -1517,16 +1519,28 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i

w.initBatchCheckBufs(len(idxRecords))
stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
cnt := 0
for i, record := range idxRecords {
idxKey, distinct, err := w.index.GenIndexKey(stmtCtx, record.vals, record.handle, w.idxKeyBufs[i])
if err != nil {
return errors.Trace(err)
iter := w.index.GenIndexKVIter(stmtCtx, record.vals, record.handle, idxRecords[i].rsData)
for iter.Valid() {
var buf []byte
if cnt < len(w.idxKeyBufs) {
buf = w.idxKeyBufs[cnt]
}
key, val, distinct, err := iter.Next(buf)
if err != nil {
return errors.Trace(err)
}
if cnt < len(w.idxKeyBufs) {
w.idxKeyBufs[cnt] = key
} else {
w.idxKeyBufs = append(w.idxKeyBufs, key)
}
cnt++
w.batchCheckKeys = append(w.batchCheckKeys, key)
w.batchCheckValues = append(w.batchCheckValues, val)
w.distinctCheckFlags = append(w.distinctCheckFlags, distinct)
}
// save the buffer to reduce memory allocations.
w.idxKeyBufs[i] = idxKey

w.batchCheckKeys = append(w.batchCheckKeys, idxKey)
w.distinctCheckFlags = append(w.distinctCheckFlags, distinct)
}

batchVals, err := txn.BatchGet(context.Background(), w.batchCheckKeys)
Expand All @@ -1548,12 +1562,7 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i
} else if w.distinctCheckFlags[i] {
// The keys in w.batchCheckKeys also maybe duplicate,
// so we need to backfill the not found key into `batchVals` map.
Comment on lines 1563 to 1564
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move this comment to GenIndexValue

needRsData := tables.NeedRestoredData(w.index.Meta().Columns, w.table.Meta().Columns)
val, err := tablecodec.GenIndexValuePortal(stmtCtx, w.table.Meta(), w.index.Meta(), needRsData, w.distinctCheckFlags[i], false, idxRecords[i].vals, idxRecords[i].handle, 0, idxRecords[i].rsData)
if err != nil {
return errors.Trace(err)
}
batchVals[string(key)] = val
batchVals[string(key)] = w.batchCheckValues[i]
}
}
// Constrains is already checked.
Expand Down Expand Up @@ -1641,19 +1650,18 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
} else { // The lightning environment is ready.
vars := w.sessCtx.GetSessionVars()
sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs()
key, distinct, err := w.index.GenIndexKey(sCtx, idxRecord.vals, idxRecord.handle, writeBufs.IndexKeyBuf)
if err != nil {
return errors.Trace(err)
}
idxVal, err := w.index.GenIndexValue(sCtx, distinct, idxRecord.vals, idxRecord.handle, idxRecord.rsData)
if err != nil {
return errors.Trace(err)
}
err = w.writerCtx.WriteRow(key, idxVal)
if err != nil {
return errors.Trace(err)
iter := w.index.GenIndexKVIter(sCtx, idxRecord.vals, idxRecord.handle, idxRecord.rsData)
for iter.Valid() {
key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf)
if err != nil {
return errors.Trace(err)
}
err = w.writerCtx.WriteRow(key, idxVal)
if err != nil {
return errors.Trace(err)
}
writeBufs.IndexKeyBuf = key
}
writeBufs.IndexKeyBuf = key
}
taskCtx.addedCount++
}
Expand Down
71 changes: 71 additions & 0 deletions ddl/mv_index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl_test

import (
"fmt"
"strings"
"testing"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
)

func TestMultiValuedIndexOnlineDDL(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t (pk int primary key, a json) partition by hash(pk) partitions 32;")
var sb strings.Builder
sb.WriteString("insert into t values ")
for i := 0; i < 100; i++ {
sb.WriteString(fmt.Sprintf("(%d, '[%d, %d, %d]')", i, i+1, i+2, i+3))
if i != 99 {
sb.WriteString(",")
}
}
tk.MustExec(sb.String())

internalTK := testkit.NewTestKit(t, store)
internalTK.MustExec("use test")

hook := &ddl.TestDDLCallback{Do: dom}
n := 100
hook.OnJobRunBeforeExported = func(job *model.Job) {
internalTK.MustExec(fmt.Sprintf("insert into t values (%d, '[%d, %d, %d]')", n, n, n+1, n+2))
internalTK.MustExec(fmt.Sprintf("delete from t where pk = %d", n-4))
internalTK.MustExec(fmt.Sprintf("update t set a = '[%d, %d, %d]' where pk = %d", n-3, n-2, n+1000, n-3))
n++
}
o := dom.DDL().GetHook()
dom.DDL().SetHook(hook)

tk.MustExec("alter table t add index idx((cast(a as signed array)))")
tk.MustExec("admin check table t")
dom.DDL().SetHook(o)

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (pk int primary key, a json);")
tk.MustExec("insert into t values (1, '[1,2,3]');")
tk.MustExec("insert into t values (2, '[2,3,4]');")
tk.MustExec("insert into t values (3, '[3,4,5]');")
tk.MustExec("insert into t values (4, '[-4,5,6]');")
tk.MustGetErrCode("alter table t add unique index idx((cast(a as signed array)));", errno.ErrDupEntry)
tk.MustGetErrMsg("alter table t add index idx((cast(a as unsigned array)));", "[ddl:8202]Cannot decode index value, because [types:1690]constant -4 overflows bigint")
}
10 changes: 9 additions & 1 deletion table/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func WithCtx(ctx context.Context) CreateIdxOptFunc {
}
}

// IndexIter is index kvs iter.
type IndexIter interface {
Next(kb []byte) ([]byte, []byte, bool, error)
Valid() bool
}

// Index is the interface for index data on KV store.
type Index interface {
// Meta returns IndexInfo.
Expand All @@ -79,9 +85,11 @@ type Index interface {
Create(ctx sessionctx.Context, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOptFunc) (kv.Handle, error)
// Delete supports delete from statement.
Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error
// GenIndexKVIter generate index key and value for multi-valued index, use iterator to reduce the memory allocation.
GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) IndexIter
// Exist supports check index exists or not.
Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) (bool, kv.Handle, error)
// GenIndexKey generates an index key.
// GenIndexKey generates an index key. If the index is a multi-valued index, use GenIndexKVIter instead.
GenIndexKey(sc *stmtctx.StatementContext, indexedValues []types.Datum, h kv.Handle, buf []byte) (key []byte, distinct bool, err error)
// GenIndexValue generates an index value.
GenIndexValue(sc *stmtctx.StatementContext, distinct bool, indexedValues []types.Datum, h kv.Handle, restoredData []types.Datum) ([]byte, error)
Expand Down
40 changes: 40 additions & 0 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,46 @@ func (c *index) Delete(sc *stmtctx.StatementContext, txn kv.Transaction, indexed
return nil
}

func (c *index) GenIndexKVIter(sc *stmtctx.StatementContext, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) table.IndexIter {
indexedValues := c.getIndexedValue(indexedValue)
return &indexGenerator{
c: c,
sctx: sc,
indexedVals: indexedValues,
h: h,
handleRestoreData: handleRestoreData,
i: 0,
}
}

type indexGenerator struct {
c *index
sctx *stmtctx.StatementContext
indexedVals [][]types.Datum
h kv.Handle
handleRestoreData []types.Datum

i int
}

func (s *indexGenerator) Next(kb []byte) ([]byte, []byte, bool, error) {
val := s.indexedVals[s.i]
key, distinct, err := s.c.GenIndexKey(s.sctx, val, s.h, kb)
if err != nil {
return nil, nil, false, err
}
idxVal, err := s.c.GenIndexValue(s.sctx, distinct, val, s.h, s.handleRestoreData)
if err != nil {
return nil, nil, false, err
}
s.i++
return key, idxVal, distinct, err
}

func (s *indexGenerator) Valid() bool {
return s.i < len(s.indexedVals)
}

const (
// TempIndexKeyTypeNone means the key is not a temporary index key.
TempIndexKeyTypeNone byte = 0
Expand Down
48 changes: 48 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,54 @@ func TestAddIndexIngestWriterCountOnPartitionTable(t *testing.T) {
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

func TestIngestMVIndexOnPartitionTable(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)

tk.MustExec("create table t (pk int primary key, a json) partition by hash(pk) partitions 32;")
var sb strings.Builder
sb.WriteString("insert into t values ")
for i := 0; i < 10240; i++ {
sb.WriteString(fmt.Sprintf("(%d, '[%d, %d, %d]')", i, i+1, i+2, i+3))
if i != 10240-1 {
sb.WriteString(",")
}
}
tk.MustExec(sb.String())
tk.MustExec("alter table t add index idx((cast(a as signed array)));")
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
require.Len(t, rows, 1)
jobTp := rows[0][3].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
tk.MustExec("admin check table t")

tk.MustExec("drop table t")
tk.MustExec("create table t (pk int primary key, a json) partition by hash(pk) partitions 32;")
tk.MustExec(sb.String())
var wg sync.WaitGroup
wg.Add(1)
go func() {
n := 10240
internalTK := testkit.NewTestKit(t, store)
internalTK.MustExec("use addindexlit;")

for i := 0; i < 1024; i++ {
internalTK.MustExec(fmt.Sprintf("insert into t values (%d, '[%d, %d, %d]')", n, n, n+1, n+2))
internalTK.MustExec(fmt.Sprintf("delete from t where pk = %d", n-10))
internalTK.MustExec(fmt.Sprintf("update t set a = '[%d, %d, %d]' where pk = %d", n-3, n-2, n+1000, n-5))
n++
}
wg.Done()
}()
tk.MustExec("alter table t add index idx((cast(a as signed array)));")
wg.Wait()
tk.MustExec("admin check table t")
}

func TestAddIndexIngestAdjustBackfillWorker(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
Expand Down