From a8403544265b590bef8dd8d54611ab4955fe16e6 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 13 Oct 2022 10:47:50 +0800 Subject: [PATCH 1/9] *: close resultSet (#38435) --- executor/executor_test.go | 9 +++------ executor/index_advise_test.go | 9 +++------ 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index feac2f5a7c48a..5063362462cb5 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -5198,12 +5198,9 @@ func TestHistoryRead(t *testing.T) { require.Greater(t, snapshotTS, curVer1.Ver) require.Less(t, snapshotTS, curVer2.Ver) tk.MustQuery("select * from history_read").Check(testkit.Rows("1")) - _, err = tk.Exec("insert history_read values (2)") - require.Error(t, err) - _, err = tk.Exec("update history_read set a = 3 where a = 1") - require.Error(t, err) - _, err = tk.Exec("delete from history_read where a = 1") - require.Error(t, err) + tk.MustExecToErr("insert history_read values (2)") + tk.MustExecToErr("update history_read set a = 3 where a = 1") + tk.MustExecToErr("delete from history_read where a = 1") tk.MustExec("set @@tidb_snapshot = ''") tk.MustQuery("select * from history_read").Check(testkit.Rows("1", "2")) tk.MustExec("insert history_read values (3)") diff --git a/executor/index_advise_test.go b/executor/index_advise_test.go index a9bdb36232424..3415ffe83537b 100644 --- a/executor/index_advise_test.go +++ b/executor/index_advise_test.go @@ -29,12 +29,9 @@ func TestIndexAdvise(t *testing.T) { tk := testkit.NewTestKit(t, store) - _, err := tk.Exec("index advise infile '/tmp/nonexistence.sql'") - require.EqualError(t, err, "Index Advise: don't support load file without local field") - _, err = tk.Exec("index advise local infile ''") - require.EqualError(t, err, "Index Advise: infile path is empty") - _, err = tk.Exec("index advise local infile '/tmp/nonexistence.sql' lines terminated by ''") - require.EqualError(t, err, "Index Advise: don't support advise index for SQL terminated by nil") + tk.MustGetErrMsg("index advise infile '/tmp/nonexistence.sql'", "Index Advise: don't support load file without local field") + tk.MustGetErrMsg("index advise local infile ''", "Index Advise: infile path is empty") + tk.MustGetErrMsg("index advise local infile '/tmp/nonexistence.sql' lines terminated by ''", "Index Advise: don't support advise index for SQL terminated by nil") path := "/tmp/index_advise.sql" fp, err := os.Create(path) From 4394a2a8f18ebf36ad12fbf7406c3d5c6ab48f79 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 13 Oct 2022 11:15:51 +0800 Subject: [PATCH 2/9] bazel: update config and enable race for part test (#38434) --- bindinfo/BUILD.bazel | 1 + br/pkg/restore/BUILD.bazel | 1 + distsql/BUILD.bazel | 1 + domain/BUILD.bazel | 1 + store/mockstore/unistore/BUILD.bazel | 1 + util/BUILD.bazel | 1 + util/expensivequery/BUILD.bazel | 14 +----------- util/gctuner/BUILD.bazel | 7 +++++- util/memoryusagealarm/BUILD.bazel | 34 ++++++++++++++++++++++++++++ 9 files changed, 47 insertions(+), 14 deletions(-) create mode 100644 util/memoryusagealarm/BUILD.bazel diff --git a/bindinfo/BUILD.bazel b/bindinfo/BUILD.bazel index faffa0420b0b7..93b55bfab21c2 100644 --- a/bindinfo/BUILD.bazel +++ b/bindinfo/BUILD.bazel @@ -57,6 +57,7 @@ go_test( ], embed = [":bindinfo"], flaky = True, + race = "on", shard_count = 50, deps = [ "//config", diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel index e06e39e5925bd..d470d9d5a2655 100644 --- a/br/pkg/restore/BUILD.bazel +++ b/br/pkg/restore/BUILD.bazel @@ -143,6 +143,7 @@ go_test( "//types", "//util/codec", "//util/mathutil", + "@com_github_fsouza_fake_gcs_server//fakestorage", "@com_github_golang_protobuf//proto", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", diff --git a/distsql/BUILD.bazel b/distsql/BUILD.bazel index 8934e398a5f0b..5839f55fbc52c 100644 --- a/distsql/BUILD.bazel +++ b/distsql/BUILD.bazel @@ -63,6 +63,7 @@ go_test( ], embed = [":distsql"], flaky = True, + race = "on", deps = [ "//kv", "//parser/charset", diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index b32697ed79534..1c4a433c79e86 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -52,6 +52,7 @@ go_library( "//util/execdetails", "//util/expensivequery", "//util/logutil", + "//util/memoryusagealarm", "//util/servermemorylimit", "//util/sqlexec", "@com_github_ngaut_pools//:pools", diff --git a/store/mockstore/unistore/BUILD.bazel b/store/mockstore/unistore/BUILD.bazel index cf2a6b23ed56d..50a04a77f9bfe 100644 --- a/store/mockstore/unistore/BUILD.bazel +++ b/store/mockstore/unistore/BUILD.bazel @@ -27,6 +27,7 @@ go_library( "@com_github_pingcap_kvproto//pkg/coprocessor", "@com_github_pingcap_kvproto//pkg/debugpb", "@com_github_pingcap_kvproto//pkg/errorpb", + "@com_github_pingcap_kvproto//pkg/keyspacepb", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_pingcap_kvproto//pkg/mpp", diff --git a/util/BUILD.bazel b/util/BUILD.bazel index 65349065e4be5..879b7e86df089 100644 --- a/util/BUILD.bazel +++ b/util/BUILD.bazel @@ -59,6 +59,7 @@ go_test( "processinfo_test.go", "security_test.go", "urls_test.go", + "util_test.go", "wait_group_wrapper_test.go", ], data = glob(["tls_test/**"]), diff --git a/util/expensivequery/BUILD.bazel b/util/expensivequery/BUILD.bazel index 789639dc53452..7ecedc97b25c7 100644 --- a/util/expensivequery/BUILD.bazel +++ b/util/expensivequery/BUILD.bazel @@ -2,22 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "expensivequery", - srcs = [ - "expensivequery.go", - "memory_usage_alarm.go", - ], + srcs = ["expensivequery.go"], importpath = "github.com/pingcap/tidb/util/expensivequery", visibility = ["//visibility:public"], deps = [ - "//config", - "//parser", "//sessionctx/variable", "//util", - "//util/disk", "//util/logutil", - "//util/memory", "@com_github_pingcap_log//:log", - "@org_golang_x_exp//slices", "@org_uber_go_zap//:zap", "@org_uber_go_zap//zapcore", ], @@ -30,11 +22,7 @@ go_test( embed = [":expensivequery"], flaky = True, deps = [ - "//sessionctx/stmtctx", "//testkit/testsetup", - "//util", - "//util/memory", - "@com_github_stretchr_testify//assert", "@org_uber_go_goleak//:goleak", ], ) diff --git a/util/gctuner/BUILD.bazel b/util/gctuner/BUILD.bazel index 3e90b53695d8b..d20f8b6a5b418 100644 --- a/util/gctuner/BUILD.bazel +++ b/util/gctuner/BUILD.bazel @@ -5,12 +5,15 @@ go_library( srcs = [ "finalizer.go", "mem.go", + "memory_limit_tuner.go", "tuner.go", ], importpath = "github.com/pingcap/tidb/util/gctuner", visibility = ["//visibility:public"], deps = [ "//util", + "//util/memory", + "@com_github_pingcap_failpoint//:failpoint", "@org_uber_go_atomic//:atomic", ], ) @@ -20,13 +23,15 @@ go_test( srcs = [ "finalizer_test.go", "mem_test.go", + "memory_limit_tuner_test.go", "tuner_test.go", ], embed = [":gctuner"], flaky = True, race = "on", deps = [ - "@com_github_stretchr_testify//assert", + "//util/memory", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//require", ], ) diff --git a/util/memoryusagealarm/BUILD.bazel b/util/memoryusagealarm/BUILD.bazel new file mode 100644 index 0000000000000..48c7910492ecc --- /dev/null +++ b/util/memoryusagealarm/BUILD.bazel @@ -0,0 +1,34 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "memoryusagealarm", + srcs = ["memoryusagealarm.go"], + importpath = "github.com/pingcap/tidb/util/memoryusagealarm", + visibility = ["//visibility:public"], + deps = [ + "//config", + "//sessionctx/variable", + "//util", + "//util/disk", + "//util/logutil", + "//util/memory", + "@org_golang_x_exp//slices", + "@org_uber_go_zap//:zap", + "@org_uber_go_zap//zapcore", + ], +) + +go_test( + name = "memoryusagealarm_test", + srcs = ["memoryusagealarm_test.go"], + embed = [":memoryusagealarm"], + flaky = True, + race = "on", + deps = [ + "//sessionctx/stmtctx", + "//util", + "//util/execdetails", + "//util/memory", + "@com_github_stretchr_testify//assert", + ], +) From a5c7c039c6c33b0a911b9a2eac7b28048d097c05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Thu, 13 Oct 2022 15:59:51 +0800 Subject: [PATCH 3/9] tests: use atomic values for flushed epoch in `streamhelper_test` (#38446) close pingcap/tidb#37482 --- br/pkg/streamhelper/basic_lib_for_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index c24dab9bdb8c7..9e438c32f0f1f 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -28,21 +28,21 @@ import ( ) type flushSimulator struct { - flushedEpoch uint64 + flushedEpoch atomic.Uint64 enabled bool } -func (c flushSimulator) makeError(requestedEpoch uint64) *errorpb.Error { +func (c *flushSimulator) makeError(requestedEpoch uint64) *errorpb.Error { if !c.enabled { return nil } - if c.flushedEpoch == 0 { + if c.flushedEpoch.Load() == 0 { e := errorpb.Error{ Message: "not flushed", } return &e } - if c.flushedEpoch != requestedEpoch { + if c.flushedEpoch.Load() != requestedEpoch { e := errorpb.Error{ Message: "flushed epoch not match", } @@ -51,7 +51,7 @@ func (c flushSimulator) makeError(requestedEpoch uint64) *errorpb.Error { return nil } -func (c flushSimulator) fork() flushSimulator { +func (c *flushSimulator) fork() flushSimulator { return flushSimulator{ enabled: c.enabled, } @@ -108,7 +108,7 @@ func (r *region) splitAt(newID uint64, k string) *region { } func (r *region) flush() { - r.fsim.flushedEpoch = r.epoch + r.fsim.flushedEpoch.Store(r.epoch) } func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.GetLastFlushTSOfRegionRequest, opts ...grpc.CallOption) (*logbackup.GetLastFlushTSOfRegionResponse, error) { @@ -320,7 +320,7 @@ func (f *fakeCluster) advanceCheckpoints() uint64 { if cp < minCheckpoint { minCheckpoint = cp } - r.fsim.flushedEpoch = 0 + r.fsim.flushedEpoch.Store(0) }) } log.Info("checkpoint updated", zap.Uint64("to", minCheckpoint)) @@ -369,7 +369,7 @@ func (r *region) String() string { hex.EncodeToString(r.rng.EndKey), r.checkpoint.Load(), r.leader, - r.fsim.flushedEpoch) + r.fsim.flushedEpoch.Load()) } func (f *fakeStore) String() string { From a10bb9e32292f4110d690a131a0822e553548bda Mon Sep 17 00:00:00 2001 From: lizhenhuan <1916038084@qq.com> Date: Thu, 13 Oct 2022 17:25:52 +0800 Subject: [PATCH 4/9] expression: Push down json_contains to tikv (#37840) close pingcap/tidb#37839 --- expression/expression.go | 2 +- planner/core/integration_test.go | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/expression/expression.go b/expression/expression.go index bb38d8544185b..e8f38910c1264 100644 --- a/expression/expression.go +++ b/expression/expression.go @@ -1003,7 +1003,7 @@ func scalarExprSupportedByTiKV(sf *ScalarFunction) bool { ast.JSONType, ast.JSONExtract, ast.JSONObject, ast.JSONArray, ast.JSONMerge, ast.JSONSet, ast.JSONInsert /*ast.JSONReplace,*/, ast.JSONRemove, ast.JSONLength, // FIXME: JSONUnquote is incompatible with Coprocessor - ast.JSONUnquote, + ast.JSONUnquote, ast.JSONContains, // date functions. ast.Date, ast.Week /* ast.YearWeek, ast.ToSeconds */, ast.DateDiff, diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 4ee6a4a711395..e716561b114e1 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -3065,7 +3065,7 @@ func TestScalarFunctionPushDown(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t(id int signed, id2 int unsigned, c varchar(11), d datetime, b double, e bit(10))") - tk.MustExec("insert into t(id, id2, c, d) values (-1, 1, 'abc', '2021-12-12')") + tk.MustExec("insert into t(id, id2, c, d) values (-1, 1, '{\"a\":1}', '2021-12-12')") rows := [][]interface{}{ {"TableReader_7", "root", "data:Selection_6"}, {"└─Selection_6", "cop[tikv]", "right(test.t.c, 1)"}, @@ -3178,6 +3178,10 @@ func TestScalarFunctionPushDown(t *testing.T) { rows[1][2] = "ascii(cast(test.t.e, var_string(2)))" tk.MustQuery("explain analyze select /*+read_from_storage(tikv[t])*/ * from t where ascii(e);"). CheckAt([]int{0, 3, 6}, rows) + + rows[1][2] = "json_contains(cast(test.t.c, json BINARY), cast(\"1\", json BINARY))" + tk.MustQuery("explain analyze select /*+read_from_storage(tikv[t])*/ * from t where json_contains(c, '1');"). + CheckAt([]int{0, 3, 6}, rows) } func TestDistinctScalarFunctionPushDown(t *testing.T) { From f45f4978a10a4b94734650805aa5be41f9fb5838 Mon Sep 17 00:00:00 2001 From: xiongjiwei Date: Fri, 14 Oct 2022 00:07:51 +0800 Subject: [PATCH 5/9] mockkv: make pk id = -1 if no primary key column is used (#38443) --- store/mockstore/unistore/cophandler/cop_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/mockstore/unistore/cophandler/cop_handler.go b/store/mockstore/unistore/cophandler/cop_handler.go index 5f375f2bfdc30..2b32c168329bd 100644 --- a/store/mockstore/unistore/cophandler/cop_handler.go +++ b/store/mockstore/unistore/cophandler/cop_handler.go @@ -401,7 +401,7 @@ func newRowDecoder(columnInfos []*tipb.ColumnInfo, fieldTps []*types.FieldType, if primaryCols != nil { pkCols = primaryCols } else { - pkCols = []int64{0} + pkCols = []int64{-1} } } def := func(i int, chk *chunk.Chunk) error { From af714c295e65040109d601fd130d02a5b1ad8bcc Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Thu, 13 Oct 2022 18:37:51 +0200 Subject: [PATCH 6/9] ddl: support modify column on partitioned table (#38302) close pingcap/tidb#38297 --- ddl/backfilling.go | 3 + ddl/column.go | 38 +++++++++-- ddl/db_partition_test.go | 128 +++++++++++++++++++++++++++++++++++ ddl/ddl_api.go | 3 - ddl/failtest/fail_db_test.go | 4 +- 5 files changed, 163 insertions(+), 13 deletions(-) diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 0736cbb58215f..7761003d78e23 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -471,6 +471,9 @@ func (dc *ddlCtx) handleRangeTasks(sessPool *sessionPool, t table.Table, workers physicalTableID := reorgInfo.PhysicalTableID var prefix kv.Key + if tbl, ok := t.(table.PartitionedTable); ok { + t = tbl.GetPartition(physicalTableID) + } if reorgInfo.mergingTmpIdx { prefix = t.IndexPrefix() } else { diff --git a/ddl/column.go b/ddl/column.go index 6beba60a35d5c..10db5120e9351 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1020,9 +1020,30 @@ func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInf return elements } -func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, reorgInfo *reorgInfo) error { +func (w *worker) updatePhysicalTableRow(t table.Table, reorgInfo *reorgInfo) error { logutil.BgLogger().Info("[ddl] start to update table row", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(w.sessPool, t, typeUpdateColumnWorker, reorgInfo) + if tbl, ok := t.(table.PartitionedTable); ok { + done := false + for !done { + p := tbl.GetPartition(reorgInfo.PhysicalTableID) + if p == nil { + return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID) + } + err := w.writePhysicalTableRecord(w.sessPool, p, typeUpdateColumnWorker, reorgInfo) + if err != nil { + return err + } + done, err = w.updateReorgInfo(tbl, reorgInfo) + if err != nil { + return errors.Trace(err) + } + } + return nil + } + if tbl, ok := t.(table.PhysicalTable); ok { + return w.writePhysicalTableRecord(w.sessPool, tbl, typeUpdateColumnWorker, reorgInfo) + } + return dbterror.ErrCancelledDDLJob.GenWithStack("internal error for phys tbl id: %d tbl id: %d", reorgInfo.PhysicalTableID, t.Meta().ID) } // TestReorgGoroutineRunning is only used in test to indicate the reorg goroutine has been started. @@ -1044,22 +1065,25 @@ func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error } } }) - // TODO: Support partition tables. if bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) { - //nolint:forcetypeassert - err := w.updatePhysicalTableRow(t.(table.PhysicalTable), reorgInfo) + err := w.updatePhysicalTableRow(t, reorgInfo) if err != nil { return errors.Trace(err) } } + var physTbl table.PhysicalTable + if tbl, ok := t.(table.PartitionedTable); ok { + physTbl = tbl.GetPartition(reorgInfo.PhysicalTableID) + } else if tbl, ok := t.(table.PhysicalTable); ok { + physTbl = tbl + } // Get the original start handle and end handle. currentVer, err := getValidCurrentVersion(reorgInfo.d.store) if err != nil { return errors.Trace(err) } - //nolint:forcetypeassert - originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job), reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority) + originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job), reorgInfo.d, physTbl, currentVer.Ver, reorgInfo.Job.Priority) if err != nil { return errors.Trace(err) } diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index ff75ba2f41b10..93ed264deb6e8 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -4528,3 +4528,131 @@ func TestPartitionTableWithAnsiQuotes(t *testing.T) { ` PARTITION "p4" VALUES LESS THAN ('\\''\t\n','\\''\t\n'),` + "\n" + ` PARTITION "pMax" VALUES LESS THAN (MAXVALUE,MAXVALUE))`)) } +func TestAlterModifyColumnOnPartitionedTable(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("create database AlterPartTable") + tk.MustExec("use AlterPartTable") + tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), key (b))`) + tk.MustExec(`insert into t values (7, "07"), (8, "08"),(23,"23"),(34,"34πŸ’₯"),(46,"46"),(57,"57")`) + tk.MustQuery(`show create table t`).Check(testkit.Rows( + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + // TODO: Why does it allow πŸ’₯ as a latin1 character? + tk.MustQuery(`select hex(b) from t where a = 34`).Check(testkit.Rows("3334F09F92A5")) + tk.MustExec(`alter table t modify b varchar(200) charset latin1`) + tk.MustQuery(`show create table t`).Check(testkit.Rows( + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(200) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tk.MustQuery(`select hex(b) from t where a = 34`).Check(testkit.Rows("3334F09F92A5")) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "23 23", + "34 34πŸ’₯", + "46 46", + "57 57", + "7 07", + "8 08")) + tk.MustQuery(`select * from t order by b`).Check(testkit.Rows(""+ + "7 07", + "8 08", + "23 23", + "34 34πŸ’₯", + "46 46", + "57 57")) + tk.MustExec(`alter table t change b c varchar(200) charset utf8mb4`) + tk.MustExec(`drop table t`) + tk.MustExec(`create table t (a int unsigned PRIMARY KEY, b varchar(255), key (b)) partition by range (a) ` + + `(partition p0 values less than (10),` + + ` partition p1 values less than (20),` + + ` partition p2 values less than (30),` + + ` partition pMax values less than (MAXVALUE))`) + tk.MustExec(`insert into t values (7, "07"), (8, "08"),(23,"23"),(34,"34πŸ’₯"),(46,"46"),(57,"57")`) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "23 23", + "34 34πŸ’₯", + "46 46", + "57 57", + "7 07", + "8 08")) + tk.MustQuery(`select * from t order by b`).Check(testkit.Rows(""+ + "7 07", + "8 08", + "23 23", + "34 34πŸ’₯", + "46 46", + "57 57")) + tk.MustQuery(`show create table t`).Check(testkit.Rows( + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1` VALUES LESS THAN (20),\n" + + " PARTITION `p2` VALUES LESS THAN (30),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + tk.MustExec(`alter table t modify b varchar(200) charset latin1`) + tk.MustQuery(`show create table t`).Check(testkit.Rows( + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `b` varchar(200) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`b`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1` VALUES LESS THAN (20),\n" + + " PARTITION `p2` VALUES LESS THAN (30),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "23 23", + "34 34πŸ’₯", + "46 46", + "57 57", + "7 07", + "8 08")) + tk.MustQuery(`select * from t order by b`).Check(testkit.Rows(""+ + "7 07", + "8 08", + "23 23", + "34 34πŸ’₯", + "46 46", + "57 57")) + tk.MustExec(`alter table t change b c varchar(150) charset utf8mb4`) + tk.MustQuery(`show create table t`).Check(testkit.Rows( + "t CREATE TABLE `t` (\n" + + " `a` int(10) unsigned NOT NULL,\n" + + " `c` varchar(150) DEFAULT NULL,\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + + " KEY `b` (`c`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY RANGE (`a`)\n" + + "(PARTITION `p0` VALUES LESS THAN (10),\n" + + " PARTITION `p1` VALUES LESS THAN (20),\n" + + " PARTITION `p2` VALUES LESS THAN (30),\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE))")) + tk.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "23 23", + "34 34πŸ’₯", + "46 46", + "57 57", + "7 07", + "8 08")) + tk.MustQuery(`select * from t order by c`).Check(testkit.Rows(""+ + "7 07", + "8 08", + "23 23", + "34 34πŸ’₯", + "46 46", + "57 57")) +} diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 153567e2c9fe1..54f8243b6df8c 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -4543,9 +4543,6 @@ func GetModifiableColumnJob( if err = isGeneratedRelatedColumn(t.Meta(), newCol.ColumnInfo, col.ColumnInfo); err != nil { return nil, errors.Trace(err) } - if t.Meta().Partition != nil { - return nil, dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs("table is partition table") - } } // We don't support modifying column from not_auto_increment to auto_increment. diff --git a/ddl/failtest/fail_db_test.go b/ddl/failtest/fail_db_test.go index e4d8ea7e58342..bde5e9b1b9569 100644 --- a/ddl/failtest/fail_db_test.go +++ b/ddl/failtest/fail_db_test.go @@ -493,8 +493,6 @@ func TestModifyColumn(t *testing.T) { tk.MustExec("admin check table t") // Test unsupported statements. - tk.MustExec("create table t1(a int) partition by hash (a) partitions 2") - tk.MustGetErrMsg("alter table t1 modify column a mediumint", "[ddl:8200]Unsupported modify column: table is partition table") tk.MustExec("create table t2(id int, a int, b int generated always as (abs(a)) virtual, c int generated always as (a+1) stored)") tk.MustGetErrMsg("alter table t2 modify column b mediumint", "[ddl:8200]Unsupported modify column: newCol IsGenerated false, oldCol IsGenerated true") tk.MustGetErrMsg("alter table t2 modify column c mediumint", "[ddl:8200]Unsupported modify column: newCol IsGenerated false, oldCol IsGenerated true") @@ -531,7 +529,7 @@ func TestModifyColumn(t *testing.T) { tk.MustExec("insert into t5 values (1,1),(2,2),(3,3),(4,4),(5,5);") tk.MustExec("alter table t5 modify a int not null;") - tk.MustExec("drop table t, t1, t2, t3, t4, t5") + tk.MustExec("drop table t, t2, t3, t4, t5") } func TestPartitionAddPanic(t *testing.T) { From 84fbfcada3d9564fe922771f010db318d27fc77a Mon Sep 17 00:00:00 2001 From: Song Gao Date: Fri, 14 Oct 2022 01:15:52 +0800 Subject: [PATCH 7/9] planner: revise isnullRejected check for `And` and `OR` (#38430) close pingcap/tidb#38304 --- expression/expression.go | 68 ++++++++++++++++++++++++ planner/core/plan_test.go | 16 ++++++ planner/core/rule_predicate_push_down.go | 24 +++++---- 3 files changed, 98 insertions(+), 10 deletions(-) diff --git a/expression/expression.go b/expression/expression.go index e8f38910c1264..e4b8ae764a2bd 100644 --- a/expression/expression.go +++ b/expression/expression.go @@ -818,6 +818,10 @@ func EvaluateExprWithNull(ctx sessionctx.Context, schema *Schema, expr Expressio if MaybeOverOptimized4PlanCache(ctx, []Expression{expr}) { return expr } + if ctx.GetSessionVars().StmtCtx.InNullRejectCheck { + expr, _ = evaluateExprWithNullInNullRejectCheck(ctx, schema, expr) + return expr + } return evaluateExprWithNull(ctx, schema, expr) } @@ -842,6 +846,70 @@ func evaluateExprWithNull(ctx sessionctx.Context, schema *Schema, expr Expressio return expr } +// evaluateExprWithNullInNullRejectCheck sets columns in schema as null and calculate the final result of the scalar function. +// If the Expression is a non-constant value, it means the result is unknown. +// The returned bool values indicates whether the value is influenced by the Null Constant transformed from schema column +// when the value is Null Constant. +func evaluateExprWithNullInNullRejectCheck(ctx sessionctx.Context, schema *Schema, expr Expression) (Expression, bool) { + switch x := expr.(type) { + case *ScalarFunction: + args := make([]Expression, len(x.GetArgs())) + nullFromSets := make([]bool, len(x.GetArgs())) + for i, arg := range x.GetArgs() { + args[i], nullFromSets[i] = evaluateExprWithNullInNullRejectCheck(ctx, schema, arg) + } + + // allNullFromSet indicates whether all arguments are Null Constant and the Null Constant is affected by the column of the schema. + allNullFromSet := true + for i := range args { + if cons, ok := args[i].(*Constant); ok && cons.Value.IsNull() && !nullFromSets[i] { + allNullFromSet = false + break + } + } + + // allArgsNullFromSet indicates whether all Null Constant are affected by the column of the schema + allArgsNullFromSet := true + for i := range args { + if cons, ok := args[i].(*Constant); ok && cons.Value.IsNull() && nullFromSets[i] { + continue + } + allArgsNullFromSet = false + } + + // If all the args are Null Constant and affected by the column schema, then we should keep it. + // Otherwise, we shouldn't let Null Constant which affected by the column schema participate in computing in `And` and `OR` + // due to the result of `AND` and `OR are uncertain if one of the arguments is NULL. + if !allArgsNullFromSet { + for i := range args { + if cons, ok := args[i].(*Constant); ok && cons.Value.IsNull() && nullFromSets[i] { + if x.FuncName.L == ast.LogicAnd { + args[i] = NewOne() + } + if x.FuncName.L == ast.LogicOr { + args[i] = NewZero() + } + } + } + } + c := NewFunctionInternal(ctx, x.FuncName.L, x.RetType.Clone(), args...) + cons, ok := c.(*Constant) + // If the return expr is Null Constant, and all the Null Constant arguments are affected by column schema, + // then we think the result Null Constant is also affected by the column schema + return c, ok && cons.Value.IsNull() && allNullFromSet + case *Column: + if !schema.Contains(x) { + return x, false + } + return &Constant{Value: types.Datum{}, RetType: types.NewFieldType(mysql.TypeNull)}, true + case *Constant: + if x.DeferredExpr != nil { + return FoldConstant(x), false + } + } + return expr, false +} + // TableInfo2SchemaAndNames converts the TableInfo to the schema and name slice. func TableInfo2SchemaAndNames(ctx sessionctx.Context, dbName model.CIStr, tbl *model.TableInfo) (*Schema, []*types.FieldName, error) { cols, names, err := ColumnInfos2ColumnsAndNames(ctx, dbName, tbl.Name, tbl.Cols(), tbl) diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index 242d868a8c571..ebbb9c7373401 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -1076,3 +1076,19 @@ func TestNullEQConditionPlan(t *testing.T) { {"Point_Get_5", "root", "handle:0"}, }) } + +// https://github.com/pingcap/tidb/issues/38304 +func TestOuterJoinOnNull(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("CREATE TABLE t0(c0 BLOB(5), c1 BLOB(5));") + tk.MustExec("CREATE TABLE t1 (c0 BOOL);") + tk.MustExec("INSERT INTO t1 VALUES(false);") + tk.MustExec("INSERT INTO t0(c0, c1) VALUES ('>', true);") + tk.MustQuery("SELECT * FROM t0 LEFT OUTER JOIN t1 ON NULL; ").Check(testkit.Rows("> 1 ")) + tk.MustQuery("SELECT NOT '2' =(t1.c0 AND t0.c1 IS NULL) FROM t0 LEFT OUTER JOIN t1 ON NULL; ").Check(testkit.Rows("1")) + tk.MustQuery("SELECT * FROM t0 LEFT JOIN t1 ON NULL WHERE NOT '2' =(t1.c0 AND t0.c1 IS NULL); ").Check(testkit.Rows("> 1 ")) + tk.MustQuery("SELECT * FROM t0 LEFT JOIN t1 ON NULL WHERE t1.c0 or true; ").Check(testkit.Rows("> 1 ")) + tk.MustQuery("SELECT * FROM t0 LEFT JOIN t1 ON NULL WHERE not(t1.c0 and false); ").Check(testkit.Rows("> 1 ")) +} diff --git a/planner/core/rule_predicate_push_down.go b/planner/core/rule_predicate_push_down.go index 96d62942f2346..bebed1cab141a 100644 --- a/planner/core/rule_predicate_push_down.go +++ b/planner/core/rule_predicate_push_down.go @@ -440,16 +440,20 @@ func isNullRejected(ctx sessionctx.Context, schema *expression.Schema, expr expr } sc := ctx.GetSessionVars().StmtCtx sc.InNullRejectCheck = true - result := expression.EvaluateExprWithNull(ctx, schema, expr) - sc.InNullRejectCheck = false - x, ok := result.(*expression.Constant) - if !ok { - return false - } - if x.Value.IsNull() { - return true - } else if isTrue, err := x.Value.ToBool(sc); err == nil && isTrue == 0 { - return true + defer func() { + sc.InNullRejectCheck = false + }() + for _, cond := range expression.SplitCNFItems(expr) { + result := expression.EvaluateExprWithNull(ctx, schema, cond) + x, ok := result.(*expression.Constant) + if !ok { + continue + } + if x.Value.IsNull() { + return true + } else if isTrue, err := x.Value.ToBool(sc); err == nil && isTrue == 0 { + return true + } } return false } From 400f5855785323e51e30f0cbd9ac3fad2fc27815 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Fri, 14 Oct 2022 12:21:52 +0800 Subject: [PATCH 8/9] br/restore: try keep table ID when possible (#38033) close pingcap/tidb#38438 --- br/pkg/glue/glue.go | 5 +- br/pkg/gluetidb/glue.go | 13 +-- br/pkg/restore/client.go | 29 +++++++ br/pkg/restore/db.go | 26 +++++- br/pkg/restore/prealloc_table_id/alloc.go | 77 +++++++++++++++++ .../restore/prealloc_table_id/alloc_test.go | 85 +++++++++++++++++++ br/pkg/task/restore.go | 2 + ddl/db_table_test.go | 45 ++++++++++ ddl/ddl.go | 45 +++++++++- ddl/ddl_api.go | 29 ++++--- ddl/schematracker/checker.go | 4 +- ddl/schematracker/dm_tracker.go | 10 ++- executor/brie.go | 4 +- meta/meta.go | 17 ++++ 14 files changed, 359 insertions(+), 32 deletions(-) create mode 100644 br/pkg/restore/prealloc_table_id/alloc.go create mode 100644 br/pkg/restore/prealloc_table_id/alloc_test.go diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index cb4681dad4226..450a0b73ec659 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -5,6 +5,7 @@ package glue import ( "context" + "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" @@ -42,7 +43,7 @@ type Session interface { Execute(ctx context.Context, sql string) error ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error CreateDatabase(ctx context.Context, schema *model.DBInfo) error - CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error + CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error Close() GetGlobalVariable(name string) (string, error) @@ -51,7 +52,7 @@ type Session interface { // BatchCreateTableSession is an interface to batch create table parallelly type BatchCreateTableSession interface { - CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error + CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error } // Progress is an interface recording the current execution progress. diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index 459437e33b091..5483380d03e65 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -204,7 +204,7 @@ func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model. } // CreateTables implements glue.BatchCreateTableSession. -func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error { +func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { d := domain.GetDomain(gs.se).DDL() var dbName model.CIStr @@ -231,7 +231,7 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo cloneTables = append(cloneTables, table) } gs.se.SetValue(sessionctx.QueryString, queryBuilder.String()) - err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, ddl.OnExistIgnore) + err := d.BatchCreateTableWithInfo(gs.se, dbName, cloneTables, append(cs, ddl.OnExistIgnore)...) if err != nil { //It is possible to failure when TiDB does not support model.ActionCreateTables. //In this circumstance, BatchCreateTableWithInfo returns errno.ErrInvalidDDLJob, @@ -245,7 +245,7 @@ func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*mo } // CreateTable implements glue.Session. -func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error { +func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { d := domain.GetDomain(gs.se).DDL() query, err := gs.showCreateTable(table) if err != nil { @@ -259,7 +259,8 @@ func (gs *tidbSession) CreateTable(ctx context.Context, dbName model.CIStr, tabl newPartition.Definitions = append([]model.PartitionDefinition{}, table.Partition.Definitions...) table.Partition = &newPartition } - return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore) + + return d.CreateTableWithInfo(gs.se, dbName, table, append(cs, ddl.OnExistIgnore)...) } // Close implements glue.Session. @@ -349,13 +350,13 @@ func (s *mockSession) CreatePlacementPolicy(ctx context.Context, policy *model.P } // CreateTables implements glue.BatchCreateTableSession. -func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error { +func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { log.Fatal("unimplemented CreateDatabase for mock session") return nil } // CreateTable implements glue.Session. -func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error { +func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { log.Fatal("unimplemented CreateDatabase for mock session") return nil } diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 17a788f0e0cf2..7e14cda3014c8 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/redact" + tidalloc "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" "github.com/pingcap/tidb/br/pkg/rtree" @@ -173,6 +174,9 @@ type Client struct { // see RestoreCommonConfig.WithSysTable withSysTable bool + + // the successfully preallocated table IDs. + preallocedTableIDs *tidalloc.PreallocIDs } // NewRestoreClient returns a new RestoreClient. @@ -237,6 +241,26 @@ func (rc *Client) Init(g glue.Glue, store kv.Storage) error { return errors.Trace(err) } +func (rc *Client) allocTableIDs(ctx context.Context, tables []*metautil.Table) error { + rc.preallocedTableIDs = tidalloc.New(tables) + ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR) + err := kv.RunInNewTxn(ctx, rc.GetDomain().Store(), true, func(_ context.Context, txn kv.Transaction) error { + return rc.preallocedTableIDs.Alloc(meta.NewMeta(txn)) + }) + if err != nil { + return err + } + + log.Info("registering the table IDs", zap.Stringer("ids", rc.preallocedTableIDs)) + for i := range rc.dbPool { + rc.dbPool[i].registerPreallocatedIDs(rc.preallocedTableIDs) + } + if rc.db != nil { + rc.db.registerPreallocatedIDs(rc.preallocedTableIDs) + } + return nil +} + // SetPlacementPolicyMode to policy mode. func (rc *Client) SetPlacementPolicyMode(withPlacementPolicy string) { switch strings.ToUpper(withPlacementPolicy) { @@ -724,6 +748,11 @@ func (rc *Client) GoCreateTables( } outCh := make(chan CreatedTable, len(tables)) rater := logutil.TraceRateOver(logutil.MetricTableCreatedCounter) + if err := rc.allocTableIDs(ctx, tables); err != nil { + errCh <- err + close(outCh) + return outCh + } var err error diff --git a/br/pkg/restore/db.go b/br/pkg/restore/db.go index c761d53693364..ae62162c3e890 100644 --- a/br/pkg/restore/db.go +++ b/br/pkg/restore/db.go @@ -11,7 +11,9 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/metautil" + prealloctableid "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id" "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" @@ -24,7 +26,8 @@ import ( // DB is a TiDB instance, not thread-safe. type DB struct { - se glue.Session + se glue.Session + preallocedIDs *prealloctableid.PreallocIDs } type UniqueTableName struct { @@ -78,6 +81,10 @@ func NewDB(g glue.Glue, store kv.Storage, policyMode string) (*DB, bool, error) }, supportPolicy, nil } +func (db *DB) registerPreallocatedIDs(ids *prealloctableid.PreallocIDs) { + db.preallocedIDs = ids +} + // ExecDDL executes the query of a ddl job. func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { var err error @@ -272,6 +279,19 @@ func (db *DB) CreateTablePostRestore(ctx context.Context, table *metautil.Table, return nil } +func (db *DB) tableIDAllocFilter() ddl.AllocTableIDIf { + return func(ti *model.TableInfo) bool { + if db.preallocedIDs == nil { + return true + } + prealloced := db.preallocedIDs.Prealloced(ti.ID) + if prealloced { + log.Info("reusing table ID", zap.Stringer("table", ti.Name)) + } + return !prealloced + } +} + // CreateTables execute a internal CREATE TABLES. func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table, ddlTables map[UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error { @@ -289,7 +309,7 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table, } } } - if err := batchSession.CreateTables(ctx, m); err != nil { + if err := batchSession.CreateTables(ctx, m, db.tableIDAllocFilter()); err != nil { return err } @@ -316,7 +336,7 @@ func (db *DB) CreateTable(ctx context.Context, table *metautil.Table, } } - err := db.se.CreateTable(ctx, table.DB.Name, table.Info) + err := db.se.CreateTable(ctx, table.DB.Name, table.Info, db.tableIDAllocFilter()) if err != nil { log.Error("create table failed", zap.Stringer("db", table.DB.Name), diff --git a/br/pkg/restore/prealloc_table_id/alloc.go b/br/pkg/restore/prealloc_table_id/alloc.go new file mode 100644 index 0000000000000..e6e31299974fa --- /dev/null +++ b/br/pkg/restore/prealloc_table_id/alloc.go @@ -0,0 +1,77 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. + +package prealloctableid + +import ( + "fmt" + "math" + + "github.com/pingcap/tidb/br/pkg/metautil" +) + +// Allocator is the interface needed to allocate table IDs. +type Allocator interface { + GetGlobalID() (int64, error) + AdvanceGlobalIDs(n int) (int64, error) +} + +// PreallocIDs mantains the state of preallocated table IDs. +type PreallocIDs struct { + end int64 + + allocedFrom int64 +} + +// New collects the requirement of prealloc IDs and return a +// not-yet-allocated PreallocIDs. +func New(tables []*metautil.Table) *PreallocIDs { + if len(tables) == 0 { + return &PreallocIDs{ + allocedFrom: math.MaxInt64, + } + } + + max := int64(0) + + for _, t := range tables { + if t.Info.ID > max { + max = t.Info.ID + } + } + return &PreallocIDs{ + end: max + 1, + + allocedFrom: math.MaxInt64, + } +} + +// String implements fmt.Stringer. +func (p *PreallocIDs) String() string { + if p.allocedFrom >= p.end { + return fmt.Sprintf("ID:empty(end=%d)", p.end) + } + return fmt.Sprintf("ID:[%d,%d)", p.allocedFrom, p.end) +} + +// preallocTableIDs peralloc the id for [start, end) +func (p *PreallocIDs) Alloc(m Allocator) error { + currentId, err := m.GetGlobalID() + if err != nil { + return err + } + if currentId > p.end { + return nil + } + + alloced, err := m.AdvanceGlobalIDs(int(p.end - currentId)) + if err != nil { + return err + } + p.allocedFrom = alloced + return nil +} + +// Prealloced checks whether a table ID has been successfully allocated. +func (p *PreallocIDs) Prealloced(tid int64) bool { + return p.allocedFrom <= tid && tid < p.end +} diff --git a/br/pkg/restore/prealloc_table_id/alloc_test.go b/br/pkg/restore/prealloc_table_id/alloc_test.go new file mode 100644 index 0000000000000..95771d2f1f580 --- /dev/null +++ b/br/pkg/restore/prealloc_table_id/alloc_test.go @@ -0,0 +1,85 @@ +// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0. + +package prealloctableid_test + +import ( + "fmt" + "testing" + + "github.com/pingcap/tidb/br/pkg/metautil" + prealloctableid "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id" + "github.com/pingcap/tidb/parser/model" + "github.com/stretchr/testify/require" +) + +type testAllocator int64 + +func (t *testAllocator) GetGlobalID() (int64, error) { + return int64(*t), nil +} + +func (t *testAllocator) AdvanceGlobalIDs(n int) (int64, error) { + old := int64(*t) + *t = testAllocator(int64(*t) + int64(n)) + return old, nil +} + +func TestAllocator(t *testing.T) { + type Case struct { + tableIDs []int64 + hasAllocatedTo int64 + successfullyAllocated []int64 + shouldAllocatedTo int64 + } + + cases := []Case{ + { + tableIDs: []int64{1, 2, 5, 6, 7}, + hasAllocatedTo: 6, + successfullyAllocated: []int64{6, 7}, + shouldAllocatedTo: 8, + }, + { + tableIDs: []int64{4, 6, 9, 2}, + hasAllocatedTo: 1, + successfullyAllocated: []int64{2, 4, 6, 9}, + shouldAllocatedTo: 10, + }, + { + tableIDs: []int64{1, 2, 3, 4}, + hasAllocatedTo: 5, + successfullyAllocated: []int64{}, + shouldAllocatedTo: 5, + }, + } + + run := func(t *testing.T, c Case) { + tables := make([]*metautil.Table, 0, len(c.tableIDs)) + for _, id := range c.tableIDs { + tables = append(tables, &metautil.Table{ + Info: &model.TableInfo{ + ID: id, + }, + }) + } + + ids := prealloctableid.New(tables) + allocator := testAllocator(c.hasAllocatedTo) + require.NoError(t, ids.Alloc(&allocator)) + + allocated := make([]int64, 0, len(c.successfullyAllocated)) + for _, t := range c.tableIDs { + if ids.Prealloced(t) { + allocated = append(allocated, t) + } + } + require.ElementsMatch(t, allocated, c.successfullyAllocated) + require.Equal(t, int64(allocator), c.shouldAllocatedTo) + } + + for i, c := range cases { + t.Run(fmt.Sprintf("#%d", i), func(t *testing.T) { + run(t, c) + }) + } +} diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 7dcdc1274413a..cf1ce4682a09f 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -550,6 +550,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if len(dbs) == 0 && len(tables) != 0 { return errors.Annotate(berrors.ErrRestoreInvalidBackup, "contain tables but no databases") } + archiveSize := reader.ArchiveSize(ctx, files) g.Record(summary.RestoreDataSize, archiveSize) //restore from tidb will fetch a general Size issue https://github.com/pingcap/tidb/issues/27247 @@ -652,6 +653,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf // We make bigger errCh so we won't block on multi-part failed. errCh := make(chan error, 32) + tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, errCh) if len(files) == 0 { log.Info("no files, empty databases and tables are restored") diff --git a/ddl/db_table_test.go b/ddl/db_table_test.go index 730e3bb941848..2771a06ecc291 100644 --- a/ddl/db_table_test.go +++ b/ddl/db_table_test.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "fmt" + "strconv" "strings" "testing" "time" @@ -30,6 +31,7 @@ import ( "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/terror" @@ -380,6 +382,49 @@ func TestAlterTableWithValidation(t *testing.T) { tk.MustQuery("show warnings").Check(testkit.RowsWithSep("|", "Warning|8200|ALTER TABLE WITHOUT VALIDATION is currently unsupported")) } +func TestCreateTableWithInfo(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.Session().SetValue(sessionctx.QueryString, "skip") + + d := dom.DDL() + require.NotNil(t, d) + info := []*model.TableInfo{{ + ID: 42, + Name: model.NewCIStr("t"), + }} + + require.NoError(t, d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), info, ddl.OnExistError, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return false + }))) + tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 't'").Check(testkit.Rows("42")) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers) + + var id int64 + err := kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error { + m := meta.NewMeta(txn) + var err error + id, err = m.GenGlobalID() + return err + }) + + require.NoError(t, err) + info = []*model.TableInfo{{ + ID: 42, + Name: model.NewCIStr("tt"), + }} + tk.Session().SetValue(sessionctx.QueryString, "skip") + require.NoError(t, d.BatchCreateTableWithInfo(tk.Session(), model.NewCIStr("test"), info, ddl.OnExistError, ddl.AllocTableIDIf(func(ti *model.TableInfo) bool { + return true + }))) + idGen, ok := tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 'tt'").Rows()[0][0].(string) + require.True(t, ok) + idGenNum, err := strconv.ParseInt(idGen, 10, 64) + require.NoError(t, err) + require.Greater(t, idGenNum, id) +} + func TestBatchCreateTable(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) diff --git a/ddl/ddl.go b/ddl/ddl.go index 0fac4e23ec0b9..a3c801945bddb 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -90,6 +90,47 @@ const ( // OnExist specifies what to do when a new object has a name collision. type OnExist uint8 +// AllocTableIDIf specifies whether to retain the old table ID. +// If this returns "false", then we would assume the table ID has been +// allocated before calling `CreateTableWithInfo` family. +type AllocTableIDIf func(*model.TableInfo) bool + +// CreateTableWithInfoConfig is the configuration of `CreateTableWithInfo`. +type CreateTableWithInfoConfig struct { + OnExist OnExist + ShouldAllocTableID AllocTableIDIf +} + +// CreateTableWithInfoConfigurier is the "diff" which can be applied to the +// CreateTableWithInfoConfig, currently implementations are "OnExist" and "AllocTableIDIf". +type CreateTableWithInfoConfigurier interface { + // Apply the change over the config. + Apply(*CreateTableWithInfoConfig) +} + +// GetCreateTableWithInfoConfig applies the series of configurier from default config +// and returns the final config. +func GetCreateTableWithInfoConfig(cs []CreateTableWithInfoConfigurier) CreateTableWithInfoConfig { + config := CreateTableWithInfoConfig{} + for _, c := range cs { + c.Apply(&config) + } + if config.ShouldAllocTableID == nil { + config.ShouldAllocTableID = func(*model.TableInfo) bool { return true } + } + return config +} + +// Apply implements Configurier. +func (o OnExist) Apply(c *CreateTableWithInfoConfig) { + c.OnExist = o +} + +// Apply implements Configurier. +func (a AllocTableIDIf) Apply(c *CreateTableWithInfoConfig) { + c.ShouldAllocTableID = a +} + const ( // OnExistError throws an error on name collision. OnExistError OnExist = iota @@ -155,13 +196,13 @@ type DDL interface { ctx sessionctx.Context, schema model.CIStr, info *model.TableInfo, - onExist OnExist) error + cs ...CreateTableWithInfoConfigurier) error // BatchCreateTableWithInfo is like CreateTableWithInfo, but can handle multiple tables. BatchCreateTableWithInfo(ctx sessionctx.Context, schema model.CIStr, info []*model.TableInfo, - onExist OnExist) error + cs ...CreateTableWithInfoConfigurier) error // CreatePlacementPolicyWithInfo creates a placement policy // diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 54f8243b6df8c..092ac77782f63 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2448,9 +2448,11 @@ func (d *ddl) CreateTableWithInfo( ctx sessionctx.Context, dbName model.CIStr, tbInfo *model.TableInfo, - onExist OnExist, + cs ...CreateTableWithInfoConfigurier, ) (err error) { - job, err := d.createTableWithInfoJob(ctx, dbName, tbInfo, onExist, false) + c := GetCreateTableWithInfoConfig(cs) + + job, err := d.createTableWithInfoJob(ctx, dbName, tbInfo, c.OnExist, !c.ShouldAllocTableID(tbInfo)) if err != nil { return err } @@ -2461,7 +2463,7 @@ func (d *ddl) CreateTableWithInfo( err = d.DoDDLJob(ctx, job) if err != nil { // table exists, but if_not_exists flags is true, so we ignore this error. - if onExist == OnExistIgnore && infoschema.ErrTableExists.Equal(err) { + if c.OnExist == OnExistIgnore && infoschema.ErrTableExists.Equal(err) { ctx.GetSessionVars().StmtCtx.AppendNote(err) err = nil } @@ -2476,7 +2478,10 @@ func (d *ddl) CreateTableWithInfo( func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, dbName model.CIStr, infos []*model.TableInfo, - onExist OnExist) error { + cs ...CreateTableWithInfoConfigurier, +) error { + c := GetCreateTableWithInfoConfig(cs) + jobs := &model.Job{ BinlogInfo: &model.HistoryInfo{}, } @@ -2491,7 +2496,7 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, for _, info := range infos { if _, ok := duplication[info.Name.L]; ok { err = infoschema.ErrTableExists.FastGenByArgs("can not batch create tables with same name") - if onExist == OnExistIgnore && infoschema.ErrTableExists.Equal(err) { + if c.OnExist == OnExistIgnore && infoschema.ErrTableExists.Equal(err) { ctx.GetSessionVars().StmtCtx.AppendNote(err) err = nil } @@ -2515,15 +2520,17 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, } for _, info := range infos { - info.ID, genIDs = genIDs[0], genIDs[1:] + if c.ShouldAllocTableID(info) { + info.ID, genIDs = genIDs[0], genIDs[1:] - if parts := info.GetPartitionInfo(); parts != nil { - for i := range parts.Definitions { - parts.Definitions[i].ID, genIDs = genIDs[0], genIDs[1:] + if parts := info.GetPartitionInfo(); parts != nil { + for i := range parts.Definitions { + parts.Definitions[i].ID, genIDs = genIDs[0], genIDs[1:] + } } } - job, err := d.createTableWithInfoJob(ctx, dbName, info, onExist, true) + job, err := d.createTableWithInfoJob(ctx, dbName, info, c.OnExist, true) if err != nil { return errors.Trace(err) } @@ -2555,7 +2562,7 @@ func (d *ddl) BatchCreateTableWithInfo(ctx sessionctx.Context, err = d.DoDDLJob(ctx, jobs) if err != nil { // table exists, but if_not_exists flags is true, so we ignore this error. - if onExist == OnExistIgnore && infoschema.ErrTableExists.Equal(err) { + if c.OnExist == OnExistIgnore && infoschema.ErrTableExists.Equal(err) { ctx.GetSessionVars().StmtCtx.AppendNote(err) err = nil } diff --git a/ddl/schematracker/checker.go b/ddl/schematracker/checker.go index a2a5c8f5a4402..d08fbf423b0b5 100644 --- a/ddl/schematracker/checker.go +++ b/ddl/schematracker/checker.go @@ -443,13 +443,13 @@ func (d Checker) CreateSchemaWithInfo(ctx sessionctx.Context, info *model.DBInfo } // CreateTableWithInfo implements the DDL interface. -func (d Checker) CreateTableWithInfo(ctx sessionctx.Context, schema model.CIStr, info *model.TableInfo, onExist ddl.OnExist) error { +func (d Checker) CreateTableWithInfo(ctx sessionctx.Context, schema model.CIStr, info *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { //TODO implement me panic("implement me") } // BatchCreateTableWithInfo implements the DDL interface. -func (d Checker) BatchCreateTableWithInfo(ctx sessionctx.Context, schema model.CIStr, info []*model.TableInfo, onExist ddl.OnExist) error { +func (d Checker) BatchCreateTableWithInfo(ctx sessionctx.Context, schema model.CIStr, info []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { //TODO implement me panic("implement me") } diff --git a/ddl/schematracker/dm_tracker.go b/ddl/schematracker/dm_tracker.go index 1ef078805fe6c..ff12f27c3a770 100644 --- a/ddl/schematracker/dm_tracker.go +++ b/ddl/schematracker/dm_tracker.go @@ -220,8 +220,10 @@ func (d SchemaTracker) CreateTableWithInfo( ctx sessionctx.Context, dbName model.CIStr, info *model.TableInfo, - onExist ddl.OnExist, + cs ...ddl.CreateTableWithInfoConfigurier, ) error { + c := ddl.GetCreateTableWithInfoConfig(cs) + schema := d.SchemaByName(dbName) if schema == nil { return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(dbName) @@ -229,7 +231,7 @@ func (d SchemaTracker) CreateTableWithInfo( oldTable, _ := d.TableByName(dbName, info.Name) if oldTable != nil { - switch onExist { + switch c.OnExist { case ddl.OnExistIgnore: return nil case ddl.OnExistReplace: @@ -1111,9 +1113,9 @@ func (SchemaTracker) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.Alte } // BatchCreateTableWithInfo implements the DDL interface, it will call CreateTableWithInfo for each table. -func (d SchemaTracker) BatchCreateTableWithInfo(ctx sessionctx.Context, schema model.CIStr, info []*model.TableInfo, onExist ddl.OnExist) error { +func (d SchemaTracker) BatchCreateTableWithInfo(ctx sessionctx.Context, schema model.CIStr, info []*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { for _, tableInfo := range info { - if err := d.CreateTableWithInfo(ctx, schema, tableInfo, onExist); err != nil { + if err := d.CreateTableWithInfo(ctx, schema, tableInfo, cs...); err != nil { return err } } diff --git a/executor/brie.go b/executor/brie.go index f26ae56aa32e4..608cfd6336b52 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -515,7 +515,7 @@ func (gs *tidbGlueSession) CreateDatabase(ctx context.Context, schema *model.DBI } // CreateTable implements glue.Session -func (gs *tidbGlueSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error { +func (gs *tidbGlueSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error { d := domain.GetDomain(gs.se).DDL() // 512 is defaultCapOfCreateTable. @@ -533,7 +533,7 @@ func (gs *tidbGlueSession) CreateTable(ctx context.Context, dbName model.CIStr, table.Partition = &newPartition } - return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore) + return d.CreateTableWithInfo(gs.se, dbName, table, append(cs, ddl.OnExistIgnore)...) } // CreatePlacementPolicy implements glue.Session diff --git a/meta/meta.go b/meta/meta.go index f132d0597d1da..4a6c23b0e865f 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -166,6 +166,23 @@ func (m *Meta) GenGlobalID() (int64, error) { return newID, err } +// AdvanceGlobalIDs advances the global ID by n. +// return the old global ID. +func (m *Meta) AdvanceGlobalIDs(n int) (int64, error) { + globalIDMutex.Lock() + defer globalIDMutex.Unlock() + + newID, err := m.txn.Inc(mNextGlobalIDKey, int64(n)) + if err != nil { + return 0, err + } + if newID > MaxGlobalID { + return 0, errors.Errorf("global id:%d exceeds the limit:%d", newID, MaxGlobalID) + } + origID := newID - int64(n) + return origID, nil +} + // GenGlobalIDs generates the next n global IDs. func (m *Meta) GenGlobalIDs(n int) ([]int64, error) { globalIDMutex.Lock() From 3ef8352a5754606e511ca89292a50612c289a501 Mon Sep 17 00:00:00 2001 From: Meng Xin Date: Fri, 14 Oct 2022 13:01:51 +0800 Subject: [PATCH 9/9] util: add tracker tree memory use print (#37310) close pingcap/tidb#37309 --- executor/explain.go | 18 +++++++++++++++++- util/memory/tracker.go | 22 ++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/executor/explain.go b/executor/explain.go index 7699751600b89..93bfd245e03b6 100644 --- a/executor/explain.go +++ b/executor/explain.go @@ -20,6 +20,7 @@ import ( "path/filepath" "runtime" rpprof "runtime/pprof" + "sort" "strconv" "sync" "time" @@ -188,6 +189,20 @@ func (h *memoryDebugModeHandler) genInfo(status string, needProfile bool, heapIn return h.infoField, err } +func (h *memoryDebugModeHandler) getTrackerTreeMemUseLogs() []zap.Field { + trackerMemUseMap := h.memTracker.CountAllChildrenMemUse() + logs := make([]zap.Field, 0, len(trackerMemUseMap)) + keys := make([]string, 0, len(trackerMemUseMap)) + for k := range trackerMemUseMap { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + logs = append(logs, zap.String("TrackerTree "+k, memory.FormatBytes(trackerMemUseMap[k]))) + } + return logs +} + func updateTriggerIntervalByHeapInUse(heapInUse uint64) (time.Duration, int) { const GB uint64 = 1 << 30 if heapInUse < 30*GB { @@ -264,7 +279,8 @@ func (h *memoryDebugModeHandler) run() { for _, t := range ts { logs = append(logs, zap.String("Executor_"+strconv.Itoa(t.Label()), memory.FormatBytes(t.BytesConsumed()))) } - logutil.BgLogger().Warn("Memory Debug Mode, Log all trackers that consumes more than threshold * 20%", logs...) + logutil.BgLogger().Warn("Memory Debug Mode, Log all executors that consumes more than threshold * 20%", logs...) + logutil.BgLogger().Warn("Memory Debug Mode, Log the tracker tree", h.getTrackerTreeMemUseLogs()...) } } } diff --git a/util/memory/tracker.go b/util/memory/tracker.go index c967f47a05509..9a281861416d6 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -712,6 +712,28 @@ func (t *Tracker) setParent(parent *Tracker) { t.parMu.parent = parent } +// CountAllChildrenMemUse return memory used tree for the tracker +func (t *Tracker) CountAllChildrenMemUse() map[string]int64 { + trackerMemUseMap := make(map[string]int64, 1024) + countChildMem(t, "", trackerMemUseMap) + return trackerMemUseMap +} + +func countChildMem(t *Tracker, familyTreeName string, trackerMemUseMap map[string]int64) { + if len(familyTreeName) > 0 { + familyTreeName += " <- " + } + familyTreeName += "[" + strconv.Itoa(t.Label()) + "]" + trackerMemUseMap[familyTreeName] += t.BytesConsumed() + t.mu.Lock() + defer t.mu.Unlock() + for _, sli := range t.mu.children { + for _, tracker := range sli { + countChildMem(tracker, familyTreeName, trackerMemUseMap) + } + } +} + const ( // LabelForSQLText represents the label of the SQL Text LabelForSQLText int = -1