Skip to content

Commit

Permalink
Reduce usages of old horizon planning fallback (#13595)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshit-gangal authored Jul 25, 2023
1 parent c9e53d1 commit e379bc3
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 48 deletions.
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/aggregation_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func (ab *aggBuilder) handleAggr(ctx *plancontext.PlanningContext, aggr Aggr) er
// we are not going to see values multiple times, so we don't need to multiply with the count(*) from the other side
return ab.handlePushThroughAggregation(ctx, aggr)
default:
return errHorizonNotPlanned()
return vterrors.VT12001(fmt.Sprintf("aggregation not planned: %s", aggr.OpCode.String()))
}
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func (a *Aggregator) addIfAggregationColumn(ctx *plancontext.PlanningContext, co
if _, srcIsAlsoAggr := a.Source.(*Aggregator); srcIsAlsoAggr {
return 0, vterrors.VT12001("aggregation on top of aggregation not supported")
}
return -1, vterrors.VT13001(fmt.Sprintf("aggregation column on wrong index: want: %d, got: %d", colIdx, offset))
return -1, vterrors.VT12001(fmt.Sprintf("failed to plan aggregation on: %s", sqlparser.String(aggr.Original)))
}

a.Source = newSrc
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/horizon_expanding.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func createProjectionWithoutAggr(qp *QueryProjection, src ops.Operator) (*Projec
aggr, ok := expr.(sqlparser.AggrFunc)
if !ok {
// need to add logic to extract aggregations and pushed them to the top level
return nil, errHorizonNotPlanned()
return nil, vterrors.VT12001(fmt.Sprintf("unsupported aggregation expression: %s", sqlparser.String(expr)))
}
expr = aggr.GetArg()
if expr == nil {
Expand Down
11 changes: 1 addition & 10 deletions go/vt/vtgate/planbuilder/operators/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,7 @@ func optimizeHorizonPlanning(ctx *plancontext.PlanningContext, root ops.Operator
}
}

newOp, err := rewrite.FixedPointBottomUp(root, TableID, visitor, stopAtRoute)
if err != nil {
if vterr, ok := err.(*vterrors.VitessError); ok && vterr.ID == "VT13001" {
// we encountered a bug. let's try to back out
return nil, errHorizonNotPlanned()
}
return nil, err
}

return newOp, nil
return rewrite.FixedPointBottomUp(root, TableID, visitor, stopAtRoute)
}

func pushOrExpandHorizon(ctx *plancontext.PlanningContext, in *Horizon) (ops.Operator, *rewrite.ApplyResult, error) {
Expand Down
11 changes: 1 addition & 10 deletions go/vt/vtgate/planbuilder/operators/offset_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,7 @@ func planOffsets(ctx *plancontext.PlanningContext, root ops.Operator) (ops.Opera
return in, rewrite.SameTree, nil
}

op, err := rewrite.TopDown(root, TableID, visitor, stopAtRoute)
if err != nil {
if vterr, ok := err.(*vterrors.VitessError); ok && vterr.ID == "VT13001" {
// we encountered a bug. let's try to back out
return nil, errHorizonNotPlanned()
}
return nil, err
}

return op, nil
return rewrite.TopDown(root, TableID, visitor, stopAtRoute)
}

