Skip to content

Commit

Permalink
feat: allow derived tables with explicit column aliases to work in th…
Browse files Browse the repository at this point in the history
…e new operator model

Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Sep 30, 2023
1 parent 172637d commit d19795b
Show file tree
Hide file tree
Showing 11 changed files with 358 additions and 237 deletions.
6 changes: 3 additions & 3 deletions go/vt/vtgate/planbuilder/operators/SQL_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,12 +528,12 @@ func buildProjection(op *Projection, qb *queryBuilder) error {

// if the projection is on derived table, we use the select we have
// created above and transform it into a derived table
if op.TableID != nil {
if op.DT != nil {
sel := qb.asSelectStatement()
qb.stmt = nil
qb.addTableExpr(op.Alias, op.Alias, TableID(op), &sqlparser.DerivedTable{
qb.addTableExpr(op.DT.Alias, op.DT.Alias, TableID(op), &sqlparser.DerivedTable{
Select: sel,
}, nil, nil)
}, nil, op.DT.Columns)
}

if !isSel {
Expand Down
72 changes: 36 additions & 36 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ type (
ResultColumns int

QP *QueryProjection
// TableID will be non-nil for derived tables
TableID *semantics.TableSet
Alias string

DT *DerivedTable
}
)

Expand Down Expand Up @@ -122,26 +121,33 @@ func (a *Aggregator) addColumnsWithoutPushing(ctx *plancontext.PlanningContext,
}

func (a *Aggregator) isDerived() bool {
return a.TableID != nil
return a.DT != nil
}

func (a *Aggregator) FindCol(ctx *plancontext.PlanningContext, expr sqlparser.Expr, _ bool) (int, error) {
if a.isDerived() {
derivedTBL, err := ctx.SemTable.TableInfoFor(*a.TableID)
if err != nil {
return 0, err
}
expr = semantics.RewriteDerivedTableExpression(expr, derivedTBL)
func (a *Aggregator) FindCol(ctx *plancontext.PlanningContext, in sqlparser.Expr, _ bool) (int, error) {
expr, err := a.DT.RewriteExpression(ctx, in)
if err != nil {
return 0, err
}
if offset, found := canReuseColumn(ctx, a.Columns, expr, extractExpr); found {
return offset, nil
}
return -1, nil
}

func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, groupBy bool, expr *sqlparser.AliasedExpr) (int, error) {
func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, groupBy bool, ae *sqlparser.AliasedExpr) (int, error) {
rewritten, err := a.DT.RewriteExpression(ctx, ae.Expr)
if err != nil {
return 0, err
}

ae = &sqlparser.AliasedExpr{
Expr: rewritten,
As: ae.As,
}

if reuse {
offset, err := a.findColInternal(ctx, expr, groupBy)
offset, err := a.findColInternal(ctx, ae, groupBy)
if err != nil {
return 0, err
}
Expand All @@ -153,7 +159,7 @@ func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gro
// Upon receiving a weight string function from an upstream operator, check for an existing grouping on the argument expression.
// If a grouping is found, continue to push the function down, marking it with 'addToGroupBy' to ensure it's correctly treated as a grouping column.
// This process also sets the weight string column offset, eliminating the need for a later addition in the aggregator operator's planOffset.
if wsExpr, isWS := expr.Expr.(*sqlparser.WeightStringFuncExpr); isWS {
if wsExpr, isWS := rewritten.(*sqlparser.WeightStringFuncExpr); isWS {
idx := slices.IndexFunc(a.Grouping, func(by GroupBy) bool {
return ctx.SemTable.EqualsExprWithDeps(wsExpr.Expr, by.SimplifiedExpr)
})
Expand All @@ -164,39 +170,37 @@ func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gro
}

if !groupBy {
aggr := NewAggr(opcode.AggregateAnyValue, nil, expr, expr.As.String())
aggr := NewAggr(opcode.AggregateAnyValue, nil, ae, ae.As.String())
aggr.ColOffset = len(a.Columns)
a.Aggregations = append(a.Aggregations, aggr)
}

offset := len(a.Columns)
a.Columns = append(a.Columns, expr)
incomingOffset, err := a.Source.AddColumn(ctx, false, groupBy, expr)
a.Columns = append(a.Columns, ae)
incomingOffset, err := a.Source.AddColumn(ctx, false, groupBy, ae)
if err != nil {
return 0, err
}

if offset != incomingOffset {
return 0, errFailedToPlan(expr)
return 0, errFailedToPlan(ae)
}

return offset, nil
}

func (a *Aggregator) findColInternal(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, addToGroupBy bool) (int, error) {
offset, err := a.FindCol(ctx, expr.Expr, false)
func (a *Aggregator) findColInternal(ctx *plancontext.PlanningContext, ae *sqlparser.AliasedExpr, addToGroupBy bool) (int, error) {
expr := ae.Expr
offset, err := a.FindCol(ctx, expr, false)
if err != nil {
return 0, err
}
if offset >= 0 {
return offset, err
}
if a.isDerived() {
derivedTBL, err := ctx.SemTable.TableInfoFor(*a.TableID)
if err != nil {
return 0, err
}
expr.Expr = semantics.RewriteDerivedTableExpression(expr.Expr, derivedTBL)
expr, err = a.DT.RewriteExpression(ctx, expr)
if err != nil {
return 0, err
}

// Aggregator is little special and cannot work if the input offset are not matched with the aggregation columns.
Expand All @@ -206,10 +210,10 @@ func (a *Aggregator) findColInternal(ctx *plancontext.PlanningContext, expr *sql
return 0, err
}

if offset, found := canReuseColumn(ctx, a.Columns, expr.Expr, extractExpr); found {
if offset, found := canReuseColumn(ctx, a.Columns, expr, extractExpr); found {
return offset, nil
}
colName, isColName := expr.Expr.(*sqlparser.ColName)
colName, isColName := expr.(*sqlparser.ColName)
for i, col := range a.Columns {
if isColName && colName.Name.EqualString(col.As.String()) {
return i, nil
Expand Down Expand Up @@ -252,8 +256,8 @@ func (a *Aggregator) ShortDescription() string {
columns := slice.Map(a.Columns, func(from *sqlparser.AliasedExpr) string {
return sqlparser.String(from)
})
if a.Alias != "" {
columns = append([]string{"derived[" + a.Alias + "]"}, columns...)
if a.DT != nil {
columns = append([]string{a.DT.String()}, columns...)
}

org := ""
Expand Down Expand Up @@ -466,16 +470,12 @@ func (a *Aggregator) SplitAggregatorBelowRoute(input []ops.Operator) *Aggregator
newOp := a.Clone(input).(*Aggregator)
newOp.Pushed = false
newOp.Original = false
newOp.Alias = ""
newOp.TableID = nil
newOp.DT = nil
return newOp
}

func (a *Aggregator) introducesTableID() semantics.TableSet {
if a.TableID == nil {
return semantics.EmptyTableSet()
}
return *a.TableID
return a.DT.introducesTableID()
}

var _ ops.Operator = (*Aggregator)(nil)
8 changes: 4 additions & 4 deletions go/vt/vtgate/planbuilder/operators/horizon.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ type Horizon struct {
Source ops.Operator

// If this is a derived table, the two following fields will contain the tableID and name of it
TableId *semantics.TableSet
Alias string
TableId *semantics.TableSet
Alias string
ColumnAliases sqlparser.Columns // derived tables can have their column aliases specified outside the subquery

// QP contains the QueryProjection for this op
QP *QueryProjection

Query sqlparser.SelectStatement
ColumnAliases sqlparser.Columns
Query sqlparser.SelectStatement

// Columns needed to feed other plans
Columns []*sqlparser.ColName
Expand Down
25 changes: 17 additions & 8 deletions go/vt/vtgate/planbuilder/operators/horizon_expanding.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ func expandUnionHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, unio

if horizon.TableId != nil {
proj := newAliasedProjection(op)
proj.TableID = horizon.TableId
proj.Alias = horizon.Alias
proj.DT = &DerivedTable{
TableID: *horizon.TableId,
Alias: horizon.Alias,
Columns: horizon.ColumnAliases,
}
op = proj
}

Expand Down Expand Up @@ -135,13 +138,21 @@ func createProjectionFromSelect(ctx *plancontext.PlanningContext, horizon *Horiz
return nil, err
}

var dt *DerivedTable
if horizon.TableId != nil {
dt = &DerivedTable{
TableID: *horizon.TableId,
Alias: horizon.Alias,
Columns: horizon.ColumnAliases,
}
}

if !qp.NeedsAggregation() {
projX, err := createProjectionWithoutAggr(ctx, qp, horizon.src())
if err != nil {
return nil, err
}
projX.TableID = horizon.TableId
projX.Alias = horizon.Alias
projX.DT = dt
out = projX

return out, nil
Expand All @@ -158,8 +169,7 @@ func createProjectionFromSelect(ctx *plancontext.PlanningContext, horizon *Horiz
QP: qp,
Grouping: qp.GetGrouping(),
Aggregations: aggregations,
TableID: horizon.TableId,
Alias: horizon.Alias,
DT: dt,
}

if complexAggr {
Expand Down Expand Up @@ -204,8 +214,7 @@ outer:

func createProjectionForComplexAggregation(a *Aggregator, qp *QueryProjection) (ops.Operator, error) {
p := newAliasedProjection(a)
p.Alias = a.Alias
p.TableID = a.TableID
p.DT = a.DT
for _, expr := range qp.SelectExprs {
ae, err := expr.GetAliasedExpr()
if err != nil {
Expand Down
Loading

0 comments on commit d19795b

Please sign in to comment.