Skip to content

Commit

Permalink
partition: check global index when execute create table (#41437)
Browse files Browse the repository at this point in the history
close #40145, close #40148, close #41447
  • Loading branch information
Defined2014 authored Feb 20, 2023
1 parent 301a024 commit 21aa40f
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 22 deletions.
57 changes: 51 additions & 6 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1866,13 +1866,61 @@ func TestMultiPartitionDropAndTruncate(t *testing.T) {
result.Check(testkit.Rows(`2010`))
}

func TestDropPartitionWithGlobalIndex(t *testing.T) {
restore := config.RestoreFunc()
defer restore()
func TestCreatePartitionTableWithGlobalIndex(t *testing.T) {
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.EnableGlobalIndex = true
})

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists test_global")
tk.MustExec(`create table test_global ( a int, b int, c int, unique key p_b(b))
partition by range( a ) (
partition p1 values less than (10),
partition p2 values less than (20)
);`)

tk.MustExec("insert into test_global values (1,2,2)")
tk.MustGetErrCode("insert into test_global values (11,2,2)", errno.ErrDupEntry)
tk.MustGetErrMsg("insert into test_global values (11,2,2)", "[kv:1062]Duplicate entry '2' for key 'test_global.p_b'")

// NULL will not get 'duplicate key' error here
tk.MustExec("insert into test_global(a,c) values (1,2)")
tk.MustExec("insert into test_global(a,c) values (11,2)")

tk.MustExec("drop table if exists test_global")
tk.MustGetErrMsg(`create table test_global ( a int, b int, c int, primary key p_b(b) /*T![clustered_index] CLUSTERED */)
partition by range( a ) (
partition p1 values less than (10),
partition p2 values less than (20)
);`, "[ddl:1503]A CLUSTERED INDEX must include all columns in the table's partitioning function")

tk.MustExec("drop table if exists test_global")
tk.MustGetErrMsg(`create table test_global ( a int, b int, c int, primary key p_b_c(b, c) /*T![clustered_index] CLUSTERED */)
partition by range( a ) (
partition p1 values less than (10),
partition p2 values less than (20)
);`, "[ddl:1503]A CLUSTERED INDEX must include all columns in the table's partitioning function")

tk.MustExec("drop table if exists test_global")
tk.MustExec(`create table test_global ( a int, b int, c int, primary key (b) /*T![clustered_index] NONCLUSTERED */)
partition by range( a ) (
partition p1 values less than (10),
partition p2 values less than (20)
);`)
tk.MustExec("insert into test_global values (1,2,2)")
tk.MustGetErrCode("insert into test_global values (11,2,2)", errno.ErrDupEntry)
tk.MustGetErrMsg("insert into test_global values (11,2,2)", "[kv:1062]Duplicate entry '2' for key 'test_global.PRIMARY'")
}

func TestDropPartitionWithGlobalIndex(t *testing.T) {
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.EnableGlobalIndex = true
})
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists test_global")
Expand Down Expand Up @@ -1902,9 +1950,6 @@ func TestDropPartitionWithGlobalIndex(t *testing.T) {
require.NotNil(t, idxInfo)
cnt = checkGlobalIndexCleanUpDone(t, tk.Session(), tt.Meta(), idxInfo, pid)
require.Equal(t, 2, cnt)
config.UpdateGlobal(func(conf *config.Config) {
conf.EnableGlobalIndex = false
})
}

func TestAlterTableExchangePartition(t *testing.T) {
Expand Down
55 changes: 39 additions & 16 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,37 @@ func buildTablePartitionInfo(ctx sessionctx.Context, s *ast.PartitionOptions, tb
ctx.SetValue(sessionctx.QueryString, newQuery)
}
}

partCols, err := getPartitionColSlices(ctx, tbInfo, s)
if err != nil {
return errors.Trace(err)
}

for _, index := range tbInfo.Indices {
if index.Unique && !checkUniqueKeyIncludePartKey(partCols, index.Columns) {
index.Global = config.GetGlobalConfig().EnableGlobalIndex
}
}
return nil
}