func fetchByOffset(e sqlparser.SQLNode) bool {
Expand Down
3 changes: 0 additions & 3 deletions go/vt/vtgate/planbuilder/operators/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ func createSimpleProjection(ctx *plancontext.PlanningContext, qp *QueryProjectio
}

for _, e := range qp.SelectExprs {
if _, isStar := e.Col.(*sqlparser.StarExpr); isStar {
return nil, errHorizonNotPlanned()
}
ae, err := e.GetAliasedExpr()
if err != nil {
return nil, err
Expand Down
25 changes: 13 additions & 12 deletions go/vt/vtgate/planbuilder/operators/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,21 +543,22 @@ func createProjection(src ops.Operator) (*Projection, error) {
return proj, nil
}

func (r *Route) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, _, addToGroupBy bool) (ops.Operator, int, error) {
func (r *Route) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, reuseExisting, addToGroupBy bool) (ops.Operator, int, error) {
removeKeyspaceFromSelectExpr(expr)

// check if columns is already added.
cols, err := r.GetColumns()
if err != nil {
return nil, 0, err
}
colAsExpr := func(e *sqlparser.AliasedExpr) sqlparser.Expr {
return e.Expr
}
if offset, found := canReuseColumn(ctx, cols, expr.Expr, colAsExpr); found {
return r, offset, nil
if reuseExisting {
// check if columns is already added.
cols, err := r.GetColumns()
if err != nil {
return nil, 0, err
}
colAsExpr := func(e *sqlparser.AliasedExpr) sqlparser.Expr {
return e.Expr
}
if offset, found := canReuseColumn(ctx, cols, expr.Expr, colAsExpr); found {
return r, offset, nil
}
}

// if column is not already present, we check if we can easily find a projection
// or aggregation in our source that we can add to
if ok, offset := addColumnToInput(r.Source, expr, addToGroupBy); ok {
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/planbuilder/operators/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package operators

import (
"fmt"

"vitess.io/vitess/go/slices2"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -104,7 +106,7 @@ func (to *Table) TablesUsed() []string {
func addColumn(ctx *plancontext.PlanningContext, op ColNameColumns, e sqlparser.Expr) (int, error) {
col, ok := e.(*sqlparser.ColName)
if !ok {
return 0, vterrors.VT13001("cannot push this expression to a table/vindex: %s", sqlparser.String(e))
return 0, vterrors.VT12001(fmt.Sprintf("cannot add '%s' expression to a table/vindex", sqlparser.String(e)))
}
sqlparser.RemoveKeyspaceFromColName(col)
cols := op.GetColNames()
Expand Down
35 changes: 35 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -5877,5 +5877,40 @@
"user.user"
]
}
},
{
"comment": "aggregation on top of derived table with limit",
"query": "select count(val2), sum(val2) from (select id, val2 from user where val2 is null limit 2) as x",
"plan": {
"QueryType": "SELECT",
"Original": "select count(val2), sum(val2) from (select id, val2 from user where val2 is null limit 2) as x",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Scalar",
"Aggregates": "count(0) AS count(val2), sum(1) AS sum(val2)",
"Inputs": [
{
"OperatorType": "Limit",
"Count": "INT64(2)",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select val2, val2 from (select id, val2 from `user` where 1 != 1) as x where 1 != 1",
"Query": "select val2, val2 from (select id, val2 from `user` where val2 is null) as x limit :__upper_limit",
"Table": "`user`"
}
]
}
]
},
"TablesUsed": [
"user.user"
]
}
}
]
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/testdata/tpch_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -746,12 +746,12 @@
{
"comment": "TPC-H query 8",
"query": "select o_year, sum(case when nation = 'BRAZIL' then volume else 0 end) / sum(volume) as mkt_share from ( select extract(year from o_orderdate) as o_year, l_extendedprice * (1 - l_discount) as volume, n2.n_name as nation from part, supplier, lineitem, orders, customer, nation n1, nation n2, region where p_partkey = l_partkey and s_suppkey = l_suppkey and l_orderkey = o_orderkey and o_custkey = c_custkey and c_nationkey = n1.n_nationkey and n1.n_regionkey = r_regionkey and r_name = 'AMERICA' and s_nationkey = n2.n_nationkey and o_orderdate between date '1995-01-01' and date('1996-12-31') and p_type = 'ECONOMY ANODIZED STEEL' ) as all_nations group by o_year order by o_year",
"plan": "VT12001: unsupported: in scatter query: complex aggregate expression"
"plan": "VT12001: unsupported: failed to plan aggregation on: sum(case when nation = 'BRAZIL' then volume else 0 end)"
},
{
"comment": "TPC-H query 9",
"query": "select nation, o_year, sum(amount) as sum_profit from ( select n_name as nation, extract(year from o_orderdate) as o_year, l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount from part, supplier, lineitem, partsupp, orders, nation where s_suppkey = l_suppkey and ps_suppkey = l_suppkey and ps_partkey = l_partkey and p_partkey = l_partkey and o_orderkey = l_orderkey and s_nationkey = n_nationkey and p_name like '%green%' ) as profit group by nation, o_year order by nation, o_year desc",
"plan": "VT12001: unsupported: aggregation on columns from different sources"
"plan": "VT12001: unsupported: failed to plan aggregation on: sum(amount) as sum_profit"
},
{
"comment": "TPC-H query 10",
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/testdata/unsupported_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@
{
"comment": "select func(keyspace_id) from user_index where id = :id",
"query": "select func(keyspace_id) from user_index where id = :id",
"plan": "VT12001: unsupported: expression on results of a vindex function"
"plan": "VT12001: unsupported: cannot add 'func(keyspace_id)' expression to a table/vindex"
},
{
"comment": "delete with multi-table targets",
Expand Down
13 changes: 7 additions & 6 deletions go/vt/vtgate/semantics/table_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package semantics

import (
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -106,12 +107,12 @@ func (tc *tableCollector) up(cursor *sqlparser.Cursor) error {

func newVindexTable(t sqlparser.IdentifierCS) *vindexes.Table {
vindexCols := []vindexes.Column{
{Name: sqlparser.NewIdentifierCI("id")},
{Name: sqlparser.NewIdentifierCI("keyspace_id")},
{Name: sqlparser.NewIdentifierCI("range_start")},
{Name: sqlparser.NewIdentifierCI("range_end")},
{Name: sqlparser.NewIdentifierCI("hex_keyspace_id")},
{Name: sqlparser.NewIdentifierCI("shard")},
{Name: sqlparser.NewIdentifierCI("id"), Type: querypb.Type_VARBINARY},
{Name: sqlparser.NewIdentifierCI("keyspace_id"), Type: querypb.Type_VARBINARY},
{Name: sqlparser.NewIdentifierCI("range_start"), Type: querypb.Type_VARBINARY},
{Name: sqlparser.NewIdentifierCI("range_end"), Type: querypb.Type_VARBINARY},
{Name: sqlparser.NewIdentifierCI("hex_keyspace_id"), Type: querypb.Type_VARBINARY},
{Name: sqlparser.NewIdentifierCI("shard"), Type: querypb.Type_VARBINARY},
}

return &vindexes.Table{
Expand Down

0 comments on commit e379bc3

Please sign in to comment.