Skip to content

Commit

Permalink
*: support the vector index (#56409)
Browse files Browse the repository at this point in the history
close #55693
  • Loading branch information
zimulala authored Sep 30, 2024
1 parent bb9f4d1 commit 6e8f27f
Show file tree
Hide file tree
Showing 49 changed files with 13,009 additions and 11,489 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1566,6 +1566,11 @@ error = '''
Auto analyze is not effective for index '%-.192s', need analyze manually
'''

["ddl:9014"]
error = '''
TiFlash backfill index failed: %s
'''

["domain:8027"]
error = '''
Information schema is out of date: schema failed to update in 1 lease, please make sure TiDB can connect to TiKV
Expand Down
3 changes: 3 additions & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ go_library(
"//pkg/util/execdetails",
"//pkg/util/filter",
"//pkg/util/gcutil",
"//pkg/util/generatedexpr",
"//pkg/util/generic",
"//pkg/util/hack",
"//pkg/util/intest",
Expand Down Expand Up @@ -322,6 +323,7 @@ go_test(
"//pkg/store/gcworker",
"//pkg/store/helper",
"//pkg/store/mockstore",
"//pkg/store/mockstore/unistore",
"//pkg/table",
"//pkg/table/tables",
"//pkg/tablecodec",
Expand Down Expand Up @@ -349,6 +351,7 @@ go_test(
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
26 changes: 23 additions & 3 deletions pkg/ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/testutil"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/testkit"
Expand Down Expand Up @@ -68,6 +69,15 @@ var allTestCase = []testCancelJob{
{"alter table t add index idx_c2(c2)", true, model.StateDeleteOnly, true, true, nil},
{"alter table t add index idx_c2(c2)", true, model.StateWriteOnly, true, true, nil},
{"alter table t add index idx_cx2(c2)", false, model.StatePublic, false, true, nil},
// Drop vector index
{"alter table t drop index v_idx_1", true, model.StatePublic, true, false, []string{"alter table t add vector index v_idx_1((VEC_L2_DISTANCE(v2))) USING HNSW"}},
{"alter table t drop index v_idx_2", false, model.StateWriteOnly, true, false, []string{"alter table t add vector index v_idx_2((VEC_COSINE_DISTANCE(v2))) USING HNSW"}},
{"alter table t drop index v_idx_3", false, model.StateDeleteOnly, false, true, []string{"alter table t add vector index v_idx_3((VEC_COSINE_DISTANCE(v2))) USING HNSW"}},
{"alter table t drop index v_idx_4", false, model.StateDeleteReorganization, false, true, []string{"alter table t add vector index v_idx_4((VEC_COSINE_DISTANCE(v2))) USING HNSW"}},
// Add vector key
{"alter table t add vector index v_idx((VEC_COSINE_DISTANCE(v2))) USING HNSW", true, model.StateNone, true, false, nil},
{"alter table t add vector index v_idx((VEC_COSINE_DISTANCE(v2))) USING HNSW", true, model.StateDeleteOnly, true, true, nil},
{"alter table t add vector index v_idx((VEC_COSINE_DISTANCE(v2))) USING HNSW", true, model.StateWriteOnly, true, true, nil},
// Add column.
{"alter table t add column c4 bigint", true, model.StateNone, true, false, nil},
{"alter table t add column c4 bigint", true, model.StateDeleteOnly, true, true, nil},
Expand Down Expand Up @@ -204,7 +214,7 @@ func cancelSuccess(rs *testkit.Result) bool {
return strings.Contains(rs.Rows()[0][1].(string), "success")
}

func TestCancel(t *testing.T) {
func TestCancelVariousJobs(t *testing.T) {
var enterCnt, exitCnt atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDeliveryJob", func(job *model.Job) { enterCnt.Add(1) })
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.Job) { exitCnt.Add(1) })
Expand All @@ -213,10 +223,18 @@ func TestCancel(t *testing.T) {
return enterCnt.Load() == exitCnt.Load()
}, 10*time.Second, 10*time.Millisecond)
}
store := testkit.CreateMockStoreWithSchemaLease(t, 100*time.Millisecond)
store := testkit.CreateMockStoreWithSchemaLease(t, 100*time.Millisecond, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tkCancel := testkit.NewTestKit(t, store)

tiflash := infosync.NewMockTiFlash()
infosync.SetMockTiFlash(tiflash)
defer func() {
tiflash.Lock()
tiflash.StatusServer.Close()
tiflash.Unlock()
}()

// Prepare schema.
tk.MustExec("use test")
tk.MustExec("drop table if exists t_partition;")
Expand All @@ -231,14 +249,16 @@ func TestCancel(t *testing.T) {
partition p4 values less than (7096)
);`)
tk.MustExec(`create table t (
c1 int, c2 int, c3 int, c11 tinyint, index fk_c1(c1)
c1 int, c2 int, c3 int, c11 tinyint, v2 vector(3), index fk_c1(c1)
);`)
tk.MustExec("alter table t set tiflash replica 2 location labels 'a','b';")

// Prepare data.
for i := 0; i <= 2048; i++ {
tk.MustExec(fmt.Sprintf("insert into t_partition values(%d, %d, %d)", i*3, i*2, i))
tk.MustExec(fmt.Sprintf("insert into t(c1, c2, c3) values(%d, %d, %d)", i*3, i*2, i))
}
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess", `return(2048)`)

// Change some configurations.
ddl.ReorgWaitTimeout = 10 * time.Millisecond
Expand Down
8 changes: 6 additions & 2 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,10 +1102,14 @@ func isColumnWithIndex(colName string, indices []*model.IndexInfo) bool {

func isColumnCanDropWithIndex(colName string, indices []*model.IndexInfo) error {
for _, indexInfo := range indices {
if indexInfo.Primary || len(indexInfo.Columns) > 1 {
if indexInfo.Primary || len(indexInfo.Columns) > 1 || indexInfo.VectorInfo != nil {
for _, col := range indexInfo.Columns {
if col.Name.L == colName {
return dbterror.ErrCantDropColWithIndex.GenWithStack("can't drop column %s with composite index covered or Primary Key covered now", colName)
errMsg := "with composite index covered or Primary Key covered now"
if indexInfo.VectorInfo != nil {
errMsg = "with Vector Key covered now"
}
return dbterror.ErrCantDropColWithIndex.GenWithStack("can't drop column %s "+errMsg, colName)
}
}
}
Expand Down
80 changes: 65 additions & 15 deletions pkg/ddl/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/infoschema"
infoschemactx "github.com/pingcap/tidb/pkg/infoschema/context"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/meta/metabuild"
Expand Down Expand Up @@ -392,12 +393,13 @@ func findTableIDFromStore(t *meta.Mutator, schemaID int64, tableName string) (in
// BuildTableInfoFromAST builds model.TableInfo from a SQL statement.
// Note: TableID and PartitionID are left as uninitialized value.
func BuildTableInfoFromAST(ctx *metabuild.Context, s *ast.CreateTableStmt) (*model.TableInfo, error) {
return buildTableInfoWithCheck(ctx, s, mysql.DefaultCharset, "", nil)
// TODO: Support the vector index for this function.
return buildTableInfoWithCheck(ctx, nil, s, mysql.DefaultCharset, "", nil)
}

// buildTableInfoWithCheck builds model.TableInfo from a SQL statement.
// Note: TableID and PartitionIDs are left as uninitialized value.
func buildTableInfoWithCheck(ctx *metabuild.Context, s *ast.CreateTableStmt, dbCharset, dbCollate string, placementPolicyRef *model.PolicyRefInfo) (*model.TableInfo, error) {
func buildTableInfoWithCheck(ctx *metabuild.Context, store kv.Storage, s *ast.CreateTableStmt, dbCharset, dbCollate string, placementPolicyRef *model.PolicyRefInfo) (*model.TableInfo, error) {
tbInfo, err := BuildTableInfoWithStmt(ctx, s, dbCharset, dbCollate, placementPolicyRef)
if err != nil {
return nil, err
Expand All @@ -408,7 +410,7 @@ func buildTableInfoWithCheck(ctx *metabuild.Context, s *ast.CreateTableStmt, dbC
if err = checkTableInfoValidWithStmt(ctx, tbInfo, s); err != nil {
return nil, err
}
if err = checkTableInfoValidExtra(ctx.GetExprCtx().GetEvalCtx().ErrCtx(), tbInfo); err != nil {
if err = checkTableInfoValidExtra(ctx.GetExprCtx().GetEvalCtx().ErrCtx(), store, tbInfo); err != nil {
return nil, err
}
return tbInfo, nil
Expand Down Expand Up @@ -509,12 +511,47 @@ func checkGeneratedColumn(ctx *metabuild.Context, schemaName pmodel.CIStr, table
return nil
}

// checkVectorIndexIfNeedTiFlashReplica validates the TiFlash-related
// requirements of a table that declares at least one vector index.
//
// Tables without any vector index pass immediately. Otherwise a non-nil store
// is required. When the table has no TiFlash replica configured (nil replica
// info or a zero count), the TiFlash store count is looked up through the
// store: a count of zero is rejected, and any other count makes the table
// default to a single replica with an empty location-label list. Finally the
// table is handed to checkTableTypeForVectorIndex.
//
// Note that tblInfo.TiFlashReplica may be overwritten as a side effect.
func checkVectorIndexIfNeedTiFlashReplica(store kv.Storage, tblInfo *model.TableInfo) error {
	vectorIndexFound := false
	for _, indexInfo := range tblInfo.Indices {
		if indexInfo.VectorInfo == nil {
			continue
		}
		vectorIndexFound = true
		break
	}
	if !vectorIndexFound {
		return nil
	}
	if store == nil {
		return errors.New("the store is nil")
	}

	// A replica is already configured: only the table type remains to be checked.
	if replicaInfo := tblInfo.TiFlashReplica; replicaInfo != nil && replicaInfo.Count != 0 {
		return errors.Trace(checkTableTypeForVectorIndex(tblInfo))
	}

	storeCount, err := infoschema.GetTiFlashStoreCount(store)
	if err != nil {
		return errors.Trace(err)
	}
	if storeCount == 0 {
		return errors.Trace(dbterror.ErrUnsupportedAddVectorIndex.FastGenByArgs("unsupported TiFlash store count is 0"))
	}

	// Always try to set to 1 as the default replica count.
	tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
		Count:          uint64(1),
		LocationLabels: make([]string, 0),
	}
	return errors.Trace(checkTableTypeForVectorIndex(tblInfo))
}

// checkTableInfoValidExtra is like checkTableInfoValid, but also assumes the
// table info comes from untrusted source and performs further checks such as
// name length and column count.
// (checkTableInfoValid is also used in repairing objects which don't perform
// these checks. Perhaps the two functions should be merged together regardless?)
func checkTableInfoValidExtra(ec errctx.Context, tbInfo *model.TableInfo) error {
func checkTableInfoValidExtra(ec errctx.Context, store kv.Storage, tbInfo *model.TableInfo) error {
if err := checkTooLongTable(tbInfo.Name); err != nil {
return err
}
Expand All @@ -537,6 +574,9 @@ func checkTableInfoValidExtra(ec errctx.Context, tbInfo *model.TableInfo) error
if err := checkGlobalIndexes(ec, tbInfo); err != nil {
return errors.Trace(err)
}
if err := checkVectorIndexIfNeedTiFlashReplica(store, tbInfo); err != nil {
return errors.Trace(err)
}

// FIXME: perform checkConstraintNames
if err := checkCharsetAndCollation(tbInfo.Charset, tbInfo.Collate); err != nil {
Expand Down Expand Up @@ -620,7 +660,8 @@ func checkColumnAttributes(colName string, tp *types.FieldType) error {
}

// BuildSessionTemporaryTableInfo builds model.TableInfo from a SQL statement.
func BuildSessionTemporaryTableInfo(ctx *metabuild.Context, is infoschema.InfoSchema, s *ast.CreateTableStmt, dbCharset, dbCollate string, placementPolicyRef *model.PolicyRefInfo) (*model.TableInfo, error) {
func BuildSessionTemporaryTableInfo(ctx *metabuild.Context, store kv.Storage, is infoschema.InfoSchema, s *ast.CreateTableStmt,
dbCharset, dbCollate string, placementPolicyRef *model.PolicyRefInfo) (*model.TableInfo, error) {
ident := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
//build tableInfo
var tbInfo *model.TableInfo
Expand All @@ -638,7 +679,7 @@ func BuildSessionTemporaryTableInfo(ctx *metabuild.Context, is infoschema.InfoSc
}
tbInfo, err = BuildTableInfoWithLike(ident, referTbl.Meta(), s)
} else {
tbInfo, err = buildTableInfoWithCheck(ctx, s, dbCharset, dbCollate, placementPolicyRef)
tbInfo, err = buildTableInfoWithCheck(ctx, store, s, dbCharset, dbCollate, placementPolicyRef)
}
return tbInfo, err
}
Expand Down Expand Up @@ -1167,10 +1208,13 @@ func BuildTableInfo(
}
foreignKeyID := tbInfo.MaxForeignKeyID
for _, constr := range constraints {
// Build hidden columns if necessary.
hiddenCols, err := buildHiddenColumnInfoWithCheck(ctx, constr.Keys, pmodel.NewCIStr(constr.Name), tbInfo, tblColumns)
if err != nil {
return nil, err
var hiddenCols []*model.ColumnInfo
if constr.Tp != ast.ConstraintVector {
// Build hidden columns if necessary.
hiddenCols, err = buildHiddenColumnInfoWithCheck(ctx, constr.Keys, pmodel.NewCIStr(constr.Name), tbInfo, tblColumns)
if err != nil {
return nil, err
}
}
for _, hiddenCol := range hiddenCols {
hiddenCol.State = model.StatePublic
Expand Down Expand Up @@ -1235,18 +1279,23 @@ func BuildTableInfo(
}

var (
indexName = constr.Name
primary, unique bool
indexName = constr.Name
primary, unique, vector bool
)

// Check if the index is primary or unique.
// Check if the index is primary, unique or vector.
switch constr.Tp {
case ast.ConstraintPrimaryKey:
primary = true
unique = true
indexName = mysql.PrimaryKeyName
case ast.ConstraintUniq, ast.ConstraintUniqKey, ast.ConstraintUniqIndex:
unique = true
case ast.ConstraintVector:
if constr.Option.Visibility == ast.IndexVisibilityInvisible {
return nil, dbterror.ErrGeneralUnsupportedDDL.GenWithStackByArgs("set vector index invisible")
}
vector = true
}

// check constraint
Expand Down Expand Up @@ -1315,10 +1364,11 @@ func BuildTableInfo(
// build index info.
idxInfo, err := BuildIndexInfo(
ctx,
tbInfo.Columns,
tbInfo,
pmodel.NewCIStr(indexName),
primary,
unique,
vector,
constr.Keys,
constr.Option,
model.StatePublic,
Expand Down Expand Up @@ -1483,7 +1533,7 @@ func addIndexForForeignKey(ctx *metabuild.Context, tbInfo *model.TableInfo) erro
Length: types.UnspecifiedLength,
})
}
idxInfo, err := BuildIndexInfo(ctx, tbInfo.Columns, idxName, false, false, keys, nil, model.StatePublic)
idxInfo, err := BuildIndexInfo(ctx, tbInfo, idxName, false, false, false, keys, nil, model.StatePublic)
if err != nil {
return errors.Trace(err)
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/executor"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
Expand Down Expand Up @@ -1157,6 +1158,35 @@ func TestParallelAlterAddIndex(t *testing.T) {
testControlParallelExecSQL(t, tk, store, dom, "", sql1, sql2, f)
}

// TestParallelAlterAddVectorIndex hands two ADD VECTOR INDEX statements that
// use the same distance function on the same column to
// testControlParallelExecSQL. The first statement must succeed and the second
// must fail with the duplicate-vector-index rollback error.
func TestParallelAlterAddVectorIndex(t *testing.T) {
	const vectorIndexProcessFP = "github.com/pingcap/tidb/pkg/ddl/MockCheckVectorIndexProcess"

	store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, tiflashReplicaLease, withMockTiFlash(2))
	tk := testkit.NewTestKit(t, store)

	// Prepare the schema; the statements are executed strictly in this order.
	setupStmts := []string{
		"create database test_db_state default charset utf8 default collate utf8_bin",
		"use test_db_state",
		"create table tt (a int, b vector, c vector(3), d vector(4));",
		"alter table tt set tiflash replica 2 location labels 'a','b';",
	}
	for _, stmt := range setupStmts {
		tk.MustExec(stmt)
	}

	require.NoError(t, failpoint.Enable(vectorIndexProcessFP, `return(1)`))
	defer func() {
		require.NoError(t, failpoint.Disable(vectorIndexProcessFP))
	}()

	// Install a mock TiFlash and close its status server when the test returns.
	mockTiFlash := infosync.NewMockTiFlash()
	infosync.SetMockTiFlash(mockTiFlash)
	defer func() {
		mockTiFlash.Lock()
		mockTiFlash.StatusServer.Close()
		mockTiFlash.Unlock()
	}()

	const (
		firstAddIndex  = "alter table tt add vector index vecIdx((vec_cosine_distance(c))) USING HNSW;"
		secondAddIndex = "alter table tt add vector index vecIdx1((vec_cosine_distance(c))) USING HNSW;"
	)
	verifyResults := func(firstErr, secondErr error) {
		require.NoError(t, firstErr)
		require.EqualError(t, secondErr,
			"[ddl:1061]DDL job rollback, error msg: vector index vecIdx function vec_cosine_distance already exist on column c")
	}
	testControlParallelExecSQL(t, tk, store, dom, "", firstAddIndex, secondAddIndex, verifyResults)
}

func TestParallelAlterAddExpressionIndex(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
6 changes: 4 additions & 2 deletions pkg/ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1982,11 +1982,13 @@ func TestDropColumnWithCompositeIndex(t *testing.T) {
defer tk.MustExec("drop table if exists t_drop_column_with_comp_idx")
tk.MustExec("create index idx_bc on t_drop_column_with_comp_idx(b, c)")
tk.MustExec("create index idx_b on t_drop_column_with_comp_idx(b)")
tk.MustGetErrMsg("alter table t_drop_column_with_comp_idx drop column b", "[ddl:8200]can't drop column b with composite index covered or Primary Key covered now")
tk.MustGetErrMsg("alter table t_drop_column_with_comp_idx drop column b",
"[ddl:8200]can't drop column b with composite index covered or Primary Key covered now")
tk.MustQuery(query).Check(testkit.Rows("idx_b YES", "idx_bc YES"))
tk.MustExec("alter table t_drop_column_with_comp_idx alter index idx_bc invisible")
tk.MustExec("alter table t_drop_column_with_comp_idx alter index idx_b invisible")
tk.MustGetErrMsg("alter table t_drop_column_with_comp_idx drop column b", "[ddl:8200]can't drop column b with composite index covered or Primary Key covered now")
tk.MustGetErrMsg("alter table t_drop_column_with_comp_idx drop column b",
"[ddl:8200]can't drop column b with composite index covered or Primary Key covered now")
tk.MustQuery(query).Check(testkit.Rows("idx_b NO", "idx_bc NO"))
}

Expand Down
11 changes: 3 additions & 8 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,14 +379,9 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
}
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableID := job.TableID
var indexName any
var partitionIDs []int64
ifExists := make([]bool, 1)
allIndexIDs := make([]int64, 1)
if err := job.DecodeArgs(&indexName, &ifExists[0], &allIndexIDs[0], &partitionIDs); err != nil {
if err = job.DecodeArgs(&indexName, &ifExists, &allIndexIDs, &partitionIDs); err != nil {
return errors.Trace(err)
}
_, _, allIndexIDs, partitionIDs, _, err := job.DecodeDropIndexFinishedArgs()
if err != nil {
return errors.Trace(err)
}
// partitionIDs len is 0 if the dropped index is a global index, even if it is a partitioned table.
if len(partitionIDs) == 0 {
Expand Down
Loading

0 comments on commit 6e8f27f

Please sign in to comment.