Skip to content

Commit

Permalink
Merge branch 'release-5.0' into release-5.0-c4424c2920b5
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 committed Jul 20, 2021
2 parents 61b83c0 + 0b6c3c9 commit b8c288b
Show file tree
Hide file tree
Showing 28 changed files with 1,074 additions and 59 deletions.
5 changes: 4 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1662,7 +1662,10 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
case *ast.LoadDataStmt:
sc.DupKeyAsWarning = true
sc.BadNullAsWarning = true
sc.TruncateAsWarning = !vars.StrictSQLMode
// With IGNORE or LOCAL, data-interpretation errors become warnings and the load operation continues,
// even if the SQL mode is restrictive. For details: https://dev.mysql.com/doc/refman/8.0/en/load-data.html
// TODO: since TiDB only support the LOCAL by now, so the TruncateAsWarning are always true here.
sc.TruncateAsWarning = true
sc.InLoadDataStmt = true
// return warning instead of error when load data meet no partition for value
sc.IgnoreNoPartition = true
Expand Down
8 changes: 8 additions & 0 deletions executor/grant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,14 @@ func (s *testSuite3) TestMaintainRequire(c *C) {
c.Assert(err, NotNil)
}

func (s *testSuite3) TestMaintainAuthString(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`CREATE USER 'maint_auth_str1'@'%' IDENTIFIED BY 'foo'`)
tk.MustQuery("SELECT authentication_string FROM mysql.user WHERE `Host` = '%' and `User` = 'maint_auth_str1'").Check(testkit.Rows("*F3A2A51A9B0F2BE2468926B4132313728C250DBF"))
tk.MustExec(`ALTER USER 'maint_auth_str1'@'%' REQUIRE SSL`)
tk.MustQuery("SELECT authentication_string FROM mysql.user WHERE `Host` = '%' and `User` = 'maint_auth_str1'").Check(testkit.Rows("*F3A2A51A9B0F2BE2468926B4132313728C250DBF"))
}

func (s *testSuite3) TestGrantOnNonExistTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("create user genius")
Expand Down
2 changes: 1 addition & 1 deletion executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,7 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) {
tk.MustQuery("select TABLE_SCHEMA, sum(TABLE_SIZE) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' group by TABLE_SCHEMA;").Check(testkit.Rows(
"test 2",
))
c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 23)
c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 24)
}

