Skip to content

Commit

Permalink
planner: Log warnings when agg function can not be pushdown in explai…
Browse files Browse the repository at this point in the history
…n statement (#25553) (#25736)
  • Loading branch information
ti-srebot authored Jul 20, 2021
1 parent 0b6c3c9 commit 0192259
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 20 deletions.
10 changes: 10 additions & 0 deletions executor/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,16 @@ func (s *testSuite5) TestShowWarningsForExprPushdown(c *C) {
tk.MustExec("explain select * from show_warnings_expr_pushdown where date_add(value, interval 1 day) = '2020-01-01'")
c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Equals, uint16(1))
tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1105|Scalar function 'date_add'(signature: AddDateDatetimeInt) can not be pushed to tikv"))
tk.MustExec("explain select max(date_add(value, interval 1 day)) from show_warnings_expr_pushdown group by a")
c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Equals, uint16(2))
tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1105|Scalar function 'date_add'(signature: AddDateDatetimeInt) can not be pushed to tikv", "Warning|1105|Aggregation can not be pushed to tikv because arguments of AggFunc `max` contains unsupported exprs"))
tk.MustExec("explain select max(a) from show_warnings_expr_pushdown group by date_add(value, interval 1 day)")
c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Equals, uint16(2))
tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1105|Scalar function 'date_add'(signature: AddDateDatetimeInt) can not be pushed to tikv", "Warning|1105|Aggregation can not be pushed to tikv because groupByItems contain unsupported exprs"))
tk.MustExec("set tidb_opt_distinct_agg_push_down=0")
tk.MustExec("explain select max(distinct a) from show_warnings_expr_pushdown group by value")
c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Equals, uint16(1))
tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1105|Aggregation can not be pushed to storage layer in non-mpp mode because it contains agg function with distinct"))
}

func (s *testSuite5) TestShowGrantsPrivilege(c *C) {
Expand Down
17 changes: 14 additions & 3 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"math"
"sort"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -2349,17 +2350,24 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P

// TODO: support more operators and distinct later
func (la *LogicalAggregation) checkCanPushDownToMPP() bool {
hasUnsupportedDistinct := false
for _, agg := range la.AggFuncs {
// MPP does not support distinct except count distinct now
if agg.HasDistinct {
if agg.Name != ast.AggFuncCount {
return false
hasUnsupportedDistinct = true
}
}
// MPP does not support AggFuncApproxCountDistinct now
if agg.Name == ast.AggFuncApproxCountDistinct {
return false
hasUnsupportedDistinct = true
}
}
if hasUnsupportedDistinct {
if la.ctx.GetSessionVars().StmtCtx.InExplainStmt {
la.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("Aggregation can not be pushed to storage layer in mpp mode because it contains agg function with distinct"))
}
return false
}
return CheckAggCanPushCop(la.ctx, la.AggFuncs, la.GroupByItems, kv.TiFlash)
}
Expand Down Expand Up @@ -2441,10 +2449,13 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType)
}
canPushDownToTiFlash := la.canPushToCop(kv.TiFlash)
canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP() && canPushDownToTiFlash
canPushDownToMPP := canPushDownToTiFlash && la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP()
if la.HasDistinct() {
// TODO: remove after the cost estimation of distinct pushdown is implemented.
if !la.ctx.GetSessionVars().AllowDistinctAggPushDown {
if la.ctx.GetSessionVars().StmtCtx.InExplainStmt {
la.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("Aggregation can not be pushed to storage layer in non-mpp mode because it contains agg function with distinct"))
}
taskTypes = []property.TaskType{property.RootTaskType}
}
} else if !la.aggHints.preferAggToCop {
Expand Down
50 changes: 33 additions & 17 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1358,33 +1358,49 @@ func (sel *PhysicalSelection) attach2Task(tasks ...task) task {
func CheckAggCanPushCop(sctx sessionctx.Context, aggFuncs []*aggregation.AggFuncDesc, groupByItems []expression.Expression, storeType kv.StoreType) bool {
sc := sctx.GetSessionVars().StmtCtx
client := sctx.GetClient()
ret := true
reason := ""
for _, aggFunc := range aggFuncs {
// if the aggFunc contain VirtualColumn or CorrelatedColumn, it can not be pushed down.
if expression.ContainVirtualColumn(aggFunc.Args) || expression.ContainCorrelatedColumn(aggFunc.Args) {
return false
}
pb := aggregation.AggFuncToPBExpr(sc, client, aggFunc)
if pb == nil {
return false
reason = "expressions of AggFunc `" + aggFunc.Name + "` contain virtual column or correlated column, which is not supported now"
ret = false
break
}
if !aggregation.CheckAggPushDown(aggFunc, storeType) {
if sc.InExplainStmt {
storageName := storeType.Name()
if storeType == kv.UnSpecified {
storageName = "storage layer"
}
sc.AppendWarning(errors.New("Agg function '" + aggFunc.Name + "' can not be pushed to " + storageName))
}
return false
reason = "AggFunc `" + aggFunc.Name + "` is not supported now"
ret = false
break
}
if !expression.CanExprsPushDown(sc, aggFunc.Args, client, storeType) {
return false
reason = "arguments of AggFunc `" + aggFunc.Name + "` contains unsupported exprs"
ret = false
break
}
pb := aggregation.AggFuncToPBExpr(sc, client, aggFunc)
if pb == nil {
reason = "AggFunc `" + aggFunc.Name + "` can not be converted to pb expr"
ret = false
break
}
}
if expression.ContainVirtualColumn(groupByItems) {
return false
if ret && expression.ContainVirtualColumn(groupByItems) {
reason = "groupByItems contain virtual columns, which is not supported now"
ret = false
}
if ret && !expression.CanExprsPushDown(sc, groupByItems, client, storeType) {
reason = "groupByItems contain unsupported exprs"
ret = false
}

if !ret && sc.InExplainStmt {
storageName := storeType.Name()
if storeType == kv.UnSpecified {
storageName = "storage layer"
}
sc.AppendWarning(errors.New("Aggregation can not be pushed to " + storageName + " because " + reason))
}
return expression.CanExprsPushDown(sc, groupByItems, client, storeType)
return ret
}

// AggInfo stores the information of an Aggregation.
Expand Down

0 comments on commit 0192259

Please sign in to comment.