func getPartitionColSlices(sctx sessionctx.Context, tblInfo *model.TableInfo, s *ast.PartitionOptions) (partCols stringSlice, err error) {
if s.Expr != nil {
extractCols := newPartitionExprChecker(sctx, tblInfo)
s.Expr.Accept(extractCols)
partColumns, err := extractCols.columns, extractCols.err
if err != nil {
return nil, err
}
partCols = columnInfoSlice(partColumns)
} else if len(s.ColumnNames) > 0 {
partCols = columnNameSlice(s.ColumnNames)
} else {
return nil, errors.Errorf("Table partition metadata not correct, neither partition expression or list of partition columns")
}
return partCols, nil
}

// getPartitionIntervalFromTable checks if a partitioned table matches a generated INTERVAL partitioned scheme
// will return nil if error occurs, i.e. not an INTERVAL partitioned table
func getPartitionIntervalFromTable(ctx sessionctx.Context, tbInfo *model.TableInfo) *ast.PartitionInterval {
Expand Down Expand Up @@ -3110,20 +3138,9 @@ func checkPartitioningKeysConstraints(sctx sessionctx.Context, s *ast.CreateTabl
return nil
}

var partCols stringSlice
if s.Partition.Expr != nil {
extractCols := newPartitionExprChecker(sctx, tblInfo)
s.Partition.Expr.Accept(extractCols)
partColumns, err := extractCols.columns, extractCols.err
if err != nil {
return err
}
partCols = columnInfoSlice(partColumns)
} else if len(s.Partition.ColumnNames) > 0 {
partCols = columnNameSlice(s.Partition.ColumnNames)
} else {
// TODO: Check keys constraints for list, key partition type and so on.
return nil
partCols, err := getPartitionColSlices(sctx, tblInfo, s.Partition)
if err != nil {
return errors.Trace(err)
}

// Checks that the partitioning key is included in the constraint.
Expand All @@ -3132,7 +3149,13 @@ func checkPartitioningKeysConstraints(sctx sessionctx.Context, s *ast.CreateTabl
for _, index := range tblInfo.Indices {
if index.Unique && !checkUniqueKeyIncludePartKey(partCols, index.Columns) {
if index.Primary {
return dbterror.ErrUniqueKeyNeedAllFieldsInPf.GenWithStackByArgs("PRIMARY KEY")
// not support global index with clustered index
if tblInfo.IsCommonHandle {
return dbterror.ErrUniqueKeyNeedAllFieldsInPf.GenWithStackByArgs("CLUSTERED INDEX")
}
if !config.GetGlobalConfig().EnableGlobalIndex {
return dbterror.ErrUniqueKeyNeedAllFieldsInPf.GenWithStackByArgs("PRIMARY KEY")
}
}
if !config.GetGlobalConfig().EnableGlobalIndex {
return dbterror.ErrUniqueKeyNeedAllFieldsInPf.GenWithStackByArgs("UNIQUE INDEX")
Expand All @@ -3146,7 +3169,7 @@ func checkPartitioningKeysConstraints(sctx sessionctx.Context, s *ast.CreateTabl
Length: types.UnspecifiedLength,
}}
if !checkUniqueKeyIncludePartKey(partCols, indexCols) {
return dbterror.ErrUniqueKeyNeedAllFieldsInPf.GenWithStackByArgs("PRIMARY KEY")
return dbterror.ErrUniqueKeyNeedAllFieldsInPf.GenWithStackByArgs("CLUSTERED INDEX")
}
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (txn *tikvTxn) SetDiskFullOpt(level kvrpcpb.DiskFullOpt) {

func (txn *tikvTxn) CacheTableInfo(id int64, info *model.TableInfo) {
txn.idxNameCache[id] = info
// For partition table, also cached tblInfo with TableID for global index.
if info != nil && info.ID != id {
txn.idxNameCache[info.ID] = info
}
}

func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput ...kv.Key) error {
Expand Down

0 comments on commit 21aa40f

Please sign in to comment.