Skip to content

Commit

Permalink
admin: fix recover expression cause index broken (#41092)
Browse files Browse the repository at this point in the history
ref #40430, close #41087
  • Loading branch information
xiongjiwei authored Feb 7, 2023
1 parent f1d450f commit 57ab10c
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 13 deletions.
43 changes: 42 additions & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -191,6 +192,9 @@ type RecoverIndexExec struct {
srcChunk *chunk.Chunk
handleCols plannercore.HandleCols

containsGenedCol bool
cols []*expression.Column

// below buf is used to reduce allocations.
recoverRows []recoverRows
idxValsBufs [][]types.Datum
Expand Down Expand Up @@ -379,7 +383,10 @@ func (e *RecoverIndexExec) fetchRecoverRows(ctx context.Context, srcResult dists
if err != nil {
return nil, err
}
idxVals := extractIdxVals(row, e.idxValsBufs[result.scanRowCount], e.colFieldTypes, idxValLen)
idxVals, err := e.buildIndexedValues(row, e.idxValsBufs[result.scanRowCount], e.colFieldTypes, idxValLen)
if err != nil {
return nil, err
}
e.idxValsBufs[result.scanRowCount] = idxVals
rsData := tables.TryGetHandleRestoredDataWrapper(e.table.Meta(), plannercore.GetCommonHandleDatum(e.handleCols, row), nil, e.index.Meta())
e.recoverRows = append(e.recoverRows, recoverRows{handle: handle, idxVals: idxVals, rsData: rsData, skip: false})
Expand All @@ -391,6 +398,40 @@ func (e *RecoverIndexExec) fetchRecoverRows(ctx context.Context, srcResult dists
return e.recoverRows, nil
}

func (e *RecoverIndexExec) buildIndexedValues(row chunk.Row, idxVals []types.Datum, fieldTypes []*types.FieldType, idxValLen int) ([]types.Datum, error) {
if !e.containsGenedCol {
return extractIdxVals(row, idxVals, fieldTypes, idxValLen), nil
}

if e.cols == nil {
columns, _, err := expression.ColumnInfos2ColumnsAndNames(e.ctx, model.NewCIStr("mock"), e.table.Meta().Name, e.table.Meta().Columns, e.table.Meta())
if err != nil {
return nil, err
}
e.cols = columns
}

if cap(idxVals) < idxValLen {
idxVals = make([]types.Datum, idxValLen)
} else {
idxVals = idxVals[:idxValLen]
}

for i, col := range e.index.Meta().Columns {
if e.table.Meta().Columns[col.Offset].IsGenerated() {
val, err := e.cols[col.Offset].EvalVirtualColumn(row)
if err != nil {
return nil, err
}
val.Copy(&idxVals[i])
} else {
val := row.GetDatum(col.Offset, &(e.table.Meta().Columns[col.Offset].FieldType))
val.Copy(&idxVals[i])
}
}
return idxVals, nil
}

func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows) error {
if len(rows) == 0 {
return nil
Expand Down
37 changes: 37 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,43 @@ func TestAdminRecoverIndex(t *testing.T) {

tk.MustExec("admin check index admin_test c2")
tk.MustExec("admin check table admin_test")

tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1, primary key(c1), unique key i1((c2+1)))")
tk.MustExec("insert admin_test (c1, c2) values (1, 1), (2, 2), (3, 3), (10, 10), (20, 20)")
r = tk.MustQuery("admin recover index admin_test i1")
r.Check(testkit.Rows("0 5"))
tk.MustExec("admin check table admin_test")
ctx = mock.NewContext()
ctx.Store = store
is = domain.InfoSchema()
dbName = model.NewCIStr("test")
tblName = model.NewCIStr("admin_test")
tbl, err = is.TableByName(dbName, tblName)
require.NoError(t, err)

tblInfo = tbl.Meta()
idxInfo = tblInfo.FindIndexByName("i1")
indexOpr = tables.NewIndex(tblInfo.ID, tblInfo, idxInfo)
sc = ctx.GetSessionVars().StmtCtx
txn, err = store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(sc, txn, types.MakeDatums(2), kv.IntHandle(1))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
r = tk.MustQuery("SELECT COUNT(*) FROM admin_test USE INDEX(i1)")
r.Check(testkit.Rows("4"))
err = tk.ExecToErr("admin check table admin_test")
require.Error(t, err)
r = tk.MustQuery("admin recover index admin_test i1")
r.Check(testkit.Rows("1 5"))
tk.MustExec("admin check table admin_test")

tk.MustExec("drop table if exists admin_test")
tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1, primary key(c1), unique key i1(c1, c2));")
tk.MustExec("insert admin_test (c1, c2) values (1, 1), (2, 2), (3, 3), (10, 10), (20, 20);")
tk.MustExec("admin recover index admin_test i1;")
}

func TestAdminCleanupMVIndex(t *testing.T) {
Expand Down
43 changes: 31 additions & 12 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,24 +481,35 @@ func (b *executorBuilder) buildCheckTable(v *plannercore.CheckTable) Executor {
return e
}

func buildIdxColsConcatHandleCols(tblInfo *model.TableInfo, indexInfo *model.IndexInfo) []*model.ColumnInfo {
handleLen := 1
func buildIdxColsConcatHandleCols(tblInfo *model.TableInfo, indexInfo *model.IndexInfo, hasGenedCol bool) []*model.ColumnInfo {
var pkCols []*model.IndexColumn
if tblInfo.IsCommonHandle {
pkIdx := tables.FindPrimaryIndex(tblInfo)
pkCols = pkIdx.Columns
handleLen = len(pkIdx.Columns)
}
columns := make([]*model.ColumnInfo, 0, len(indexInfo.Columns)+handleLen)
for _, idxCol := range indexInfo.Columns {
columns = append(columns, tblInfo.Columns[idxCol.Offset])

columns := make([]*model.ColumnInfo, 0, len(indexInfo.Columns)+len(pkCols))
if hasGenedCol {
columns = tblInfo.Columns
} else {
for _, idxCol := range indexInfo.Columns {
if tblInfo.PKIsHandle && tblInfo.GetPkColInfo().Offset == idxCol.Offset {
continue
}
columns = append(columns, tblInfo.Columns[idxCol.Offset])
}
}

if tblInfo.IsCommonHandle {
for _, c := range pkCols {
columns = append(columns, tblInfo.Columns[c.Offset])
}
return columns
}
if tblInfo.PKIsHandle {
columns = append(columns, tblInfo.Columns[tblInfo.GetPkColInfo().Offset])
return columns
}
handleOffset := len(columns)
handleColsInfo := &model.ColumnInfo{
ID: model.ExtraHandleID,
Expand All @@ -523,12 +534,20 @@ func (b *executorBuilder) buildRecoverIndex(v *plannercore.RecoverIndex) Executo
b.err = errors.Errorf("secondary index `%v` is not found in table `%v`", v.IndexName, v.Table.Name.O)
return nil
}
var hasGenedCol bool
for _, iCol := range index.Meta().Columns {
if tblInfo.Columns[iCol.Offset].IsGenerated() {
hasGenedCol = true
}
}
cols := buildIdxColsConcatHandleCols(tblInfo, index.Meta(), hasGenedCol)
e := &RecoverIndexExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
columns: buildIdxColsConcatHandleCols(tblInfo, index.Meta()),
index: index,
table: t,
physicalID: t.Meta().ID,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
columns: cols,
containsGenedCol: hasGenedCol,
index: index,
table: t,
physicalID: t.Meta().ID,
}
sessCtx := e.ctx.GetSessionVars().StmtCtx
e.handleCols = buildHandleColsForExec(sessCtx, tblInfo, index.Meta(), e.columns)
Expand Down Expand Up @@ -585,7 +604,7 @@ func (b *executorBuilder) buildCleanupIndex(v *plannercore.CleanupIndex) Executo
}
e := &CleanupIndexExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
columns: buildIdxColsConcatHandleCols(tblInfo, index.Meta()),
columns: buildIdxColsConcatHandleCols(tblInfo, index.Meta(), false),
index: index,
table: t,
physicalID: t.Meta().ID,
Expand Down

0 comments on commit 57ab10c

Please sign in to comment.