ddl: support read generated columns with copr for adding index (#39345)
ref #35983
tangenta authored Nov 24, 2022
1 parent e205f93 commit 7611a03
Showing 5 changed files with 200 additions and 46 deletions.
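In short: newCopContext previously returned nil whenever the target index touched a virtual generated column, so such an index could not be added through the coprocessor (ingest) read path. With this change it expands the generation dependencies, scans only the required base columns through the coprocessor, and evaluates the virtual columns per fetched chunk. A condensed, hypothetical variant of the new integration test at the bottom of this commit illustrates the scenario; the harness helpers are the ones the test file already uses:

	package addindextest

	import (
		"testing"

		"github.com/pingcap/tidb/testkit"
		"github.com/pingcap/tidb/tests/realtikvtest"
	)

	// Sketch only: a condensed variant of TestAddIndexIngestGeneratedColumns below.
	func TestAddIndexOnVirtualColumnSketch(t *testing.T) {
		store := realtikvtest.CreateMockStoreAndSetup(t)
		tk := testkit.NewTestKit(t, store)
		tk.MustExec("use test;")
		tk.MustExec("set global tidb_ddl_enable_fast_reorg = on;")
		// d depends on c, which depends on b: the backfill read must
		// fetch b and evaluate c and d locally.
		tk.MustExec("create table t (a int, b int, c int as (b+10), d int as (b+c));")
		tk.MustExec("insert into t (a, b) values (1, 1), (2, 2), (3, 3);")
		tk.MustExec("alter table t add index idx(d);") // now served by the copr/ingest path
		tk.MustExec("admin check table t;")
	}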
6 changes: 3 additions & 3 deletions ddl/backfilling.go
@@ -744,9 +744,9 @@ func (b *backfillScheduler) initCopReqSenderPool() {
 		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
 		return
 	}
-	copCtx := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
-	if copCtx == nil {
-		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender")
+	copCtx, err := newCopContext(b.tbl.Meta(), indexInfo, sessCtx)
+	if err != nil {
+		logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
 		return
 	}
 	ver, err := sessCtx.GetStore().CurrentVersion(kv.GlobalTxnScope)
180 changes: 138 additions & 42 deletions ddl/index_cop.go
Original file line number Diff line number Diff line change
@@ -21,13 +21,14 @@ import (
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/distsql"
+	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/parser/model"
-	"github.com/pingcap/tidb/parser/mysql"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/statistics"
+	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/types"
@@ -221,34 +222,145 @@ type copContext struct {
 	colInfos []*model.ColumnInfo
 	fieldTps []*types.FieldType
 	sessCtx  sessionctx.Context
+
+	expColInfos         []*expression.Column
+	idxColOutputOffsets []int
+	handleOutputOffsets []int
+	virtualColOffsets   []int
+	virtualColFieldTps  []*types.FieldType
 }
 
-func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx sessionctx.Context) *copContext {
+func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx sessionctx.Context) (*copContext, error) {
+	var err error
+	usedColumnIDs := make(map[int64]struct{}, len(idxInfo.Columns))
+	usedColumnIDs, err = fillUsedColumns(usedColumnIDs, idxInfo, tblInfo)
+	var handleIDs []int64
+	if err != nil {
+		return nil, err
+	}
+	var primaryIdx *model.IndexInfo
+	if tblInfo.PKIsHandle {
+		pkCol := tblInfo.GetPkColInfo()
+		usedColumnIDs[pkCol.ID] = struct{}{}
+		handleIDs = []int64{pkCol.ID}
+	} else if tblInfo.IsCommonHandle {
+		primaryIdx = tables.FindPrimaryIndex(tblInfo)
+		handleIDs = make([]int64, 0, len(primaryIdx.Columns))
+		for _, pkCol := range primaryIdx.Columns {
+			col := tblInfo.Columns[pkCol.Offset]
+			handleIDs = append(handleIDs, col.ID)
+		}
+		usedColumnIDs, err = fillUsedColumns(usedColumnIDs, primaryIdx, tblInfo)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	// Only collect the columns that are used by the index.
 	colInfos := make([]*model.ColumnInfo, 0, len(idxInfo.Columns))
 	fieldTps := make([]*types.FieldType, 0, len(idxInfo.Columns))
-	for _, idxCol := range idxInfo.Columns {
-		c := tblInfo.Columns[idxCol.Offset]
-		if c.IsGenerated() && !c.GeneratedStored {
-			// TODO(tangenta): support reading virtual generated columns.
-			return nil
+	for i := range tblInfo.Columns {
+		col := tblInfo.Columns[i]
+		if _, found := usedColumnIDs[col.ID]; found {
+			colInfos = append(colInfos, col)
+			fieldTps = append(fieldTps, &col.FieldType)
 		}
-		colInfos = append(colInfos, c)
-		fieldTps = append(fieldTps, &c.FieldType)
 	}
 
-	pkColInfos, pkFieldTps, pkInfo := buildHandleColInfoAndFieldTypes(tblInfo)
-	colInfos = append(colInfos, pkColInfos...)
-	fieldTps = append(fieldTps, pkFieldTps...)
+	// Append the extra handle column when _tidb_rowid is used.
+	if !tblInfo.HasClusteredIndex() {
+		extra := model.NewExtraHandleColInfo()
+		colInfos = append(colInfos, extra)
+		fieldTps = append(fieldTps, &extra.FieldType)
+		handleIDs = []int64{extra.ID}
+	}
+
+	expColInfos, _, err := expression.ColumnInfos2ColumnsAndNames(sessCtx,
+		model.CIStr{} /* unused */, tblInfo.Name, colInfos, tblInfo)
+	if err != nil {
+		return nil, err
+	}
+	idxOffsets := resolveIndicesForIndex(expColInfos, idxInfo, tblInfo)
+	hdColOffsets := resolveIndicesForHandle(expColInfos, handleIDs)
+	vColOffsets, vColFts := collectVirtualColumnOffsetsAndTypes(expColInfos)
 
 	copCtx := &copContext{
 		tblInfo:  tblInfo,
 		idxInfo:  idxInfo,
-		pkInfo:   pkInfo,
+		pkInfo:   primaryIdx,
 		colInfos: colInfos,
 		fieldTps: fieldTps,
 		sessCtx:  sessCtx,
+
+		expColInfos:         expColInfos,
+		idxColOutputOffsets: idxOffsets,
+		handleOutputOffsets: hdColOffsets,
+		virtualColOffsets:   vColOffsets,
+		virtualColFieldTps:  vColFts,
 	}
-	return copCtx
+	return copCtx, nil
+}
+
+func fillUsedColumns(usedCols map[int64]struct{}, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) (map[int64]struct{}, error) {
+	colsToChecks := make([]*model.ColumnInfo, 0, len(idxInfo.Columns))
+	for _, idxCol := range idxInfo.Columns {
+		colsToChecks = append(colsToChecks, tblInfo.Columns[idxCol.Offset])
+	}
+	for len(colsToChecks) > 0 {
+		next := colsToChecks[0]
+		colsToChecks = colsToChecks[1:]
+		usedCols[next.ID] = struct{}{}
+		for depColName := range next.Dependences {
+			// Expand the virtual generated columns.
+			depCol := model.FindColumnInfo(tblInfo.Columns, depColName)
+			if depCol == nil {
+				return nil, errors.Trace(errors.Errorf("dependent column %s not found", depColName))
+			}
+			if _, ok := usedCols[depCol.ID]; !ok {
+				colsToChecks = append(colsToChecks, depCol)
+			}
+		}
+	}
+	return usedCols, nil
+}
+
+func resolveIndicesForIndex(outputCols []*expression.Column, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) []int {
+	offsets := make([]int, 0, len(idxInfo.Columns))
+	for _, idxCol := range idxInfo.Columns {
+		hid := tblInfo.Columns[idxCol.Offset].ID
+		for j, col := range outputCols {
+			if col.ID == hid {
+				offsets = append(offsets, j)
+				break
+			}
+		}
+	}
+	return offsets
+}
+
+func resolveIndicesForHandle(cols []*expression.Column, handleIDs []int64) []int {
+	offsets := make([]int, 0, len(handleIDs))
+	for _, hid := range handleIDs {
+		for j, col := range cols {
+			if col.ID == hid {
+				offsets = append(offsets, j)
+				break
+			}
+		}
+	}
+	return offsets
+}
+
+func collectVirtualColumnOffsetsAndTypes(cols []*expression.Column) ([]int, []*types.FieldType) {
+	var offsets []int
+	var fts []*types.FieldType
+	for i, col := range cols {
+		if col.VirtualExpr != nil {
+			offsets = append(offsets, i)
+			fts = append(fts, col.GetType())
+		}
+	}
+	return offsets, fts
 }
 
 func (c *copContext) buildTableScan(ctx context.Context, startTS uint64, start, end kv.Key) (distsql.SelectResult, error) {
@@ -284,8 +396,13 @@ func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.SelectResult,
 		return buf, true, nil
 	}
 	iter := chunk.NewIterator4Chunk(chk)
+	err = table.FillVirtualColumnValue(c.virtualColFieldTps, c.virtualColOffsets, c.expColInfos, c.colInfos, c.sessCtx, chk)
+	if err != nil {
+		return nil, false, errors.Trace(err)
+	}
 	for row := iter.Begin(); row != iter.End(); row = iter.Next() {
-		idxDt, hdDt := extractIdxValsAndHandle(row, c.idxInfo, c.fieldTps)
+		idxDt := extractDatumByOffsets(row, c.idxColOutputOffsets, c.expColInfos)
+		hdDt := extractDatumByOffsets(row, c.handleOutputOffsets, c.expColInfos)
 		handle, err := buildHandle(hdDt, c.tblInfo, c.pkInfo, sctx)
 		if err != nil {
 			return nil, false, errors.Trace(err)
@@ -321,34 +438,13 @@ func constructTableScanPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo)
 	return &tipb.Executor{Tp: tipb.ExecType_TypeTableScan, TblScan: tblScan}, err
 }
 
-func buildHandleColInfoAndFieldTypes(tbInfo *model.TableInfo) ([]*model.ColumnInfo, []*types.FieldType, *model.IndexInfo) {
-	if tbInfo.PKIsHandle {
-		for i := range tbInfo.Columns {
-			if mysql.HasPriKeyFlag(tbInfo.Columns[i].GetFlag()) {
-				return []*model.ColumnInfo{tbInfo.Columns[i]}, []*types.FieldType{&tbInfo.Columns[i].FieldType}, nil
-			}
-		}
-	} else if tbInfo.IsCommonHandle {
-		primaryIdx := tables.FindPrimaryIndex(tbInfo)
-		pkCols := make([]*model.ColumnInfo, 0, len(primaryIdx.Columns))
-		pkFts := make([]*types.FieldType, 0, len(primaryIdx.Columns))
-		for _, pkCol := range primaryIdx.Columns {
-			pkCols = append(pkCols, tbInfo.Columns[pkCol.Offset])
-			pkFts = append(pkFts, &tbInfo.Columns[pkCol.Offset].FieldType)
-		}
-		return pkCols, pkFts, primaryIdx
-	}
-	extra := model.NewExtraHandleColInfo()
-	return []*model.ColumnInfo{extra}, []*types.FieldType{&extra.FieldType}, nil
-}
-
-func extractIdxValsAndHandle(row chunk.Row, idxInfo *model.IndexInfo, fieldTps []*types.FieldType) ([]types.Datum, []types.Datum) {
-	datumBuf := make([]types.Datum, 0, len(fieldTps))
-	idxColLen := len(idxInfo.Columns)
-	for i, ft := range fieldTps {
-		datumBuf = append(datumBuf, row.GetDatum(i, ft))
+func extractDatumByOffsets(row chunk.Row, offsets []int, expCols []*expression.Column) []types.Datum {
+	datumBuf := make([]types.Datum, 0, len(offsets))
+	for _, offset := range offsets {
+		c := expCols[offset]
+		datumBuf = append(datumBuf, row.GetDatum(offset, c.GetType()))
 	}
-	return datumBuf[:idxColLen], datumBuf[idxColLen:]
+	return datumBuf
 }
 
 func buildHandle(pkDts []types.Datum, tblInfo *model.TableInfo,
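For intuition, the new fillUsedColumns above is a plain breadth-first expansion over each column's Dependences set: starting from the indexed columns, it repeatedly pulls in the columns their generation expressions depend on. A self-contained sketch of the same idea on a toy schema (the expandDeps helper and the string-keyed deps map are hypothetical; they mirror `c int as (b+10), d int as (b+c)` from the new tests):

	package main

	import "fmt"

	// expandDeps mimics fillUsedColumns: BFS from the indexed columns over
	// the dependency edges of generated columns.
	func expandDeps(indexed []string, deps map[string][]string) map[string]struct{} {
		used := make(map[string]struct{})
		queue := append([]string(nil), indexed...)
		for len(queue) > 0 {
			next := queue[0]
			queue = queue[1:]
			used[next] = struct{}{}
			for _, dep := range deps[next] {
				if _, ok := used[dep]; !ok {
					queue = append(queue, dep)
				}
			}
		}
		return used
	}

	func main() {
		// c is generated from b; d is generated from b and c.
		deps := map[string][]string{"c": {"b"}, "d": {"b", "c"}}
		fmt.Println(expandDeps([]string{"d"}, deps)) // map[b:{} c:{} d:{}]
	}

So an index on d alone still forces the scan to fetch b, after which c and d are evaluated per chunk via table.FillVirtualColumnValue.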
3 changes: 2 additions & 1 deletion ddl/index_cop_test.go
@@ -37,7 +37,8 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) {
 	require.NoError(t, err)
 	tblInfo := tbl.Meta()
 	idxInfo := tblInfo.FindIndexByName(idx)
-	copCtx := ddl.NewCopContext4Test(tblInfo, idxInfo, tk.Session())
+	copCtx, err := ddl.NewCopContext4Test(tblInfo, idxInfo, tk.Session())
+	require.NoError(t, err)
 	startKey := tbl.RecordPrefix()
 	endKey := startKey.PrefixNext()
 	txn, err := store.Begin()
7 changes: 7 additions & 0 deletions ddl/ingest/config.go
@@ -16,6 +16,7 @@ package ingest
 
 import (
 	"path/filepath"
+	"sync/atomic"
 
 	"github.com/pingcap/tidb/br/pkg/lightning/backend"
 	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
@@ -26,12 +27,18 @@ import (
 	"go.uber.org/zap"
 )
 
+// ImporterRangeConcurrencyForTest is only used for test.
+var ImporterRangeConcurrencyForTest *atomic.Int32
+
 func generateLightningConfig(memRoot MemRoot, jobID int64, unique bool) (*config.Config, error) {
 	tidbCfg := tidbconf.GetGlobalConfig()
 	cfg := config.NewConfig()
 	cfg.TikvImporter.Backend = config.BackendLocal
 	// Each backend will build a single dir in lightning dir.
 	cfg.TikvImporter.SortedKVDir = filepath.Join(LitSortPath, encodeBackendTag(jobID))
+	if ImporterRangeConcurrencyForTest != nil {
+		cfg.TikvImporter.RangeConcurrency = int(ImporterRangeConcurrencyForTest.Load())
+	}
 	_, err := cfg.AdjustCommon()
 	if err != nil {
 		logutil.BgLogger().Warn(LitWarnConfigError, zap.Error(err))
50 changes: 50 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
@@ -187,6 +187,8 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) {
 	tk.MustExec("create database addindexlit;")
 	tk.MustExec("use addindexlit;")
 	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+	ingest.ImporterRangeConcurrencyForTest = &atomic.Int32{}
+	ingest.ImporterRangeConcurrencyForTest.Store(2)
 	tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 20;")
 	tk.MustExec("create table t (a int primary key);")
 	var sb strings.Builder
@@ -205,4 +207,52 @@ func TestAddIndexIngestAdjustBackfillWorkerCountFail(t *testing.T) {
 	jobTp := rows[0][3].(string)
 	require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
 	tk.MustExec("set @@global.tidb_ddl_reorg_worker_cnt = 4;")
+	ingest.ImporterRangeConcurrencyForTest = nil
 }
+
+func TestAddIndexIngestGeneratedColumns(t *testing.T) {
+	store := realtikvtest.CreateMockStoreAndSetup(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("drop database if exists addindexlit;")
+	tk.MustExec("create database addindexlit;")
+	tk.MustExec("use addindexlit;")
+	tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
+	assertLastNDDLUseIngest := func(n int) {
+		tk.MustExec("admin check table t;")
+		rows := tk.MustQuery(fmt.Sprintf("admin show ddl jobs %d;", n)).Rows()
+		require.Len(t, rows, n)
+		for i := 0; i < n; i++ {
+			jobTp := rows[i][3].(string)
+			require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
+		}
+	}
+	tk.MustExec("create table t (a int, b int, c int as (b+10), d int as (b+c), primary key (a) clustered);")
+	tk.MustExec("insert into t (a, b) values (1, 1), (2, 2), (3, 3);")
+	tk.MustExec("alter table t add index idx(c);")
+	tk.MustExec("alter table t add index idx1(c, a);")
+	tk.MustExec("alter table t add index idx2(a);")
+	tk.MustExec("alter table t add index idx3(d);")
+	tk.MustExec("alter table t add index idx4(d, c);")
+	tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 11 12", "2 2 12 14", "3 3 13 16"))
+	assertLastNDDLUseIngest(5)
+
+	tk.MustExec("drop table if exists t;")
+	tk.MustExec("create table t (a int, b char(10), c char(10) as (concat(b, 'x')), d int, e char(20) as (c));")
+	tk.MustExec("insert into t (a, b, d) values (1, '1', 1), (2, '2', 2), (3, '3', 3);")
+	tk.MustExec("alter table t add index idx(c);")
+	tk.MustExec("alter table t add index idx1(a, c);")
+	tk.MustExec("alter table t add index idx2(c(7));")
+	tk.MustExec("alter table t add index idx3(e(5));")
+	tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1x 1 1x", "2 2 2x 2 2x", "3 3 3x 3 3x"))
+	assertLastNDDLUseIngest(4)
+
+	tk.MustExec("drop table if exists t;")
+	tk.MustExec("create table t (a int, b char(10), c tinyint, d int as (a + c), e bigint as (d - a), primary key(b, a) clustered);")
+	tk.MustExec("insert into t (a, b, c) values (1, '1', 1), (2, '2', 2), (3, '3', 3);")
+	tk.MustExec("alter table t add index idx(d);")
+	tk.MustExec("alter table t add index idx1(b(2), d);")
+	tk.MustExec("alter table t add index idx2(d, c);")
+	tk.MustExec("alter table t add index idx3(e);")
+	tk.MustQuery("select * from t;").Check(testkit.Rows("1 1 1 2 1", "2 2 2 4 2", "3 3 3 6 3"))
+	assertLastNDDLUseIngest(4)
+}
