diff --git a/executor/executor.go b/executor/executor.go
index 81b59556c6a30..4b8bae365586f 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -1662,7 +1662,10 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
 	case *ast.LoadDataStmt:
 		sc.DupKeyAsWarning = true
 		sc.BadNullAsWarning = true
-		sc.TruncateAsWarning = !vars.StrictSQLMode
+		// With IGNORE or LOCAL, data-interpretation errors become warnings and the load operation continues,
+		// even if the SQL mode is restrictive. For details: https://dev.mysql.com/doc/refman/8.0/en/load-data.html
+		// TODO: TiDB only supports LOCAL for now, so TruncateAsWarning is always true here.
+		sc.TruncateAsWarning = true
 		sc.InLoadDataStmt = true
 		// return warning instead of error when load data meet no partition for value
 		sc.IgnoreNoPartition = true
diff --git a/executor/grant_test.go b/executor/grant_test.go
index e264bb56fa006..f86797a20abc6 100644
--- a/executor/grant_test.go
+++ b/executor/grant_test.go
@@ -362,6 +362,14 @@ func (s *testSuite3) TestMaintainRequire(c *C) {
 	c.Assert(err, NotNil)
 }
 
+func (s *testSuite3) TestMaintainAuthString(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec(`CREATE USER 'maint_auth_str1'@'%' IDENTIFIED BY 'foo'`)
+	tk.MustQuery("SELECT authentication_string FROM mysql.user WHERE `Host` = '%' and `User` = 'maint_auth_str1'").Check(testkit.Rows("*F3A2A51A9B0F2BE2468926B4132313728C250DBF"))
+	tk.MustExec(`ALTER USER 'maint_auth_str1'@'%' REQUIRE SSL`)
+	tk.MustQuery("SELECT authentication_string FROM mysql.user WHERE `Host` = '%' and `User` = 'maint_auth_str1'").Check(testkit.Rows("*F3A2A51A9B0F2BE2468926B4132313728C250DBF"))
+}
+
 func (s *testSuite3) TestGrantOnNonExistTable(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
 	tk.MustExec("create user genius")
diff --git a/executor/infoschema_reader_test.go b/executor/infoschema_reader_test.go
index 41bad78197f6e..56cf7c2cd0c44 100644
--- a/executor/infoschema_reader_test.go
+++ b/executor/infoschema_reader_test.go
@@ -847,7 +847,7 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) {
 	tk.MustQuery("select TABLE_SCHEMA, sum(TABLE_SIZE) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' group by TABLE_SCHEMA;").Check(testkit.Rows(
 		"test 2",
 	))
-	c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 23)
+	c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 24)
 }
 
 func (s *testInfoschemaTableSuite) TestSequences(c *C) {
diff --git a/executor/set_test.go b/executor/set_test.go
index bef377b0d7a09..ceb8a8e91eed8 100644
--- a/executor/set_test.go
+++ b/executor/set_test.go
@@ -547,6 +547,16 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
 	tk.MustQuery(`show warnings`).Check(testkit.Rows())
 	tk.MustExec("set @@tidb_enable_clustered_index = 'int_only'")
 	tk.MustQuery(`show warnings`).Check(testkit.Rows("Warning 1287 'INT_ONLY' is deprecated and will be removed in a future release. 
Please use 'ON' or 'OFF' instead")) + + // test for tidb_enable_stable_result_mode + tk.MustQuery(`select @@tidb_enable_stable_result_mode`).Check(testkit.Rows("0")) + tk.MustExec(`set global tidb_enable_stable_result_mode = 1`) + tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("1")) + tk.MustExec(`set global tidb_enable_stable_result_mode = 0`) + tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("0")) + tk.MustExec(`set tidb_enable_stable_result_mode=1`) + tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("0")) + tk.MustQuery(`select @@tidb_enable_stable_result_mode`).Check(testkit.Rows("1")) } func (s *testSuite5) TestTruncateIncorrectIntSessionVar(c *C) { diff --git a/executor/simple.go b/executor/simple.go index f0d3135d21e6e..ccc5f5b2ce242 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -895,10 +895,16 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error { failedUsers := make([]string, 0, len(s.Specs)) for _, spec := range s.Specs { - if spec.User.CurrentUser { - user := e.ctx.GetSessionVars().User + user := e.ctx.GetSessionVars().User + if spec.User.CurrentUser || ((user != nil) && (user.Username == spec.User.Username) && (user.AuthHostname == spec.User.Hostname)) { spec.User.Username = user.Username spec.User.Hostname = user.AuthHostname + } else { + checker := privilege.GetPrivilegeManager(e.ctx) + activeRoles := e.ctx.GetSessionVars().ActiveRoles + if checker != nil && !checker.RequestVerification(activeRoles, "", "", "", mysql.SuperPriv) { + return ErrDBaccessDenied.GenWithStackByArgs(spec.User.Username, spec.User.Hostname, "mysql") + } } exists, err := userExists(e.ctx, spec.User.Username, spec.User.Hostname) @@ -910,22 +916,25 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error { failedUsers = append(failedUsers, user) continue } - pwd, ok := spec.EncodedPassword() - if !ok { - return errors.Trace(ErrPasswordFormat) - } + exec := e.ctx.(sqlexec.RestrictedSQLExecutor) - stmt, err := exec.ParseWithParams(context.TODO(), `UPDATE %n.%n SET authentication_string=%? WHERE Host=%? and User=%?;`, mysql.SystemDB, mysql.UserTable, pwd, spec.User.Hostname, spec.User.Username) - if err != nil { - return err - } - _, _, err = exec.ExecRestrictedStmt(context.TODO(), stmt) - if err != nil { - failedUsers = append(failedUsers, spec.User.String()) + if spec.AuthOpt != nil { + pwd, ok := spec.EncodedPassword() + if !ok { + return errors.Trace(ErrPasswordFormat) + } + stmt, err := exec.ParseWithParams(context.TODO(), `UPDATE %n.%n SET authentication_string=%? WHERE Host=%? and User=%?;`, mysql.SystemDB, mysql.UserTable, pwd, spec.User.Hostname, spec.User.Username) + if err != nil { + return err + } + _, _, err = exec.ExecRestrictedStmt(context.TODO(), stmt) + if err != nil { + failedUsers = append(failedUsers, spec.User.String()) + } } if len(privData) > 0 { - stmt, err = exec.ParseWithParams(context.TODO(), "INSERT INTO %n.%n (Host, User, Priv) VALUES (%?,%?,%?) ON DUPLICATE KEY UPDATE Priv = values(Priv)", mysql.SystemDB, mysql.GlobalPrivTable, spec.User.Hostname, spec.User.Username, string(hack.String(privData))) + stmt, err := exec.ParseWithParams(context.TODO(), "INSERT INTO %n.%n (Host, User, Priv) VALUES (%?,%?,%?) 
ON DUPLICATE KEY UPDATE Priv = values(Priv)", mysql.SystemDB, mysql.GlobalPrivTable, spec.User.Hostname, spec.User.Username, string(hack.String(privData))) if err != nil { return err } diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index dedf980ee063f..3e955c5848ec3 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -2418,7 +2418,11 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64} agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp) agg.SetSchema(la.schema.Clone()) - agg.MppRunMode = MppTiDB + if la.HasDistinct() { + agg.MppRunMode = MppScalar + } else { + agg.MppRunMode = MppTiDB + } hashAggs = append(hashAggs, agg) } return diff --git a/planner/core/fragment.go b/planner/core/fragment.go index f329374d853f5..3f465cfabdc02 100644 --- a/planner/core/fragment.go +++ b/planner/core/fragment.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" ) @@ -40,6 +41,8 @@ type Fragment struct { ExchangeSender *PhysicalExchangeSender // data exporter IsRoot bool + + singleton bool // indicates if this is a task running on a single node. } type tasksAndFrags struct { @@ -121,6 +124,7 @@ func (f *Fragment) init(p PhysicalPlan) error { } f.TableScan = x case *PhysicalExchangeReceiver: + f.singleton = x.children[0].(*PhysicalExchangeSender).ExchangeType == tipb.ExchangeType_PassThrough f.ExchangeReceivers = append(f.ExchangeReceivers, x) case *PhysicalUnionAll: return errors.New("unexpected union all detected") @@ -246,6 +250,9 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv for _, r := range f.ExchangeReceivers { childrenTasks = append(childrenTasks, r.Tasks...) } + if f.singleton { + childrenTasks = childrenTasks[0:1] + } tasks = e.constructMPPTasksByChildrenTasks(childrenTasks) } if err != nil { diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index c60d8cb7775e4..8c01987ba0255 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -47,6 +47,7 @@ var IsReadOnly func(node ast.Node, vars *variable.SessionVars) bool const ( flagGcSubstitute uint64 = 1 << iota flagPrunColumns + flagStabilizeResults flagBuildKeyInfo flagDecorrelate flagEliminateAgg @@ -64,6 +65,7 @@ const ( var optRuleList = []logicalOptRule{ &gcSubstituter{}, &columnPruner{}, + &resultsStabilizer{}, &buildKeySolver{}, &decorrelateSolver{}, &aggregationEliminator{}, @@ -124,12 +126,21 @@ func CheckTableLock(ctx sessionctx.Context, is infoschema.InfoSchema, vs []visit return nil } +func checkStableResultMode(sctx sessionctx.Context) bool { + s := sctx.GetSessionVars() + st := s.StmtCtx + return s.EnableStableResultMode && (!st.InInsertStmt && !st.InUpdateStmt && !st.InDeleteStmt && !st.InLoadDataStmt) +} + // DoOptimize optimizes a logical plan to a physical plan. 
func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic LogicalPlan) (PhysicalPlan, float64, error) { // if there is something after flagPrunColumns, do flagPrunColumnsAgain if flag&flagPrunColumns > 0 && flag-flagPrunColumns > flagPrunColumns { flag |= flagPrunColumnsAgain } + if checkStableResultMode(sctx) { + flag |= flagStabilizeResults + } logic, err := logicalOptimize(ctx, flag, logic) if err != nil { return nil, 0, err diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go index 292d9f8606d83..7819fdac20d34 100644 --- a/planner/core/physical_plans.go +++ b/planner/core/physical_plans.go @@ -1004,6 +1004,8 @@ const ( Mpp2Phase // MppTiDB runs agg on TiDB (and a partial agg on TiFlash if in 2 phase agg) MppTiDB + // MppScalar also has 2 phases. The second phase runs in a single task. + MppScalar ) type basePhysicalAgg struct { diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index 72248e29a2093..4dc77bb2b8e1a 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -412,6 +412,11 @@ type PointPlanVal struct { // TryFastPlan tries to use the PointGetPlan for the query. func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) { + if checkStableResultMode(ctx) { + // the rule of stabilizing results has not taken effect yet, so cannot generate a plan here in this mode + return nil + } + ctx.GetSessionVars().PlanID = 0 ctx.GetSessionVars().PlanColumnID = 0 switch x := node.(type) { diff --git a/planner/core/rule_eliminate_projection.go b/planner/core/rule_eliminate_projection.go index 5731495f9c2d2..ebc6b23d2b57d 100644 --- a/planner/core/rule_eliminate_projection.go +++ b/planner/core/rule_eliminate_projection.go @@ -43,14 +43,14 @@ func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool { // the align the output schema. In the future, we can solve this in-compatibility by // passing down the aggregation mode to TiFlash. if physicalAgg, ok := p.Children()[0].(*PhysicalHashAgg); ok { - if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase { + if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar { if physicalAgg.isFinalAgg() { return false } } } if physicalAgg, ok := p.Children()[0].(*PhysicalStreamAgg); ok { - if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase { + if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar { if physicalAgg.isFinalAgg() { return false } diff --git a/planner/core/rule_stabilize_results.go b/planner/core/rule_stabilize_results.go new file mode 100644 index 0000000000000..87c84bf85b42d --- /dev/null +++ b/planner/core/rule_stabilize_results.go @@ -0,0 +1,120 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "context" + + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/planner/util" +) + +/* + resultsStabilizer stabilizes query results. 
+ NOTE: it's not a common rule for all queries, it's specially implemented for a few customers. + Results of some queries are not stable, for example: + create table t (a int); insert into t values (1), (2); select a from t; + In the case above, the result can be `1 2` or `2 1`, which is not stable. + This rule stabilizes results by modifying or injecting a Sort operator: + 1. iterate the plan from the root, and ignore all input-order operators (Sel/Proj/Limit); + 2. when meeting the first non-input-order operator, + 2.1. if it's a Sort, update it by appending all output columns into its order-by list, + 2.2. otherwise, inject a new Sort upon this operator. +*/ +type resultsStabilizer struct { +} + +func (rs *resultsStabilizer) optimize(ctx context.Context, lp LogicalPlan) (LogicalPlan, error) { + stable := rs.completeSort(lp) + if !stable { + lp = rs.injectSort(lp) + } + return lp, nil +} + +func (rs *resultsStabilizer) completeSort(lp LogicalPlan) bool { + if rs.isInputOrderKeeper(lp) { + return rs.completeSort(lp.Children()[0]) + } else if sort, ok := lp.(*LogicalSort); ok { + cols := sort.Schema().Columns // sort results by all output columns + if handleCol := rs.extractHandleCol(sort.Children()[0]); handleCol != nil { + cols = []*expression.Column{handleCol} // sort results by the handle column if we can get it + } + for _, col := range cols { + exist := false + for _, byItem := range sort.ByItems { + if col.Equal(nil, byItem.Expr) { + exist = true + break + } + } + if !exist { + sort.ByItems = append(sort.ByItems, &util.ByItems{Expr: col}) + } + } + return true + } + return false +} + +func (rs *resultsStabilizer) injectSort(lp LogicalPlan) LogicalPlan { + if rs.isInputOrderKeeper(lp) { + lp.SetChildren(rs.injectSort(lp.Children()[0])) + return lp + } + + byItems := make([]*util.ByItems, 0, len(lp.Schema().Columns)) + cols := lp.Schema().Columns + if handleCol := rs.extractHandleCol(lp); handleCol != nil { + cols = []*expression.Column{handleCol} + } + for _, col := range cols { + byItems = append(byItems, &util.ByItems{Expr: col}) + } + sort := LogicalSort{ + ByItems: byItems, + }.Init(lp.SCtx(), lp.SelectBlockOffset()) + sort.SetChildren(lp) + return sort +} + +func (rs *resultsStabilizer) isInputOrderKeeper(lp LogicalPlan) bool { + switch lp.(type) { + case *LogicalSelection, *LogicalProjection, *LogicalLimit: + return true + } + return false +} + +// extractHandleCols does the best effort to get the handle column. +func (rs *resultsStabilizer) extractHandleCol(lp LogicalPlan) *expression.Column { + switch x := lp.(type) { + case *LogicalSelection, *LogicalLimit: + handleCol := rs.extractHandleCol(lp.Children()[0]) + if x.Schema().Contains(handleCol) { + // some Projection Operator might be inlined, so check the column again here + return handleCol + } + case *DataSource: + handleCol := x.getPKIsHandleCol() + if handleCol != nil { + return handleCol + } + } + return nil +} + +func (rs *resultsStabilizer) name() string { + return "stabilize_results" +} diff --git a/planner/core/rule_stabilize_results_test.go b/planner/core/rule_stabilize_results_test.go new file mode 100644 index 0000000000000..78f08812163e9 --- /dev/null +++ b/planner/core/rule_stabilize_results_test.go @@ -0,0 +1,189 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package core_test + +import ( + "math" + + . "github.com/pingcap/check" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/kv" + plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/util/kvcache" + "github.com/pingcap/tidb/util/testkit" + "github.com/pingcap/tidb/util/testutil" +) + +var _ = Suite(&testRuleStabilizeResults{}) +var _ = SerialSuites(&testRuleStabilizeResultsSerial{}) + +type testRuleStabilizeResultsSerial struct { + store kv.Storage + dom *domain.Domain +} + +func (s *testRuleStabilizeResultsSerial) SetUpTest(c *C) { + var err error + s.store, s.dom, err = newStoreWithBootstrap() + c.Assert(err, IsNil) +} + +func (s *testRuleStabilizeResultsSerial) TestPlanCache(c *C) { + tk := testkit.NewTestKit(c, s.store) + orgEnable := plannercore.PreparedPlanCacheEnabled() + defer func() { + plannercore.SetPreparedPlanCache(orgEnable) + }() + plannercore.SetPreparedPlanCache(true) + var err error + tk.Se, err = session.CreateSession4TestWithOpt(s.store, &session.Opt{ + PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64), + }) + c.Assert(err, IsNil) + + tk.MustExec("use test") + tk.MustExec("set tidb_enable_stable_result_mode=1") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int primary key, b int, c int, d int, key(b))") + tk.MustExec("prepare s1 from 'select * from t where a > ? limit 10'") + tk.MustExec("set @a = 10") + tk.MustQuery("execute s1 using @a").Check(testkit.Rows()) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustQuery("execute s1 using @a").Check(testkit.Rows()) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) // plan cache is still working +} + +func (s *testRuleStabilizeResultsSerial) TestSQLBinding(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("set tidb_enable_stable_result_mode=1") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int primary key, b int, c int, d int, key(b))") + tk.MustQuery("explain select * from t where a > 0 limit 1").Check(testkit.Rows( + "Limit_12 1.00 root offset:0, count:1", + "└─TableReader_22 1.00 root data:Limit_21", + " └─Limit_21 1.00 cop[tikv] offset:0, count:1", + " └─TableRangeScan_20 1.00 cop[tikv] table:t range:(0,+inf], keep order:true, stats:pseudo")) + + tk.MustExec("create session binding for select * from t where a>0 limit 1 using select * from t use index(b) where a>0 limit 1") + tk.MustQuery("explain select * from t where a > 0 limit 1").Check(testkit.Rows( + "TopN_9 1.00 root test.t.a, offset:0, count:1", + "└─IndexLookUp_19 1.00 root ", + " ├─TopN_18(Build) 1.00 cop[tikv] test.t.a, offset:0, count:1", + " │ └─Selection_17 3333.33 cop[tikv] gt(test.t.a, 0)", + " │ └─IndexFullScan_15 10000.00 cop[tikv] table:t, index:b(b) keep order:false, stats:pseudo", + " └─TableRowIDScan_16(Probe) 1.00 cop[tikv] table:t keep order:false, stats:pseudo")) +} + +type testRuleStabilizeResults struct { + store kv.Storage + dom *domain.Domain + + testData testutil.TestData +} + +func (s *testRuleStabilizeResults) SetUpSuite(c *C) { + var err error + s.store, s.dom, 
err = newStoreWithBootstrap() + c.Assert(err, IsNil) + + s.testData, err = testutil.LoadTestSuiteData("testdata", "stable_result_mode_suite") + c.Assert(err, IsNil) +} + +func (s *testRuleStabilizeResults) TearDownSuite(c *C) { + c.Assert(s.testData.GenerateOutputIfNeeded(), IsNil) +} + +func (s *testRuleStabilizeResults) runTestData(c *C, tk *testkit.TestKit, name string) { + var input []string + var output []struct { + Plan []string + } + s.testData.GetTestCasesByName(name, c, &input, &output) + c.Assert(len(input), Equals, len(output)) + for i := range input { + s.testData.OnRecord(func() { + output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain " + input[i]).Rows()) + }) + tk.MustQuery("explain " + input[i]).Check(testkit.Rows(output[i].Plan...)) + } +} + +func (s *testRuleStabilizeResults) TestStableResultMode(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("set tidb_enable_stable_result_mode=1") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int primary key, b int, c int, d int, key(b))") + s.runTestData(c, tk, "TestStableResultMode") +} + +func (s *testRuleStabilizeResults) TestStableResultModeOnDML(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("set tidb_enable_stable_result_mode=1") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int primary key, b int, c int, key(b))") + s.runTestData(c, tk, "TestStableResultModeOnDML") +} + +func (s *testRuleStabilizeResults) TestStableResultModeOnSubQuery(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("set tidb_enable_stable_result_mode=1") + tk.MustExec("drop table if exists t1") + tk.MustExec("drop table if exists t2") + tk.MustExec("create table t1 (a int primary key, b int, c int, d int, key(b))") + tk.MustExec("create table t2 (a int primary key, b int, c int, d int, key(b))") + s.runTestData(c, tk, "TestStableResultModeOnSubQuery") +} + +func (s *testRuleStabilizeResults) TestStableResultModeOnJoin(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("set tidb_enable_stable_result_mode=1") + tk.MustExec("drop table if exists t1") + tk.MustExec("drop table if exists t2") + tk.MustExec("create table t1 (a int primary key, b int, c int, d int, key(b))") + tk.MustExec("create table t2 (a int primary key, b int, c int, d int, key(b))") + s.runTestData(c, tk, "TestStableResultModeOnJoin") +} + +func (s *testRuleStabilizeResults) TestStableResultModeOnOtherOperators(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("set tidb_enable_stable_result_mode=1") + tk.MustExec("drop table if exists t1") + tk.MustExec("drop table if exists t2") + tk.MustExec("create table t1 (a int primary key, b int, c int, d int, unique key(b))") + tk.MustExec("create table t2 (a int primary key, b int, c int, d int, unique key(b))") + s.runTestData(c, tk, "TestStableResultModeOnOtherOperators") +} + +func (s *testRuleStabilizeResults) TestStableResultModeOnPartitionTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("set tidb_enable_stable_result_mode=1") + tk.MustExec("drop table if exists thash") + tk.MustExec("drop table if exists trange") + tk.MustExec("create table thash (a int primary key, b int, c int, d int) partition by hash(a) partitions 4") + tk.MustExec(`create table trange (a int primary key, b int, c int, d int) partition by range(a) ( + partition p0 values less 
than (100), + partition p1 values less than (200), + partition p2 values less than (300), + partition p3 values less than (400))`) + s.runTestData(c, tk, "TestStableResultModeOnPartitionTable") +} diff --git a/planner/core/task.go b/planner/core/task.go index a4720e860a7b6..93776a059f8dd 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1886,6 +1886,28 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task { attachPlan2Task(finalAgg, t) t.addCost(p.GetCost(inputRows, true, false)) return t + case MppScalar: + proj := p.convertAvgForMPP() + partialAgg, finalAgg := p.newPartialAggregate(kv.TiFlash, true) + if partialAgg == nil || finalAgg == nil { + return invalidTask + } + attachPlan2Task(partialAgg, mpp) + prop := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64, PartitionTp: property.AnyType} + newMpp := mpp.enforceExchangerImpl(prop) + attachPlan2Task(finalAgg, newMpp) + if proj == nil { + proj = PhysicalProjection{ + Exprs: make([]expression.Expression, 0, len(p.Schema().Columns)), + }.Init(p.ctx, p.statsInfo(), p.SelectBlockOffset()) + for _, col := range p.Schema().Columns { + proj.Exprs = append(proj.Exprs, col) + } + proj.SetSchema(p.schema) + } + attachPlan2Task(proj, newMpp) + newMpp.addCost(p.GetCost(inputRows, false, true)) + return newMpp default: return invalidTask } diff --git a/planner/core/testdata/integration_serial_suite_in.json b/planner/core/testdata/integration_serial_suite_in.json index 887a4e9afd390..a63cf787828cd 100644 --- a/planner/core/testdata/integration_serial_suite_in.json +++ b/planner/core/testdata/integration_serial_suite_in.json @@ -255,7 +255,10 @@ "desc format = 'brief' select * from t join ( select count(distinct value), id from t group by id) as A on A.id = t.id", "desc format = 'brief' select * from t join ( select count(1/value), id from t group by id) as A on A.id = t.id", "desc format = 'brief' select /*+hash_agg()*/ sum(id) from (select value, id from t where id > value group by id, value)A group by value /*the exchange should have only one partition column: test.t.value*/", - "desc format = 'brief' select /*+hash_agg()*/ sum(B.value) from t as B where B.id+1 > (select count(*) from t where t.id= B.id and t.value=B.value) group by B.id /*the exchange should have only one partition column: test.t.id*/" + "desc format = 'brief' select /*+hash_agg()*/ sum(B.value) from t as B where B.id+1 > (select count(*) from t where t.id= B.id and t.value=B.value) group by B.id /*the exchange should have only one partition column: test.t.id*/", + "desc format = 'brief' select count(distinct value) from t", + "desc format = 'brief' select count(distinct x ) from (select count(distinct value) x from t) t", + "desc format = 'brief' select count(distinct value), count(value), avg(value) from t" ] }, { diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json index a00ee8a2dd8cb..905312cacaa38 100644 --- a/planner/core/testdata/integration_serial_suite_out.json +++ b/planner/core/testdata/integration_serial_suite_out.json @@ -2161,6 +2161,50 @@ " └─ExchangeSender 10000.00 batchCop[tiflash] ExchangeType: HashPartition, Hash Cols: test.t.id", " └─TableFullScan 10000.00 batchCop[tiflash] table:B keep order:false, stats:pseudo" ] + }, + { + "SQL": "desc format = 'brief' select count(distinct value) from t", + "Plan": [ + "TableReader 1.00 root data:ExchangeSender", + "└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " 
└─Projection 1.00 batchCop[tiflash] Column#4", + " └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4", + " └─ExchangeReceiver 1.00 batchCop[tiflash] ", + " └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, ", + " └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo" + ] + }, + { + "SQL": "desc format = 'brief' select count(distinct x ) from (select count(distinct value) x from t) t", + "Plan": [ + "TableReader 1.00 root data:ExchangeSender", + "└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─Projection 1.00 batchCop[tiflash] Column#5", + " └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct Column#4)->Column#5", + " └─ExchangeReceiver 1.00 batchCop[tiflash] ", + " └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 batchCop[tiflash] group by:Column#4, ", + " └─Projection 1.00 batchCop[tiflash] Column#4", + " └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4", + " └─ExchangeReceiver 1.00 batchCop[tiflash] ", + " └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, ", + " └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo" + ] + }, + { + "SQL": "desc format = 'brief' select count(distinct value), count(value), avg(value) from t", + "Plan": [ + "TableReader 1.00 root data:ExchangeSender", + "└─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─Projection 1.00 batchCop[tiflash] Column#4, Column#5, div(Column#6, cast(case(eq(Column#7, 0), 1, Column#7), decimal(20,0) BINARY))->Column#6", + " └─HashAgg 1.00 batchCop[tiflash] funcs:count(distinct test.t.value)->Column#4, funcs:sum(Column#8)->Column#5, funcs:sum(Column#9)->Column#7, funcs:sum(Column#10)->Column#6", + " └─ExchangeReceiver 1.00 batchCop[tiflash] ", + " └─ExchangeSender 1.00 batchCop[tiflash] ExchangeType: PassThrough", + " └─HashAgg 1.00 batchCop[tiflash] group by:test.t.value, funcs:count(test.t.value)->Column#8, funcs:count(test.t.value)->Column#9, funcs:sum(test.t.value)->Column#10", + " └─TableFullScan 10000.00 batchCop[tiflash] table:t keep order:false, stats:pseudo" + ] } ] }, diff --git a/planner/core/testdata/stable_result_mode_suite_in.json b/planner/core/testdata/stable_result_mode_suite_in.json new file mode 100644 index 0000000000000..6ec585e9f594d --- /dev/null +++ b/planner/core/testdata/stable_result_mode_suite_in.json @@ -0,0 +1,81 @@ +[ + { + "name": "TestStableResultMode", + "cases": [ + "select * from t use index(primary)", + "select b from t use index(b)", + "select a, b from t use index(b)", + "select b, c from t use index(b)", + "select b, c from t use index(primary)", + "select min(b), max(c) from t use index(primary) group by d", + "select min(b), max(c) from t use index(primary) group by a", + "select * from t use index(b) limit 10", + "select * from t use index(primary) limit 10", + "select b from t use index(b) order by b", + "select b, c, d from t use index(b) order by b", + "select t1.a, t2.a from t t1, t t2 where t1.a=t2.a", + "select b from t where a>0", + "select b from t where a>0 limit 1" + ] + }, + { + "name": "TestStableResultModeOnDML", + "cases": [ + "insert into t select * from t", + "insert into t select * from t where a>1", + "insert into t select t1.a, t2.b, t1.c+t2.c from t t1, t t2 where t1.a=t2.a", + "insert into t select min(a), max(b), 
sum(c) from t group by a", + "delete from t", + "delete from t where a>1", + "update t set a=a+1", + "update t set a=a+1 where a>1" + ] + }, + { + "name": "TestStableResultModeOnSubQuery", + "cases": [ + "select * from t1 where t1.a in (select b from t2)", + "select * from t1 where t1.a not in (select b from t2)", + "select * from t1 where t1.a in (select b from t2 where t2.c>t1.c)", + "select * from t1 where t1.a not in (select b from t2 where t2.c>t1.c)", + "select * from t1 where exists (select 1 from t2 where t2.c>t1.c)", + "select * from t1 where not exists (select 1 from t2 where t2.c>t1.c)", + "select * from t1 where exists (select 1 from t2 where t2.c=t1.c)", + "select * from t1 where not exists (select 1 from t2 where t2.c=t1.c)", + "select t1.* from t1, (select b from t2) tb where t1.b=tb.b" + ] + }, + { + "name": "TestStableResultModeOnJoin", + "cases": [ + "select * from t1, t2 where t1.a = t2.a", + "select * from t1, t2 where t1.a > t2.a and t1.b = t2.b and t1.c < t2.c", + "select t1.* from t1 left outer join t2 on t1.a=t2.a", + "select t1.* from t1 join t2 on t1.a!=t2.a" + ] + }, + { + "name": "TestStableResultModeOnOtherOperators", + "cases": [ + "select * from t1 where a = 1 or a = 222 or a = 33333", + "select * from t1 where a in (1, 2, 3, 4)", + "select b from t1 where b = 1 or b = 222 or b = 33333", + "select b from t1 where b in (1, 2, 3, 4)", + "select * from t1 where a > 10 union all select * from t2 where b > 20", + "select * from t1 where a > 10 union distinct select * from t2 where b > 20", + "select row_number() over(partition by a) as row_no, sum(b) over(partition by a) as sum_b from t1", + "select min(a), max(b), sum(c) from t1 group by d", + "select min(a), max(b), sum(c) from t1 group by d having max(b) < 20", + "select case when a=1 then 'a1' when a=2 then 'a2' else 'ax' end from t1 " + ] + }, + { + "name": "TestStableResultModeOnPartitionTable", + "cases": [ + "select * from thash where a in (1, 200)", + "select * from thash where a >= 50 and a <= 150", + "select * from trange where a in (1, 200)", + "select * from trange where a >= 50 and a <= 150" + ] + } +] diff --git a/planner/core/testdata/stable_result_mode_suite_out.json b/planner/core/testdata/stable_result_mode_suite_out.json new file mode 100644 index 0000000000000..469f797d5bf77 --- /dev/null +++ b/planner/core/testdata/stable_result_mode_suite_out.json @@ -0,0 +1,450 @@ +[ + { + "Name": "TestStableResultMode", + "Cases": [ + { + "Plan": [ + "TableReader_10 10000.00 root data:TableFullScan_9", + "└─TableFullScan_9 10000.00 cop[tikv] table:t keep order:true, stats:pseudo" + ] + }, + { + "Plan": [ + "IndexReader_10 10000.00 root index:IndexFullScan_9", + "└─IndexFullScan_9 10000.00 cop[tikv] table:t, index:b(b) keep order:true, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_5 10000.00 root test.t.a", + "└─IndexReader_8 10000.00 root index:IndexFullScan_7", + " └─IndexFullScan_7 10000.00 cop[tikv] table:t, index:b(b) keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_5 10000.00 root test.t.b, test.t.c", + "└─IndexLookUp_9 10000.00 root ", + " ├─IndexFullScan_7(Build) 10000.00 cop[tikv] table:t, index:b(b) keep order:false, stats:pseudo", + " └─TableRowIDScan_8(Probe) 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_5 10000.00 root test.t.b, test.t.c", + "└─TableReader_8 10000.00 root data:TableFullScan_7", + " └─TableFullScan_7 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_6 8000.00 root Column#5, 
Column#6", + "└─HashAgg_12 8000.00 root group by:test.t.d, funcs:min(Column#7)->Column#5, funcs:max(Column#8)->Column#6", + " └─TableReader_13 8000.00 root data:HashAgg_8", + " └─HashAgg_8 8000.00 cop[tikv] group by:test.t.d, funcs:min(test.t.b)->Column#7, funcs:max(test.t.c)->Column#8", + " └─TableFullScan_11 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_7 10000.00 root Column#5, Column#6", + "└─Projection_9 10000.00 root cast(test.t.b, int(11))->Column#5, cast(test.t.c, int(11))->Column#6", + " └─TableReader_11 10000.00 root data:TableFullScan_10", + " └─TableFullScan_10 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "TopN_8 10.00 root test.t.a, offset:0, count:10", + "└─IndexLookUp_17 10.00 root ", + " ├─TopN_16(Build) 10.00 cop[tikv] test.t.a, offset:0, count:10", + " │ └─IndexFullScan_14 10000.00 cop[tikv] table:t, index:b(b) keep order:false, stats:pseudo", + " └─TableRowIDScan_15(Probe) 10.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Limit_11 10.00 root offset:0, count:10", + "└─TableReader_21 10.00 root data:Limit_20", + " └─Limit_20 10.00 cop[tikv] offset:0, count:10", + " └─TableFullScan_19 10.00 cop[tikv] table:t keep order:true, stats:pseudo" + ] + }, + { + "Plan": [ + "IndexReader_11 10000.00 root index:IndexFullScan_10", + "└─IndexFullScan_10 10000.00 cop[tikv] table:t, index:b(b) keep order:true, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_4 10000.00 root test.t.b, test.t.c, test.t.d", + "└─IndexLookUp_9 10000.00 root ", + " ├─IndexFullScan_7(Build) 10000.00 cop[tikv] table:t, index:b(b) keep order:false, stats:pseudo", + " └─TableRowIDScan_8(Probe) 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_9 12500.00 root test.t.a, test.t.a", + "└─HashJoin_30 12500.00 root inner join, equal:[eq(test.t.a, test.t.a)]", + " ├─IndexReader_43(Build) 10000.00 root index:IndexFullScan_42", + " │ └─IndexFullScan_42 10000.00 cop[tikv] table:t2, index:b(b) keep order:false, stats:pseudo", + " └─IndexReader_39(Probe) 10000.00 root index:IndexFullScan_38", + " └─IndexFullScan_38 10000.00 cop[tikv] table:t1, index:b(b) keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Projection_5 3333.33 root test.t.b", + "└─TableReader_11 3333.33 root data:TableRangeScan_10", + " └─TableRangeScan_10 3333.33 cop[tikv] table:t range:(0,+inf], keep order:true, stats:pseudo" + ] + }, + { + "Plan": [ + "Projection_7 1.00 root test.t.b", + "└─Limit_12 1.00 root offset:0, count:1", + " └─TableReader_22 1.00 root data:Limit_21", + " └─Limit_21 1.00 cop[tikv] offset:0, count:1", + " └─TableRangeScan_20 1.00 cop[tikv] table:t range:(0,+inf], keep order:true, stats:pseudo" + ] + } + ] + }, + { + "Name": "TestStableResultModeOnDML", + "Cases": [ + { + "Plan": [ + "Insert_1 N/A root N/A", + "└─TableReader_7 10000.00 root data:TableFullScan_6", + " └─TableFullScan_6 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Insert_1 N/A root N/A", + "└─TableReader_8 3333.33 root data:TableRangeScan_7", + " └─TableRangeScan_7 3333.33 cop[tikv] table:t range:(1,+inf], keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Insert_1 N/A root N/A", + "└─Projection_9 12500.00 root test.t.a, test.t.b, plus(test.t.c, test.t.c)->Column#10", + " └─MergeJoin_10 12500.00 root inner join, left key:test.t.a, right key:test.t.a", + " ├─TableReader_34(Build) 10000.00 root data:TableFullScan_33", + " │ └─TableFullScan_33 10000.00 
cop[tikv] table:t2 keep order:true, stats:pseudo", + " └─TableReader_32(Probe) 10000.00 root data:TableFullScan_31", + " └─TableFullScan_31 10000.00 cop[tikv] table:t1 keep order:true, stats:pseudo" + ] + }, + { + "Plan": [ + "Insert_1 N/A root N/A", + "└─Projection_7 10000.00 root cast(test.t.a, int(11))->Column#7, cast(test.t.b, int(11))->Column#8, cast(test.t.c, decimal(32,0) BINARY)->Column#9", + " └─TableReader_9 10000.00 root data:TableFullScan_8", + " └─TableFullScan_8 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Delete_3 N/A root N/A", + "└─TableReader_6 10000.00 root data:TableFullScan_5", + " └─TableFullScan_5 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Delete_4 N/A root N/A", + "└─TableReader_7 3333.33 root data:TableRangeScan_6", + " └─TableRangeScan_6 3333.33 cop[tikv] table:t range:(1,+inf], keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Update_3 N/A root N/A", + "└─TableReader_6 10000.00 root data:TableFullScan_5", + " └─TableFullScan_5 10000.00 cop[tikv] table:t keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Update_4 N/A root N/A", + "└─TableReader_7 3333.33 root data:TableRangeScan_6", + " └─TableRangeScan_6 3333.33 cop[tikv] table:t range:(1,+inf], keep order:false, stats:pseudo" + ] + } + ] + }, + { + "Name": "TestStableResultModeOnSubQuery", + "Cases": [ + { + "Plan": [ + "Sort_11 9990.00 root test.t1.a, test.t1.b, test.t1.c, test.t1.d", + "└─HashJoin_23 9990.00 root inner join, equal:[eq(test.t1.a, test.t2.b)]", + " ├─HashAgg_36(Build) 7992.00 root group by:test.t2.b, funcs:firstrow(test.t2.b)->test.t2.b", + " │ └─IndexReader_43 9990.00 root index:IndexFullScan_42", + " │ └─IndexFullScan_42 9990.00 cop[tikv] table:t2, index:b(b) keep order:false, stats:pseudo", + " └─TableReader_47(Probe) 10000.00 root data:TableFullScan_46", + " └─TableFullScan_46 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_9 8000.00 root test.t1.a, test.t1.b, test.t1.c, test.t1.d", + "└─HashJoin_11 8000.00 root CARTESIAN anti semi join, other cond:eq(test.t1.a, test.t2.b)", + " ├─IndexReader_17(Build) 10000.00 root index:IndexFullScan_16", + " │ └─IndexFullScan_16 10000.00 cop[tikv] table:t2, index:b(b) keep order:false, stats:pseudo", + " └─TableReader_13(Probe) 10000.00 root data:TableFullScan_12", + " └─TableFullScan_12 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_10 7992.00 root test.t1.a, test.t1.b, test.t1.c, test.t1.d", + "└─HashJoin_29 7992.00 root semi join, equal:[eq(test.t1.a, test.t2.b)], other cond:gt(test.t2.c, test.t1.c)", + " ├─TableReader_43(Build) 9980.01 root data:Selection_42", + " │ └─Selection_42 9980.01 cop[tikv] not(isnull(test.t2.b)), not(isnull(test.t2.c))", + " │ └─TableFullScan_41 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo", + " └─TableReader_40(Probe) 9990.00 root data:Selection_39", + " └─Selection_39 9990.00 cop[tikv] not(isnull(test.t1.c))", + " └─TableFullScan_38 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_10 8000.00 root test.t1.a, test.t1.b, test.t1.c, test.t1.d", + "└─HashJoin_12 8000.00 root CARTESIAN anti semi join, other cond:eq(test.t1.a, test.t2.b), gt(test.t2.c, test.t1.c)", + " ├─TableReader_16(Build) 10000.00 root data:TableFullScan_15", + " │ └─TableFullScan_15 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo", + " └─TableReader_14(Probe) 10000.00 root data:TableFullScan_13", + 
" └─TableFullScan_13 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_10 7992.00 root test.t1.a, test.t1.b, test.t1.c, test.t1.d", + "└─HashJoin_12 7992.00 root CARTESIAN semi join, other cond:gt(test.t2.c, test.t1.c)", + " ├─TableReader_18(Build) 9990.00 root data:Selection_17", + " │ └─Selection_17 9990.00 cop[tikv] not(isnull(test.t2.c))", + " │ └─TableFullScan_16 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo", + " └─TableReader_15(Probe) 9990.00 root data:Selection_14", + " └─Selection_14 9990.00 cop[tikv] not(isnull(test.t1.c))", + " └─TableFullScan_13 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_10 8000.00 root test.t1.a, test.t1.b, test.t1.c, test.t1.d", + "└─HashJoin_12 8000.00 root CARTESIAN anti semi join, other cond:gt(test.t2.c, test.t1.c)", + " ├─TableReader_16(Build) 10000.00 root data:TableFullScan_15", + " │ └─TableFullScan_15 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo", + " └─TableReader_14(Probe) 10000.00 root data:TableFullScan_13", + " └─TableFullScan_13 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_10 7992.00 root test.t1.a, test.t1.b, test.t1.c, test.t1.d", + "└─HashJoin_12 7992.00 root semi join, equal:[eq(test.t1.c, test.t2.c)]", + " ├─TableReader_18(Build) 9990.00 root data:Selection_17", + " │ └─Selection_17 9990.00 cop[tikv] not(isnull(test.t2.c))", + " │ └─TableFullScan_16 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo", + " └─TableReader_15(Probe) 9990.00 root data:Selection_14", + " └─Selection_14 9990.00 cop[tikv] not(isnull(test.t1.c))", + " └─TableFullScan_13 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_10 8000.00 root test.t1.a, test.t1.b, test.t1.c, test.t1.d", + "└─HashJoin_12 8000.00 root anti semi join, equal:[eq(test.t1.c, test.t2.c)]", + " ├─TableReader_16(Build) 10000.00 root data:TableFullScan_15", + " │ └─TableFullScan_15 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo", + " └─TableReader_14(Probe) 10000.00 root data:TableFullScan_13", + " └─TableFullScan_13 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Projection_9 12487.50 root test.t1.a, test.t1.b, test.t1.c, test.t1.d", + "└─Sort_10 12487.50 root test.t1.a, test.t1.b, test.t1.c, test.t1.d, test.t2.b", + " └─HashJoin_37 12487.50 root inner join, equal:[eq(test.t1.b, test.t2.b)]", + " ├─IndexReader_51(Build) 9990.00 root index:IndexFullScan_50", + " │ └─IndexFullScan_50 9990.00 cop[tikv] table:t2, index:b(b) keep order:false, stats:pseudo", + " └─TableReader_46(Probe) 9990.00 root data:Selection_45", + " └─Selection_45 9990.00 cop[tikv] not(isnull(test.t1.b))", + " └─TableFullScan_44 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + } + ] + }, + { + "Name": "TestStableResultModeOnJoin", + "Cases": [ + { + "Plan": [ + "Sort_9 12500.00 root test.t1.a, test.t1.b, test.t1.c, test.t1.d, test.t2.a, test.t2.b, test.t2.c, test.t2.d", + "└─MergeJoin_11 12500.00 root inner join, left key:test.t1.a, right key:test.t2.a", + " ├─TableReader_35(Build) 10000.00 root data:TableFullScan_34", + " │ └─TableFullScan_34 10000.00 cop[tikv] table:t2 keep order:true, stats:pseudo", + " └─TableReader_33(Probe) 10000.00 root data:TableFullScan_32", + " └─TableFullScan_32 10000.00 cop[tikv] table:t1 keep order:true, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_9 12475.01 root test.t1.a, test.t1.b, test.t1.c, test.t1.d, 
test.t2.a, test.t2.b, test.t2.c, test.t2.d", + "└─HashJoin_42 12475.01 root inner join, equal:[eq(test.t1.b, test.t2.b)], other cond:gt(test.t1.a, test.t2.a), lt(test.t1.c, test.t2.c)", + " ├─TableReader_61(Build) 9980.01 root data:Selection_60", + " │ └─Selection_60 9980.01 cop[tikv] not(isnull(test.t2.b)), not(isnull(test.t2.c))", + " │ └─TableFullScan_59 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo", + " └─TableReader_54(Probe) 9980.01 root data:Selection_53", + " └─Selection_53 9980.01 cop[tikv] not(isnull(test.t1.b)), not(isnull(test.t1.c))", + " └─TableFullScan_52 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_7 12500.00 root test.t1.a, test.t1.b, test.t1.c, test.t1.d", + "└─HashJoin_19 12500.00 root left outer join, equal:[eq(test.t1.a, test.t2.a)]", + " ├─IndexReader_30(Build) 10000.00 root index:IndexFullScan_29", + " │ └─IndexFullScan_29 10000.00 cop[tikv] table:t2, index:b(b) keep order:false, stats:pseudo", + " └─TableReader_26(Probe) 10000.00 root data:TableFullScan_25", + " └─TableFullScan_25 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Projection_8 100000000.00 root test.t1.a, test.t1.b, test.t1.c, test.t1.d", + "└─Sort_9 100000000.00 root test.t1.a, test.t1.b, test.t1.c, test.t1.d, test.t2.a", + " └─HashJoin_11 100000000.00 root CARTESIAN inner join, other cond:ne(test.t1.a, test.t2.a)", + " ├─IndexReader_18(Build) 10000.00 root index:IndexFullScan_17", + " │ └─IndexFullScan_17 10000.00 cop[tikv] table:t2, index:b(b) keep order:false, stats:pseudo", + " └─TableReader_14(Probe) 10000.00 root data:TableFullScan_13", + " └─TableFullScan_13 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + } + ] + }, + { + "Name": "TestStableResultModeOnOtherOperators", + "Cases": [ + { + "Plan": [ + "Batch_Point_Get_9 3.00 root table:t1 handle:[1 222 33333], keep order:true, desc:false" + ] + }, + { + "Plan": [ + "Batch_Point_Get_9 4.00 root table:t1 handle:[1 2 3 4], keep order:true, desc:false" + ] + }, + { + "Plan": [ + "Batch_Point_Get_9 3.00 root table:t1, index:b(b) keep order:true, desc:false" + ] + }, + { + "Plan": [ + "Batch_Point_Get_9 4.00 root table:t1, index:b(b) keep order:true, desc:false" + ] + }, + { + "Plan": [ + "Sort_11 6666.67 root Column#9, Column#10, Column#11, Column#12", + "└─Union_13 6666.67 root ", + " ├─TableReader_16 3333.33 root data:TableRangeScan_15", + " │ └─TableRangeScan_15 3333.33 cop[tikv] table:t1 range:(10,+inf], keep order:false, stats:pseudo", + " └─TableReader_20 3333.33 root data:Selection_19", + " └─Selection_19 3333.33 cop[tikv] gt(test.t2.b, 20)", + " └─TableFullScan_18 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_12 5333.33 root Column#9, Column#10, Column#11, Column#12", + "└─HashAgg_14 5333.33 root group by:Column#10, Column#11, Column#12, Column#9, funcs:firstrow(Column#9)->Column#9, funcs:firstrow(Column#10)->Column#10, funcs:firstrow(Column#11)->Column#11, funcs:firstrow(Column#12)->Column#12", + " └─Union_15 6666.67 root ", + " ├─TableReader_18 3333.33 root data:TableRangeScan_17", + " │ └─TableRangeScan_17 3333.33 cop[tikv] table:t1 range:(10,+inf], keep order:false, stats:pseudo", + " └─TableReader_22 3333.33 root data:Selection_21", + " └─Selection_21 3333.33 cop[tikv] gt(test.t2.b, 20)", + " └─TableFullScan_20 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Projection_10 10000.00 root Column#8, Column#7", + "└─Sort_11 10000.00 root 
test.t1.a, Column#7, Column#8", + " └─Window_13 10000.00 root row_number()->Column#8 over(partition by test.t1.a)", + " └─Window_14 10000.00 root sum(cast(test.t1.b, decimal(32,0) BINARY))->Column#7 over(partition by test.t1.a)", + " └─TableReader_17 10000.00 root data:TableFullScan_16", + " └─TableFullScan_16 10000.00 cop[tikv] table:t1 keep order:true, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_6 8000.00 root Column#5, Column#6, Column#7", + "└─HashAgg_12 8000.00 root group by:test.t1.d, funcs:min(Column#8)->Column#5, funcs:max(Column#9)->Column#6, funcs:sum(Column#10)->Column#7", + " └─TableReader_13 8000.00 root data:HashAgg_8", + " └─HashAgg_8 8000.00 cop[tikv] group by:test.t1.d, funcs:min(test.t1.a)->Column#8, funcs:max(test.t1.b)->Column#9, funcs:sum(test.t1.c)->Column#10", + " └─TableFullScan_11 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_9 6400.00 root Column#5, Column#6, Column#7", + "└─Selection_11 6400.00 root lt(Column#6, 20)", + " └─HashAgg_16 8000.00 root group by:test.t1.d, funcs:min(Column#11)->Column#5, funcs:max(Column#12)->Column#6, funcs:sum(Column#13)->Column#7", + " └─TableReader_17 8000.00 root data:HashAgg_12", + " └─HashAgg_12 8000.00 cop[tikv] group by:test.t1.d, funcs:min(test.t1.a)->Column#11, funcs:max(test.t1.b)->Column#12, funcs:sum(test.t1.c)->Column#13", + " └─TableFullScan_15 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Projection_4 10000.00 root case(eq(test.t1.a, 1), a1, eq(test.t1.a, 2), a2, ax)->Column#5", + "└─TableReader_12 10000.00 root data:TableFullScan_11", + " └─TableFullScan_11 10000.00 cop[tikv] table:t1 keep order:true, stats:pseudo" + ] + } + ] + }, + { + "Name": "TestStableResultModeOnPartitionTable", + "Cases": [ + { + "Plan": [ + "Sort_6 2.00 root test.thash.a", + "└─TableReader_9 2.00 root partition:p0,p1 data:TableRangeScan_8", + " └─TableRangeScan_8 2.00 cop[tikv] table:thash range:[1,1], [200,200], keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_6 100.00 root test.thash.a", + "└─TableReader_9 100.00 root partition:all data:TableRangeScan_8", + " └─TableRangeScan_8 100.00 cop[tikv] table:thash range:[50,150], keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_6 2.00 root test.trange.a", + "└─TableReader_9 2.00 root partition:p0,p2 data:TableRangeScan_8", + " └─TableRangeScan_8 2.00 cop[tikv] table:trange range:[1,1], [200,200], keep order:false, stats:pseudo" + ] + }, + { + "Plan": [ + "Sort_6 100.00 root test.trange.a", + "└─TableReader_9 100.00 root partition:p0,p1 data:TableRangeScan_8", + " └─TableRangeScan_8 100.00 cop[tikv] table:trange range:[50,150], keep order:false, stats:pseudo" + ] + } + ] + } +] diff --git a/privilege/privileges/privileges_test.go b/privilege/privileges/privileges_test.go index 0e4500766fa0c..7a8c0e14795d9 100644 --- a/privilege/privileges/privileges_test.go +++ b/privilege/privileges/privileges_test.go @@ -466,6 +466,28 @@ func (s *testPrivilegeSuite) TestSetPasswdStmt(c *C) { c.Assert(err, NotNil) } +func (s *testPrivilegeSuite) TestAlterUserStmt(c *C) { + se := newSession(c, s.store, s.dbName) + + // high privileged user setting password for other user (passes) + mustExec(c, se, "CREATE USER 'superuser2'") + mustExec(c, se, "CREATE USER 'nobodyuser2'") + mustExec(c, se, "CREATE USER 'nobodyuser3'") + mustExec(c, se, "GRANT ALL ON *.* TO 'superuser2'") + mustExec(c, se, "GRANT CREATE USER ON *.* TO 'nobodyuser2'") + + c.Assert(se.Auth(&auth.UserIdentity{Username: "superuser2", 
Hostname: "localhost", AuthUsername: "superuser2", AuthHostname: "%"}, nil, nil), IsTrue) + mustExec(c, se, "ALTER USER 'nobodyuser2' IDENTIFIED BY 'newpassword'") + mustExec(c, se, "ALTER USER 'nobodyuser2' IDENTIFIED BY ''") + + // low privileged user trying to set password for other user (fails) + c.Assert(se.Auth(&auth.UserIdentity{Username: "nobodyuser2", Hostname: "localhost", AuthUsername: "nobodyuser2", AuthHostname: "%"}, nil, nil), IsTrue) + mustExec(c, se, "ALTER USER 'nobodyuser2' IDENTIFIED BY 'newpassword'") + mustExec(c, se, "ALTER USER 'nobodyuser2' IDENTIFIED BY ''") + _, err := se.ExecuteInternal(context.Background(), "ALTER USER 'superuser2' IDENTIFIED BY 'newpassword'") + c.Assert(err, NotNil) +} + func (s *testPrivilegeSuite) TestSelectViewSecurity(c *C) { se := newSession(c, s.store, s.dbName) ctx, _ := se.(sessionctx.Context) diff --git a/server/http_handler.go b/server/http_handler.go index d2705d721e3c2..b0541f0be37cd 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -44,7 +44,6 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/session" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" @@ -370,11 +369,11 @@ func (t *tikvHandlerTool) getPartition(tableVal table.Table, partitionName strin } func (t *tikvHandlerTool) schema() (infoschema.InfoSchema, error) { - session, err := session.CreateSession(t.Store) + dom, err := session.GetDomain(t.Store) if err != nil { - return nil, errors.Trace(err) + return nil, err } - return domain.GetDomain(session.(sessionctx.Context)).InfoSchema(), nil + return dom.InfoSchema(), nil } func (t *tikvHandlerTool) handleMvccGetByHex(params map[string]string) (*mvccKV, error) { @@ -722,14 +721,13 @@ func (h settingsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } } if asyncCommit := req.Form.Get("tidb_enable_async_commit"); asyncCommit != "" { - s, err := session.CreateSession(h.Store.(kv.Storage)) + s, err := session.CreateSession(h.Store) if err != nil { writeError(w, err) return } - if s != nil { - defer s.Close() - } + defer s.Close() + switch asyncCommit { case "0": err = s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBEnableAsyncCommit, variable.BoolOff) @@ -745,14 +743,13 @@ func (h settingsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } } if onePC := req.Form.Get("tidb_enable_1pc"); onePC != "" { - s, err := session.CreateSession(h.Store.(kv.Storage)) + s, err := session.CreateSession(h.Store) if err != nil { writeError(w, err) return } - if s != nil { - defer s.Close() - } + defer s.Close() + switch onePC { case "0": err = s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(variable.TiDBEnable1PC, variable.BoolOff) @@ -888,14 +885,11 @@ func (h flashReplicaHandler) getTiFlashReplicaInfo(tblInfo *model.TableInfo, rep } func (h flashReplicaHandler) getDropOrTruncateTableTiflash(currentSchema infoschema.InfoSchema) ([]*tableFlashReplicaInfo, error) { - s, err := session.CreateSession(h.Store.(kv.Storage)) + s, err := session.CreateSession(h.Store) if err != nil { return nil, errors.Trace(err) } - - if s != nil { - defer s.Close() - } + defer s.Close() store := domain.GetDomain(s).Store() txn, err := store.Begin() @@ -958,16 +952,18 @@ func (h flashReplicaHandler) handleStatusReport(w http.ResponseWriter, req *http writeError(w, err) return } - do, err := 
session.GetDomain(h.Store.(kv.Storage)) + do, err := session.GetDomain(h.Store) if err != nil { writeError(w, err) return } - s, err := session.CreateSession(h.Store.(kv.Storage)) + s, err := session.CreateSession(h.Store) if err != nil { writeError(w, err) return } + defer s.Close() + available := status.checkTableFlashReplicaAvailable() err = do.DDL().UpdateTableReplicaInfo(s, status.ID, available) if err != nil { @@ -1133,18 +1129,7 @@ func (h ddlHistoryJobHandler) ServeHTTP(w http.ResponseWriter, req *http.Request } func (h ddlHistoryJobHandler) getAllHistoryDDL() ([]*model.Job, error) { - s, err := session.CreateSession(h.Store.(kv.Storage)) - if err != nil { - return nil, errors.Trace(err) - } - - if s != nil { - defer s.Close() - } - - store := domain.GetDomain(s.(sessionctx.Context)).Store() - txn, err := store.Begin() - + txn, err := h.Store.Begin() if err != nil { return nil, errors.Trace(err) } @@ -1737,7 +1722,7 @@ type serverInfo struct { // ServeHTTP handles request of ddl server info. func (h serverInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - do, err := session.GetDomain(h.Store.(kv.Storage)) + do, err := session.GetDomain(h.Store) if err != nil { writeError(w, errors.New("create session error")) log.Error(err) @@ -1767,7 +1752,7 @@ type clusterServerInfo struct { // ServeHTTP handles request of all ddl servers info. func (h allServerInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - do, err := session.GetDomain(h.Store.(kv.Storage)) + do, err := session.GetDomain(h.Store) if err != nil { writeError(w, errors.New("create session error")) log.Error(err) @@ -1868,6 +1853,8 @@ func (h profileHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { writeError(w, err) return } + defer sctx.Close() + var start, end time.Time if req.FormValue("end") != "" { end, err = time.ParseInLocation(time.RFC3339, req.FormValue("end"), sctx.GetSessionVars().Location()) diff --git a/server/sql_info_fetcher.go b/server/sql_info_fetcher.go index a7be33ea00154..6fc80daf506d6 100644 --- a/server/sql_info_fetcher.go +++ b/server/sql_info_fetcher.go @@ -81,6 +81,7 @@ func (sh *sqlInfoFetcher) zipInfoForSQL(w http.ResponseWriter, r *http.Request) return } defer sh.s.Close() + sh.do = domain.GetDomain(sh.s) reqCtx := r.Context() sql := r.FormValue("sql") diff --git a/server/statistics_handler.go b/server/statistics_handler.go index 733a0559f4943..55e9e4f16df18 100644 --- a/server/statistics_handler.go +++ b/server/statistics_handler.go @@ -92,6 +92,8 @@ func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request writeError(w, err) return } + defer se.Close() + se.GetSessionVars().StmtCtx.TimeZone = time.Local t, err := types.ParseTime(se.GetSessionVars().StmtCtx, params[pSnapshot], mysql.TypeTimestamp, 6) if err != nil { diff --git a/session/bootstrap.go b/session/bootstrap.go index 74b4d53d3055f..561fa5707becd 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -329,6 +329,15 @@ const ( LAST_USED_AT timestamp, PRIMARY KEY(TABLE_ID, INDEX_ID) );` + + // CreateGlobalGrantsTable stores dynamic privs + CreateGlobalGrantsTable = `CREATE TABLE IF NOT EXISTS mysql.global_grants ( + USER char(32) NOT NULL DEFAULT '', + HOST char(255) NOT NULL DEFAULT '', + PRIV char(32) NOT NULL DEFAULT '', + WITH_GRANT_OPTION enum('N','Y') NOT NULL DEFAULT 'N', + PRIMARY KEY (USER,HOST,PRIV) + );` ) // bootstrap initiates system DB for a store. 
@@ -479,11 +488,15 @@ const (
 	version67 = 67
 	// version68 update the global variable 'tidb_enable_clustered_index' from 'off' to 'int_only'.
 	version68 = 68
+	// version69 adds mysql.global_grants for DYNAMIC privileges
+	// and forces tidb_multi_statement_mode=OFF when tidb_multi_statement_mode=WARN.
+	// This affects upgrades from v4.0, where the default was WARN.
+	version69 = 69
 )
 
 // currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
 // please make sure this is the largest version
-var currentBootstrapVersion int64 = version68
+var currentBootstrapVersion int64 = version69
 
 var (
 	bootstrapVersion = []func(Session, int64){
@@ -555,6 +568,7 @@ var (
 		upgradeToVer66,
 		upgradeToVer67,
 		upgradeToVer68,
+		upgradeToVer69,
 	}
 )
 
@@ -1477,6 +1491,14 @@ func upgradeToVer68(s Session, ver int64) {
 	mustExecute(s, "DELETE FROM mysql.global_variables where VARIABLE_NAME = 'tidb_enable_clustered_index' and VARIABLE_VALUE = 'OFF'")
 }
 
+func upgradeToVer69(s Session, ver int64) {
+	if ver >= version69 {
+		return
+	}
+	doReentrantDDL(s, CreateGlobalGrantsTable)
+	mustExecute(s, "UPDATE mysql.global_variables SET VARIABLE_VALUE='OFF' WHERE VARIABLE_NAME = 'tidb_multi_statement_mode' AND VARIABLE_VALUE = 'WARN'")
+}
+
 func writeOOMAction(s Session) {
 	comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+"
 	mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, %?) ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
@@ -1553,6 +1575,8 @@ func doDDLWorks(s Session) {
 	mustExecute(s, CreateSchemaIndexUsageTable)
 	// Create stats_fm_sketch table.
 	mustExecute(s, CreateStatsFMSketchTable)
+	// Create global_grants table.
+	mustExecute(s, CreateGlobalGrantsTable)
 }
 
 // doDMLWorks executes DML statements in bootstrap stage.
diff --git a/session/session.go b/session/session.go
index 19ac9fcf8e8b2..98c2c93499b22 100644
--- a/session/session.go
+++ b/session/session.go
@@ -2546,6 +2546,7 @@ var builtinGlobalVariable = []string{
 	variable.TiDBMultiStatementMode,
 	variable.TiDBEnableExchangePartition,
 	variable.TiDBAllowFallbackToTiKV,
+	variable.TiDBEnableStableResultMode,
 }
 
 // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session.
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index 1c333a9bab074..7721ce0292ae9 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -833,6 +833,9 @@ type SessionVars struct {
 	// AllowFallbackToTiKV indicates the engine types whose unavailability triggers fallback to TiKV.
 	// Now we only support TiFlash.
 	AllowFallbackToTiKV map[kv.StoreType]struct{}
+
+	// EnableStableResultMode indicates whether to stabilize query results.
+	EnableStableResultMode bool
 }
 
 // AllocMPPTaskID allocates task id for mpp tasks. It will reset the task id if the query's
@@ -1791,6 +1794,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
 				s.AllowFallbackToTiKV[kv.TiFlash] = struct{}{}
 			}
 		}
+	case TiDBEnableStableResultMode:
+		s.EnableStableResultMode = TiDBOptOn(val)
 	}
 	s.systems[name] = val
 	return nil
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index 95c444a911137..3c8affc8c0b20 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -809,6 +809,7 @@ var defaultSysVars = []*SysVar{
 	{Scope: ScopeGlobal | ScopeSession, Name: TiDBTrackAggregateMemoryUsage, Value: BoolToOnOff(DefTiDBTrackAggregateMemoryUsage), Type: TypeBool},
 	{Scope: ScopeGlobal | ScopeSession, Name: TiDBMultiStatementMode, Value: Off, Type: TypeEnum, PossibleValues: []string{Off, On, Warn}},
 	{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableExchangePartition, Value: BoolToOnOff(DefTiDBEnableExchangePartition), Type: TypeBool},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableStableResultMode, Value: BoolToOnOff(DefTiDBEnableStableResultMode), Type: TypeBool},
 
 	/* tikv gc metrics */
 	{Scope: ScopeGlobal, Name: TiDBGCEnable, Value: BoolOn, Type: TypeBool},
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 8acc2bb02586e..a642b99238eaa 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -537,6 +537,9 @@ const (
 	// TiDBAllowFallbackToTiKV indicates the engine types whose unavailability triggers fallback to TiKV.
 	// Now we only support TiFlash.
 	TiDBAllowFallbackToTiKV = "tidb_allow_fallback_to_tikv"
+
+	// TiDBEnableStableResultMode indicates whether to stabilize query results.
+	TiDBEnableStableResultMode = "tidb_enable_stable_result_mode"
 )
 
 // TiDB vars that have only global scope
@@ -679,6 +682,7 @@ const (
 	DefTiDBEnableIndexMergeJoin      = false
 	DefTiDBTrackAggregateMemoryUsage = true
 	DefTiDBEnableExchangePartition   = false
+	DefTiDBEnableStableResultMode    = false
 )
 
 // Process global variables.
diff --git a/store/copr/mpp.go b/store/copr/mpp.go
index 6829a6e6a844f..0d6d905c3a99d 100644
--- a/store/copr/mpp.go
+++ b/store/copr/mpp.go
@@ -221,7 +221,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer,
 	// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
 	// That's a hard job but we can try it in the future.
 	if sender.GetRPCError() != nil {
-		logutil.BgLogger().Error("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()))
+		logutil.BgLogger().Error("mpp dispatch meet io error", zap.String("error", sender.GetRPCError().Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
 		// we return timeout to trigger tikv's fallback
 		m.sendError(tikv.ErrTiFlashServerTimeout)
 		return
@@ -231,7 +231,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *tikv.Backoffer,
 	}
 
 	if err != nil {
-		logutil.BgLogger().Error("mpp dispatch meet error", zap.String("error", err.Error()))
+		logutil.BgLogger().Error("mpp dispatch meet error", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
 		// we return timeout to trigger tikv's fallback
 		m.sendError(tikv.ErrTiFlashServerTimeout)
 		return
@@ -313,7 +313,7 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR
 	rpcResp, err := m.store.GetTiKVClient().SendRequest(bo.GetCtx(), req.Meta.GetAddress(), wrappedReq, tikv.ReadTimeoutUltraLong)
 
 	if err != nil {
-		logutil.BgLogger().Error("establish mpp connection meet error", zap.String("error", err.Error()))
+		logutil.BgLogger().Error("establish mpp connection meet error", zap.String("error", err.Error()), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
 		// we return timeout to trigger tikv's fallback
 		m.sendError(tikv.ErrTiFlashServerTimeout)
 		return
@@ -342,9 +342,9 @@ func (m *mppIterator) establishMPPConns(bo *tikv.Backoffer, req *kv.MPPDispatchR
 			if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v", err)); err1 != nil {
 				if errors.Cause(err) == context.Canceled {
-					logutil.BgLogger().Info("stream recv timeout", zap.Error(err))
+					logutil.BgLogger().Info("stream recv timeout", zap.Error(err), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
 				} else {
-					logutil.BgLogger().Info("stream unknown error", zap.Error(err))
+					logutil.BgLogger().Info("stream unknown error", zap.Error(err), zap.Uint64("timestamp", taskMeta.StartTs), zap.Int64("task", taskMeta.TaskId))
 				}
 			}
 			m.sendError(tikv.ErrTiFlashServerTimeout)
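The sysvar additions earlier in this diff (the tidb_enable_stable_result_mode constant, its TypeBool registration, and the SetSystemVar case that copies the value into SessionVars) only wire the switch through to the session. A hedged sketch of how downstream code could read the flag; the helper name is invented and the real consumer of EnableStableResultMode is outside this excerpt:

// Illustrative helper, not part of the patch: reads the EnableStableResultMode flag
// that SetSystemVar populates from tidb_enable_stable_result_mode.
func stableResultModeEnabled(sctx sessionctx.Context) bool {
	return sctx.GetSessionVars().EnableStableResultMode
}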