func (s *testInfoschemaTableSuite) TestSequences(c *C) {
Expand Down
10 changes: 10 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,16 @@ func (s *testSerialSuite1) TestSetVar(c *C) {
tk.MustQuery(`show warnings`).Check(testkit.Rows())
tk.MustExec("set @@tidb_enable_clustered_index = 'int_only'")
tk.MustQuery(`show warnings`).Check(testkit.Rows("Warning 1287 'INT_ONLY' is deprecated and will be removed in a future release. Please use 'ON' or 'OFF' instead"))

// test for tidb_enable_stable_result_mode
tk.MustQuery(`select @@tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustExec(`set global tidb_enable_stable_result_mode = 1`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("1"))
tk.MustExec(`set global tidb_enable_stable_result_mode = 0`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustExec(`set tidb_enable_stable_result_mode=1`)
tk.MustQuery(`select @@global.tidb_enable_stable_result_mode`).Check(testkit.Rows("0"))
tk.MustQuery(`select @@tidb_enable_stable_result_mode`).Check(testkit.Rows("1"))
}

func (s *testSuite5) TestTruncateIncorrectIntSessionVar(c *C) {
Expand Down
37 changes: 23 additions & 14 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,10 +895,16 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error {

failedUsers := make([]string, 0, len(s.Specs))
for _, spec := range s.Specs {
if spec.User.CurrentUser {
user := e.ctx.GetSessionVars().User
user := e.ctx.GetSessionVars().User
if spec.User.CurrentUser || ((user != nil) && (user.Username == spec.User.Username) && (user.AuthHostname == spec.User.Hostname)) {
spec.User.Username = user.Username
spec.User.Hostname = user.AuthHostname
} else {
checker := privilege.GetPrivilegeManager(e.ctx)
activeRoles := e.ctx.GetSessionVars().ActiveRoles
if checker != nil && !checker.RequestVerification(activeRoles, "", "", "", mysql.SuperPriv) {
return ErrDBaccessDenied.GenWithStackByArgs(spec.User.Username, spec.User.Hostname, "mysql")
}
}

exists, err := userExists(e.ctx, spec.User.Username, spec.User.Hostname)
Expand All @@ -910,22 +916,25 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error {
failedUsers = append(failedUsers, user)
continue
}
pwd, ok := spec.EncodedPassword()
if !ok {
return errors.Trace(ErrPasswordFormat)
}

exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
stmt, err := exec.ParseWithParams(context.TODO(), `UPDATE %n.%n SET authentication_string=%? WHERE Host=%? and User=%?;`, mysql.SystemDB, mysql.UserTable, pwd, spec.User.Hostname, spec.User.Username)
if err != nil {
return err
}
_, _, err = exec.ExecRestrictedStmt(context.TODO(), stmt)
if err != nil {
failedUsers = append(failedUsers, spec.User.String())
if spec.AuthOpt != nil {
pwd, ok := spec.EncodedPassword()
if !ok {
return errors.Trace(ErrPasswordFormat)
}
stmt, err := exec.ParseWithParams(context.TODO(), `UPDATE %n.%n SET authentication_string=%? WHERE Host=%? and User=%?;`, mysql.SystemDB, mysql.UserTable, pwd, spec.User.Hostname, spec.User.Username)
if err != nil {
return err
}
_, _, err = exec.ExecRestrictedStmt(context.TODO(), stmt)
if err != nil {
failedUsers = append(failedUsers, spec.User.String())
}
}

if len(privData) > 0 {
stmt, err = exec.ParseWithParams(context.TODO(), "INSERT INTO %n.%n (Host, User, Priv) VALUES (%?,%?,%?) ON DUPLICATE KEY UPDATE Priv = values(Priv)", mysql.SystemDB, mysql.GlobalPrivTable, spec.User.Hostname, spec.User.Username, string(hack.String(privData)))
stmt, err := exec.ParseWithParams(context.TODO(), "INSERT INTO %n.%n (Host, User, Priv) VALUES (%?,%?,%?) ON DUPLICATE KEY UPDATE Priv = values(Priv)", mysql.SystemDB, mysql.GlobalPrivTable, spec.User.Hostname, spec.User.Username, string(hack.String(privData)))
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -2418,7 +2418,11 @@ func (la *LogicalAggregation) tryToGetMppHashAggs(prop *property.PhysicalPropert
childProp := &property.PhysicalProperty{TaskTp: property.MppTaskType, ExpectedCnt: math.MaxFloat64}
agg := NewPhysicalHashAgg(la, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp)
agg.SetSchema(la.schema.Clone())
agg.MppRunMode = MppTiDB
if la.HasDistinct() {
agg.MppRunMode = MppScalar
} else {
agg.MppRunMode = MppTiDB
}
hashAggs = append(hashAggs, agg)
}
return
Expand Down
7 changes: 7 additions & 0 deletions planner/core/fragment.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)

Expand All @@ -40,6 +41,8 @@ type Fragment struct {
ExchangeSender *PhysicalExchangeSender // data exporter

IsRoot bool

singleton bool // indicates if this is a task running on a single node.
}

type tasksAndFrags struct {
Expand Down Expand Up @@ -121,6 +124,7 @@ func (f *Fragment) init(p PhysicalPlan) error {
}
f.TableScan = x
case *PhysicalExchangeReceiver:
f.singleton = x.children[0].(*PhysicalExchangeSender).ExchangeType == tipb.ExchangeType_PassThrough
f.ExchangeReceivers = append(f.ExchangeReceivers, x)
case *PhysicalUnionAll:
return errors.New("unexpected union all detected")
Expand Down Expand Up @@ -246,6 +250,9 @@ func (e *mppTaskGenerator) generateMPPTasksForFragment(f *Fragment) (tasks []*kv
for _, r := range f.ExchangeReceivers {
childrenTasks = append(childrenTasks, r.Tasks...)
}
if f.singleton {
childrenTasks = childrenTasks[0:1]
}
tasks = e.constructMPPTasksByChildrenTasks(childrenTasks)
}
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions planner/core/optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var IsReadOnly func(node ast.Node, vars *variable.SessionVars) bool
const (
flagGcSubstitute uint64 = 1 << iota
flagPrunColumns
flagStabilizeResults
flagBuildKeyInfo
flagDecorrelate
flagEliminateAgg
Expand All @@ -64,6 +65,7 @@ const (
var optRuleList = []logicalOptRule{
&gcSubstituter{},
&columnPruner{},
&resultsStabilizer{},
&buildKeySolver{},
&decorrelateSolver{},
&aggregationEliminator{},
Expand Down Expand Up @@ -124,12 +126,21 @@ func CheckTableLock(ctx sessionctx.Context, is infoschema.InfoSchema, vs []visit
return nil
}

func checkStableResultMode(sctx sessionctx.Context) bool {
s := sctx.GetSessionVars()
st := s.StmtCtx
return s.EnableStableResultMode && (!st.InInsertStmt && !st.InUpdateStmt && !st.InDeleteStmt && !st.InLoadDataStmt)
}

// DoOptimize optimizes a logical plan to a physical plan.
func DoOptimize(ctx context.Context, sctx sessionctx.Context, flag uint64, logic LogicalPlan) (PhysicalPlan, float64, error) {
// if there is something after flagPrunColumns, do flagPrunColumnsAgain
if flag&flagPrunColumns > 0 && flag-flagPrunColumns > flagPrunColumns {
flag |= flagPrunColumnsAgain
}
if checkStableResultMode(sctx) {
flag |= flagStabilizeResults
}
logic, err := logicalOptimize(ctx, flag, logic)
if err != nil {
return nil, 0, err
Expand Down
2 changes: 2 additions & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,8 @@ const (
Mpp2Phase
// MppTiDB runs agg on TiDB (and a partial agg on TiFlash if in 2 phase agg)
MppTiDB
// MppScalar also has 2 phases. The second phase runs in a single task.
MppScalar
)

type basePhysicalAgg struct {
Expand Down
5 changes: 5 additions & 0 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,11 @@ type PointPlanVal struct {

// TryFastPlan tries to use the PointGetPlan for the query.
func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) {
if checkStableResultMode(ctx) {
// the rule of stabilizing results has not taken effect yet, so cannot generate a plan here in this mode
return nil
}

ctx.GetSessionVars().PlanID = 0
ctx.GetSessionVars().PlanColumnID = 0
switch x := node.(type) {
Expand Down
4 changes: 2 additions & 2 deletions planner/core/rule_eliminate_projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ func canProjectionBeEliminatedStrict(p *PhysicalProjection) bool {
// the align the output schema. In the future, we can solve this in-compatibility by
// passing down the aggregation mode to TiFlash.
if physicalAgg, ok := p.Children()[0].(*PhysicalHashAgg); ok {
if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase {
if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
if physicalAgg.isFinalAgg() {
return false
}
}
}
if physicalAgg, ok := p.Children()[0].(*PhysicalStreamAgg); ok {
if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase {
if physicalAgg.MppRunMode == Mpp1Phase || physicalAgg.MppRunMode == Mpp2Phase || physicalAgg.MppRunMode == MppScalar {
if physicalAgg.isFinalAgg() {
return false
}
Expand Down
120 changes: 120 additions & 0 deletions planner/core/rule_stabilize_results.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"context"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/planner/util"
)

/*
resultsStabilizer stabilizes query results.
NOTE: it's not a common rule for all queries, it's specially implemented for a few customers.
Results of some queries are not stable, for example:
create table t (a int); insert into t values (1), (2); select a from t;
In the case above, the result can be `1 2` or `2 1`, which is not stable.
This rule stabilizes results by modifying or injecting a Sort operator:
1. iterate the plan from the root, and ignore all input-order operators (Sel/Proj/Limit);
2. when meeting the first non-input-order operator,
2.1. if it's a Sort, update it by appending all output columns into its order-by list,
2.2. otherwise, inject a new Sort upon this operator.
*/
type resultsStabilizer struct {
}

func (rs *resultsStabilizer) optimize(ctx context.Context, lp LogicalPlan) (LogicalPlan, error) {
stable := rs.completeSort(lp)
if !stable {
lp = rs.injectSort(lp)
}
return lp, nil
}

func (rs *resultsStabilizer) completeSort(lp LogicalPlan) bool {
if rs.isInputOrderKeeper(lp) {
return rs.completeSort(lp.Children()[0])
} else if sort, ok := lp.(*LogicalSort); ok {
cols := sort.Schema().Columns // sort results by all output columns
if handleCol := rs.extractHandleCol(sort.Children()[0]); handleCol != nil {
cols = []*expression.Column{handleCol} // sort results by the handle column if we can get it
}
for _, col := range cols {
exist := false
for _, byItem := range sort.ByItems {
if col.Equal(nil, byItem.Expr) {
exist = true
break
}
}
if !exist {
sort.ByItems = append(sort.ByItems, &util.ByItems{Expr: col})
}
}
return true
}
return false
}

func (rs *resultsStabilizer) injectSort(lp LogicalPlan) LogicalPlan {
if rs.isInputOrderKeeper(lp) {
lp.SetChildren(rs.injectSort(lp.Children()[0]))
return lp
}

byItems := make([]*util.ByItems, 0, len(lp.Schema().Columns))
cols := lp.Schema().Columns
if handleCol := rs.extractHandleCol(lp); handleCol != nil {
cols = []*expression.Column{handleCol}
}
for _, col := range cols {
byItems = append(byItems, &util.ByItems{Expr: col})
}
sort := LogicalSort{
ByItems: byItems,
}.Init(lp.SCtx(), lp.SelectBlockOffset())
sort.SetChildren(lp)
return sort
}

func (rs *resultsStabilizer) isInputOrderKeeper(lp LogicalPlan) bool {
switch lp.(type) {
case *LogicalSelection, *LogicalProjection, *LogicalLimit:
return true
}
return false
}

// extractHandleCols does the best effort to get the handle column.
func (rs *resultsStabilizer) extractHandleCol(lp LogicalPlan) *expression.Column {
switch x := lp.(type) {
case *LogicalSelection, *LogicalLimit:
handleCol := rs.extractHandleCol(lp.Children()[0])
if x.Schema().Contains(handleCol) {
// some Projection Operator might be inlined, so check the column again here
return handleCol
}
case *DataSource:
handleCol := x.getPKIsHandleCol()
if handleCol != nil {
return handleCol
}
}
return nil
}

func (rs *resultsStabilizer) name() string {
return "stabilize_results"
}
Loading

0 comments on commit b8c288b

Please sign in to comment.