Skip to content

Commit

Permalink
Merge branch 'master' of github.com:pingcap/tidb into disaggregated_b…
Browse files Browse the repository at this point in the history
…atch_copr

Signed-off-by: guo-shaoge <[email protected]>
  • Loading branch information
guo-shaoge committed Dec 26, 2022
2 parents 6e0b747 + 388364d commit 697b5fe
Show file tree
Hide file tree
Showing 140 changed files with 5,031 additions and 1,693 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -4475,8 +4475,8 @@ def go_deps():
name = "org_golang_x_time",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/time",
sum = "h1:52I/1L54xyEQAYdtcSuxtiT84KGYTBGXwayxmIpNJhE=",
version = "v0.2.0",
sum = "h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_tools",
Expand Down
9 changes: 9 additions & 0 deletions bindinfo/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,15 @@ func (h *BindHandle) SetBindRecordStatus(originalSQL string, binding *Binding, n
return
}

// SetBindRecordStatusByDigest set a BindRecord's status to the storage and bind cache.
func (h *BindHandle) SetBindRecordStatusByDigest(newStatus, sqlDigest string) (ok bool, err error) {
oldRecord, err := h.GetBindRecordBySQLDigest(sqlDigest)
if err != nil {
return false, err
}
return h.SetBindRecordStatus(oldRecord.OriginalSQL, nil, newStatus)
}

// GCBindRecord physically removes the deleted bind records in mysql.bind_info.
func (h *BindHandle) GCBindRecord() (err error) {
h.bindInfo.Lock()
Expand Down
28 changes: 25 additions & 3 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,32 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.
return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore)
}

// SplitBatchCreateTable provide a way to split batch into small batch when batch size is large than 6 MB.
// The raft entry has limit size of 6 MB, a batch of CreateTables may hit this limitation
// TODO: shall query string be set for each split batch create, it looks does not matter if we set once for all.
func (gs *tidbSession) SplitBatchCreateTable(schema model.CIStr, info []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
var err error
d := domain.GetDomain(gs.se).DDL()
if err = d.BatchCreateTableWithInfo(gs.se, schema, info, append(cs, ddl.OnExistIgnore)...); kv.ErrEntryTooLarge.Equal(err) {
if len(info) == 1 {
return err
}
mid := len(info) / 2
err = gs.SplitBatchCreateTable(schema, info[:mid])
if err != nil {
return err
}
err = gs.SplitBatchCreateTable(schema, info[mid:])
if err != nil {
return err
}
return nil
}
return err
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error {
d := domain.GetDomain(gs.se).DDL()
var dbName model.CIStr

// Disable foreign key check when batch create tables.
Expand Down Expand Up @@ -233,8 +256,7 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo
cloneTables = append(cloneTables, table)
}
gs.se.SetValue(sessionctx.QueryString, queryBuilder.String())
err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, append(cs, ddl.OnExistIgnore)...)
if err != nil {
if err := gs.SplitBatchCreateTable(dbName, cloneTables); err != nil {
//It is possible to failure when TiDB does not support model.ActionCreateTables.
//In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob,
//we fall back to old way that creating table one by one
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func collectGeneratedColumns(se *session, meta *model.TableInfo, cols []*table.C
var genCols []genCol
for i, col := range cols {
if col.GeneratedExpr != nil {
expr, err := expression.RewriteAstExpr(se, col.GeneratedExpr, schema, names)
expr, err := expression.RewriteAstExpr(se, col.GeneratedExpr, schema, names, false)
if err != nil {
return nil, err
}
Expand Down
48 changes: 6 additions & 42 deletions br/pkg/lightning/restore/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) {
"tidb_opt_write_row_id": "1",
// always set auto-commit to ON
"autocommit": "1",
// alway set transaction mode to optimistic
// always set transaction mode to optimistic
"tidb_txn_mode": "optimistic",
// disable foreign key checks
"foreign_key_checks": "0",
}

if dsn.Vars != nil {
Expand Down Expand Up @@ -143,47 +145,6 @@ func (timgr *TiDBManager) Close() {
timgr.db.Close()
}

func InitSchema(ctx context.Context, g glue.Glue, database string, tablesSchema map[string]string) error {
logger := log.FromContext(ctx).With(zap.String("db", database))
sqlExecutor := g.GetSQLExecutor()

var createDatabase strings.Builder
createDatabase.WriteString("CREATE DATABASE IF NOT EXISTS ")
common.WriteMySQLIdentifier(&createDatabase, database)
err := sqlExecutor.ExecuteWithLog(ctx, createDatabase.String(), "create database", logger)
if err != nil {
return errors.Trace(err)
}

task := logger.Begin(zap.InfoLevel, "create tables")
var sqlCreateStmts []string
loopCreate:
for tbl, sqlCreateTable := range tablesSchema {
task.Debug("create table", zap.String("schema", sqlCreateTable))

sqlCreateStmts, err = createIfNotExistsStmt(g.GetParser(), sqlCreateTable, database, tbl)
if err != nil {
break
}

// TODO: maybe we should put these createStems into a transaction
for _, s := range sqlCreateStmts {
err = sqlExecutor.ExecuteWithLog(
ctx,
s,
"create table",
logger.With(zap.String("table", common.UniqueTable(database, tbl))),
)
if err != nil {
break loopCreate
}
}
}
task.End(zap.ErrorLevel, err)

return errors.Trace(err)
}

func createIfNotExistsStmt(p *parser.Parser, createTable, dbName, tblName string) ([]string, error) {
stmts, _, err := p.ParseSQL(createTable)
if err != nil {
Expand All @@ -199,6 +160,9 @@ func createIfNotExistsStmt(p *parser.Parser, createTable, dbName, tblName string
case *ast.CreateDatabaseStmt:
node.Name = model.NewCIStr(dbName)
node.IfNotExists = true
case *ast.DropDatabaseStmt:
node.Name = model.NewCIStr(dbName)
node.IfExists = true
case *ast.CreateTableStmt:
node.Table.Schema = model.NewCIStr(dbName)
node.Table.Name = model.NewCIStr(tblName)
Expand Down
91 changes: 0 additions & 91 deletions br/pkg/lightning/restore/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,97 +165,6 @@ func TestCreateTableIfNotExistsStmt(t *testing.T) {
`, "m"))
}

func TestInitSchema(t *testing.T) {
s := newTiDBSuite(t)
ctx := context.Background()

s.mockDB.
ExpectExec("CREATE DATABASE IF NOT EXISTS `db`").
WillReturnResult(sqlmock.NewResult(1, 1))
s.mockDB.
ExpectExec("\\QCREATE TABLE IF NOT EXISTS `db`.`t1` (`a` INT PRIMARY KEY,`b` VARCHAR(200));\\E").
WillReturnResult(sqlmock.NewResult(2, 1))
s.mockDB.
ExpectExec("\\QSET @@SESSION.`FOREIGN_KEY_CHECKS`=0;\\E").
WillReturnResult(sqlmock.NewResult(0, 0))
s.mockDB.
ExpectExec("\\QCREATE TABLE IF NOT EXISTS `db`.`t2` (`xx` TEXT) AUTO_INCREMENT = 11203;\\E").
WillReturnResult(sqlmock.NewResult(2, 1))
s.mockDB.
ExpectClose()

s.mockDB.MatchExpectationsInOrder(false) // maps are unordered.
err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
"t1": "create table t1 (a int primary key, b varchar(200));",
"t2": "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;CREATE TABLE `db`.`t2` (xx TEXT) AUTO_INCREMENT=11203;",
})
s.mockDB.MatchExpectationsInOrder(true)
require.NoError(t, err)
}

func TestInitSchemaSyntaxError(t *testing.T) {
s := newTiDBSuite(t)
ctx := context.Background()

s.mockDB.
ExpectExec("CREATE DATABASE IF NOT EXISTS `db`").
WillReturnResult(sqlmock.NewResult(1, 1))
s.mockDB.
ExpectClose()

err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
"t1": "create table `t1` with invalid syntax;",
})
require.Error(t, err)
}

func TestInitSchemaErrorLost(t *testing.T) {
s := newTiDBSuite(t)
ctx := context.Background()

s.mockDB.
ExpectExec("CREATE DATABASE IF NOT EXISTS `db`").
WillReturnResult(sqlmock.NewResult(1, 1))

s.mockDB.
ExpectExec("CREATE TABLE IF NOT EXISTS.*").
WillReturnError(&mysql.MySQLError{
Number: tmysql.ErrTooBigFieldlength,
Message: "Column length too big",
})

s.mockDB.
ExpectClose()

err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
"t1": "create table `t1` (a int);",
"t2": "create table t2 (a int primary key, b varchar(200));",
})
require.Regexp(t, ".*Column length too big.*", err.Error())
}

func TestInitSchemaUnsupportedSchemaError(t *testing.T) {
s := newTiDBSuite(t)
ctx := context.Background()

s.mockDB.
ExpectExec("CREATE DATABASE IF NOT EXISTS `db`").
WillReturnResult(sqlmock.NewResult(1, 1))
s.mockDB.
ExpectExec("CREATE TABLE IF NOT EXISTS `db`.`t1`.*").
WillReturnError(&mysql.MySQLError{
Number: tmysql.ErrTooBigFieldlength,
Message: "Column length too big",
})
s.mockDB.
ExpectClose()

err := InitSchema(ctx, s.tiGlue, "db", map[string]string{
"t1": "create table `t1` (a VARCHAR(999999999));",
})
require.Regexp(t, ".*Column length too big.*", err.Error())
}

func TestDropTable(t *testing.T) {
s := newTiDBSuite(t)
ctx := context.Background()
Expand Down
3 changes: 3 additions & 0 deletions br/tests/lightning_foreign_key/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[tikv-importer]
# Set on-duplicate=error to force using insert statement to write data.
on-duplicate = "error"
8 changes: 8 additions & 0 deletions br/tests/lightning_foreign_key/data/fk.t-schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE TABLE `t`
(
`a` bigint(20) NOT NULL,
`b` bigint(20) DEFAULT NULL,
PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,
KEY `fk_1` (`b`),
CONSTRAINT `fk_1` FOREIGN KEY (`b`) REFERENCES `test`.`t2` (`a`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
6 changes: 6 additions & 0 deletions br/tests/lightning_foreign_key/data/fk.t.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
a,b
1,1
2,2
3,3
4,4
5,5
28 changes: 28 additions & 0 deletions br/tests/lightning_foreign_key/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/bin/bash
#
# Copyright 2022 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu

# Create existing tables that import data will reference.
run_sql 'CREATE DATABASE IF NOT EXISTS fk;'
run_sql 'CREATE TABLE fk.t2 (a BIGINT PRIMARY KEY);'

for BACKEND in tidb local; do
run_sql 'DROP TABLE IF EXISTS fk.t;'
run_lightning --backend $BACKEND
run_sql 'SELECT GROUP_CONCAT(a) FROM fk.t ORDER BY a;'
check_contains '1,2,3,4,5'
done
7 changes: 1 addition & 6 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,12 +807,7 @@ func (b *backfillScheduler) initCopReqSenderPool() {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
ver, err := sessCtx.GetStore().CurrentVersion(kv.GlobalTxnScope)
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return
}
b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, ver.Ver)
b.copReqSenderPool = newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore())
}

func (b *backfillScheduler) canSkipError(err error) bool {
Expand Down
24 changes: 24 additions & 0 deletions ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,27 @@ func testNewContext(store kv.Storage) sessionctx.Context {
ctx.Store = store
return ctx
}

func TestIssue40135(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")

tk.MustExec("CREATE TABLE t40135 ( a tinyint DEFAULT NULL, b varchar(32) DEFAULT 'md') PARTITION BY HASH (a) PARTITIONS 2")
one := true
hook := &ddl.TestDDLCallback{Do: dom}
var checkErr error
hook.OnJobRunBeforeExported = func(job *model.Job) {
if one {
one = false
_, checkErr = tk1.Exec("alter table t40135 change column a aNew SMALLINT NULL DEFAULT '-14996'")
}
}
dom.DDL().SetHook(hook)
tk.MustExec("alter table t40135 modify column a MEDIUMINT NULL DEFAULT '6243108' FIRST")

require.ErrorContains(t, checkErr, "[ddl:8200]Unsupported modify column: Column 'a' has a partitioning function dependency and cannot be renamed")
}
18 changes: 5 additions & 13 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2246,14 +2246,6 @@ func TestExchangePartitionTableCompatiable(t *testing.T) {
"alter table pt8 exchange partition p0 with table nt8;",
dbterror.ErrTablesDifferentMetadata,
},
{
// foreign key test
// Partition table doesn't support to add foreign keys in mysql
"create table pt9 (id int not null primary key auto_increment,t_id int not null) partition by hash(id) partitions 1;",
"create table nt9 (id int not null primary key auto_increment, t_id int not null,foreign key fk_id (t_id) references pt5(id));",
"alter table pt9 exchange partition p0 with table nt9;",
dbterror.ErrPartitionExchangeForeignKey,
},
{
// Generated column (virtual)
"create table pt10 (id int not null, lname varchar(30), fname varchar(100) generated always as (concat(lname,' ')) virtual) partition by hash(id) partitions 1;",
Expand Down Expand Up @@ -4543,17 +4535,17 @@ func TestAlterModifyColumnOnPartitionedTableRename(t *testing.T) {
tk.MustExec("create database " + schemaName)
tk.MustExec("use " + schemaName)
tk.MustExec(`create table t (a int, b char) partition by range (a) (partition p0 values less than (10))`)
tk.MustContainErrMsg(`alter table t change a c int`, "[planner:1054]Unknown column 'a' in 'expression'")
tk.MustContainErrMsg(`alter table t change a c int`, "[ddl:8200]Unsupported modify column: Column 'a' has a partitioning function dependency and cannot be renamed")
tk.MustExec(`drop table t`)
tk.MustExec(`create table t (a char, b char) partition by range columns (a) (partition p0 values less than ('z'))`)
tk.MustContainErrMsg(`alter table t change a c char`, "[ddl:8200]New column does not match partition definitions: [ddl:1567]partition column name cannot be found")
tk.MustContainErrMsg(`alter table t change a c char`, "[ddl:8200]Unsupported modify column: Column 'a' has a partitioning function dependency and cannot be renamed")
tk.MustExec(`drop table t`)
tk.MustExec(`create table t (a int, b char) partition by list (a) (partition p0 values in (10))`)
tk.MustContainErrMsg(`alter table t change a c int`, "[planner:1054]Unknown column 'a' in 'expression'")
tk.MustContainErrMsg(`alter table t change a c int`, "[ddl:8200]Unsupported modify column: Column 'a' has a partitioning function dependency and cannot be renamed")
tk.MustExec(`drop table t`)
tk.MustExec(`create table t (a char, b char) partition by list columns (a) (partition p0 values in ('z'))`)
tk.MustContainErrMsg(`alter table t change a c char`, "[ddl:8200]New column does not match partition definitions: [ddl:1567]partition column name cannot be found")
tk.MustContainErrMsg(`alter table t change a c char`, "[ddl:8200]Unsupported modify column: Column 'a' has a partitioning function dependency and cannot be renamed")
tk.MustExec(`drop table t`)
tk.MustExec(`create table t (a int, b char) partition by hash (a) partitions 3`)
tk.MustContainErrMsg(`alter table t change a c int`, "[planner:1054]Unknown column 'a' in 'expression'")
tk.MustContainErrMsg(`alter table t change a c int`, "[ddl:8200]Unsupported modify column: Column 'a' has a partitioning function dependency and cannot be renamed")
}
Loading

0 comments on commit 697b5fe

Please sign in to comment.