Skip to content

Commit

Permalink
Merge pull request #9639 from planetscale/aggr-collations
Browse files Browse the repository at this point in the history
Extract collation data to enable distinct aggregation
  • Loading branch information
systay authored Feb 7, 2022
2 parents e970a8d + 7e88855 commit af9ff45
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 37 deletions.
49 changes: 27 additions & 22 deletions go/vt/vtgate/engine/ordered_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type OrderedAggregate struct {
// from the result received. If 0, no truncation happens.
TruncateColumnCount int `json:",omitempty"`

// Collations stores the collation ID per column offset.
// It is used for grouping keys and distinct aggregate functions.
Collations map[int]collations.ID

// Input is the primitive that will feed into this Primitive.
Input Primitive
}
Expand All @@ -73,10 +77,19 @@ type GroupByParams struct {

// String returns a string. Used for plan descriptions
func (gbp GroupByParams) String() string {
var out string
if gbp.WeightStringCol == -1 || gbp.KeyCol == gbp.WeightStringCol {
return strconv.Itoa(gbp.KeyCol)
out = strconv.Itoa(gbp.KeyCol)
} else {
out = fmt.Sprintf("(%d|%d)", gbp.KeyCol, gbp.WeightStringCol)
}

if gbp.CollationID != collations.Unknown {
collation := collations.Local().LookupByID(gbp.CollationID)
out += " COLLATE " + collation.Name()
}
return fmt.Sprintf("(%d|%d)", gbp.KeyCol, gbp.WeightStringCol)

return out
}

// AggregateParams specify the parameters for each aggregation.
Expand All @@ -86,9 +99,10 @@ type AggregateParams struct {
Col int

// These are used only for distinct opcodes.
KeyCol int
WCol int
WAssigned bool
KeyCol int
WCol int
WAssigned bool
CollationID collations.ID

Alias string `json:",omitempty"`
Expr sqlparser.Expr
Expand All @@ -107,6 +121,10 @@ func (ap *AggregateParams) String() string {
if ap.WAssigned {
keyCol = fmt.Sprintf("%s|%d", keyCol, ap.WCol)
}
if ap.CollationID != collations.Unknown {
collation := collations.Local().LookupByID(ap.CollationID)
keyCol += " COLLATE " + collation.Name()
}
if ap.Alias != "" {
return fmt.Sprintf("%s(%s) AS %s", ap.Opcode.String(), keyCol, ap.Alias)
}
Expand Down Expand Up @@ -186,17 +204,6 @@ func (oa *OrderedAggregate) GetTableName() string {
return oa.Input.GetTableName()
}

// getCollations specifies the collation ID value for columns.
func (oa *OrderedAggregate) getCollations() map[int]collations.ID {
colls := make(map[int]collations.ID)
for _, key := range oa.GroupByKeys {
if key.CollationID != collations.Unknown {
colls[key.KeyCol] = key.CollationID
}
}
return colls
}

// SetTruncateColumnCount sets the truncate column count.
func (oa *OrderedAggregate) SetTruncateColumnCount(count int) {
oa.TruncateColumnCount = count
Expand All @@ -220,7 +227,6 @@ func (oa *OrderedAggregate) execute(vcursor VCursor, bindVars map[string]*queryp
Fields: oa.convertFields(result.Fields),
Rows: make([][]sqltypes.Value, 0, len(result.Rows)),
}
colls := oa.getCollations()
// This code is similar to the one in StreamExecute.
var current []sqltypes.Value
var curDistincts []sqltypes.Value
Expand All @@ -229,13 +235,13 @@ func (oa *OrderedAggregate) execute(vcursor VCursor, bindVars map[string]*queryp
current, curDistincts = oa.convertRow(row)
continue
}
equal, err := oa.keysEqual(current, row, colls)
equal, err := oa.keysEqual(current, row, oa.Collations)
if err != nil {
return nil, err
}

if equal {
current, curDistincts, err = oa.merge(result.Fields, current, row, curDistincts, colls)
current, curDistincts, err = oa.merge(result.Fields, current, row, curDistincts, oa.Collations)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -282,21 +288,20 @@ func (oa *OrderedAggregate) TryStreamExecute(vcursor VCursor, bindVars map[strin
return err
}
}
colls := oa.getCollations()
// This code is similar to the one in Execute.
for _, row := range qr.Rows {
if current == nil {
current, curDistincts = oa.convertRow(row)
continue
}

equal, err := oa.keysEqual(current, row, colls)
equal, err := oa.keysEqual(current, row, oa.Collations)
if err != nil {
return err
}

if equal {
current, curDistincts, err = oa.merge(fields, current, row, curDistincts, colls)
current, curDistincts, err = oa.merge(fields, current, row, curDistincts, oa.Collations)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/engine/ordered_aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,6 +1084,7 @@ func TestOrderedAggregateCollate(t *testing.T) {
}},
GroupByKeys: []*GroupByParams{{KeyCol: 0, CollationID: collationID}},
Input: fp,
Collations: map[int]collations.ID{0: collationID},
}

result, err := oa.TryExecute(&noopVCursor{}, nil, false)
Expand Down Expand Up @@ -1125,6 +1126,7 @@ func TestOrderedAggregateCollateAS(t *testing.T) {
Col: 1,
}},
GroupByKeys: []*GroupByParams{{KeyCol: 0, CollationID: collationID}},
Collations: map[int]collations.ID{0: collationID},
Input: fp,
}

Expand Down Expand Up @@ -1169,6 +1171,7 @@ func TestOrderedAggregateCollateKS(t *testing.T) {
Col: 1,
}},
GroupByKeys: []*GroupByParams{{KeyCol: 0, CollationID: collationID}},
Collations: map[int]collations.ID{0: collationID},
Input: fp,
}

Expand Down
35 changes: 24 additions & 11 deletions go/vt/vtgate/planbuilder/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,17 +511,14 @@ func (hp *horizonPlanning) planAggregations(ctx *plancontext.PlanningContext, pl
return nil, err
}

pushExpr, alias, opcode := hp.createPushExprAndAlias(e, handleDistinct, innerAliased, opcode, oa)
pushExpr, param := hp.createPushExprAndAlias(ctx, e, handleDistinct, innerAliased, opcode, oa)
offset, _, err := pushProjection(ctx, pushExpr, plan, true, false, true)
if err != nil {
return nil, err
}
oa.eaggr.Aggregates = append(oa.eaggr.Aggregates, &engine.AggregateParams{
Opcode: opcode,
Col: offset,
Alias: alias,
Expr: fExpr,
})
param.Col = offset
param.Expr = fExpr
oa.eaggr.Aggregates = append(oa.eaggr.Aggregates, param)
}

for _, groupExpr := range hp.qp.GroupByExprs {
Expand Down Expand Up @@ -577,15 +574,16 @@ func (hp *horizonPlanning) planAggregations(ctx *plancontext.PlanningContext, pl
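// createPushExprAndAlias creates the expression that should be pushed down to the leaves,
// and returns the aggregate parameters (opcode, alias and collation ID) that go with it;
// the opcode is changed so it is a distinct one if needed.
// Both return values are nil when the select expression is not an aliased expression.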
func (hp *horizonPlanning) createPushExprAndAlias(
ctx *plancontext.PlanningContext,
expr abstract.SelectExpr,
handleDistinct bool,
innerAliased *sqlparser.AliasedExpr,
opcode engine.AggregateOpcode,
oa *orderedAggregate,
) (*sqlparser.AliasedExpr, string, engine.AggregateOpcode) {
) (*sqlparser.AliasedExpr, *engine.AggregateParams) {
aliasExpr, isAlias := expr.Col.(*sqlparser.AliasedExpr)
if !isAlias {
return nil, "", 0
return nil, nil
}
var alias string
if aliasExpr.As.IsEmpty() {
Expand All @@ -611,7 +609,17 @@ func (hp *horizonPlanning) createPushExprAndAlias(
}
hp.qp.GroupByExprs = append(hp.qp.GroupByExprs, by)
}
return aliasExpr, alias, opcode
collID := collations.Unknown
if innerAliased != nil {
collID = ctx.SemTable.CollationForExpr(innerAliased.Expr)
}

param := &engine.AggregateParams{
Opcode: opcode,
Alias: alias,
CollationID: collID,
}
return aliasExpr, param
}

func hasUniqueVindex(vschema plancontext.VSchema, semTable *semantics.SemTable, groupByExprs []abstract.GroupBy) bool {
Expand Down Expand Up @@ -1156,7 +1164,12 @@ func selectHasUniqueVindex(vschema plancontext.VSchema, semTable *semantics.SemT
// needDistinctHandling returns true if oa needs to handle the distinct clause.
// If true, it will also return the aliased expression that needs to be pushed
// down into the underlying route.
func (hp *horizonPlanning) needDistinctHandling(ctx *plancontext.PlanningContext, funcExpr *sqlparser.FuncExpr, opcode engine.AggregateOpcode, input logicalPlan) (bool, *sqlparser.AliasedExpr, error) {
func (hp *horizonPlanning) needDistinctHandling(
ctx *plancontext.PlanningContext,
funcExpr *sqlparser.FuncExpr,
opcode engine.AggregateOpcode,
input logicalPlan,
) (bool, *sqlparser.AliasedExpr, error) {
if !funcExpr.Distinct {
return false, nil, nil
}
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vtgate/planbuilder/ordered_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"strconv"

"vitess.io/vitess/go/mysql/collations"

"vitess.io/vitess/go/sqltypes"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -333,6 +335,18 @@ func (oa *orderedAggregate) Wireup(plan logicalPlan, jt *jointab) error {
}

// WireupGen4 builds the column-offset -> collation ID lookup on the engine
// primitive from the aggregates and the grouping keys, and then wires up the
// input plan, returning whatever error the input reports.
func (oa *orderedAggregate) WireupGen4(semTable *semantics.SemTable) error {
	byOffset := make(map[int]collations.ID)
	oa.eaggr.Collations = byOffset

	// Aggregates are registered before grouping keys: when both refer to the
	// same key column, the grouping key's collation is the one that is kept.
	for _, aggr := range oa.eaggr.Aggregates {
		if aggr.CollationID == collations.Unknown {
			continue
		}
		byOffset[aggr.KeyCol] = aggr.CollationID
	}

	for _, groupBy := range oa.eaggr.GroupByKeys {
		if groupBy.CollationID == collations.Unknown {
			continue
		}
		byOffset[groupBy.KeyCol] = groupBy.CollationID
	}

	return oa.input.WireupGen4(semTable)
}

Expand Down
57 changes: 55 additions & 2 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ Gen4 plan same as above
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "count(0) AS count(*)",
"GroupBy": "(1|4), (2|5), (3|6)",
"GroupBy": "(1|4), (2|5) COLLATE latin1_swedish_ci, (3|6)",
"ResultColumns": 4,
"Inputs": [
{
Expand Down Expand Up @@ -253,7 +253,7 @@ Gen4 plan same as above
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "count(0) AS k",
"GroupBy": "(1|4), (2|5), (3|6)",
"GroupBy": "(1|4), (2|5) COLLATE latin1_swedish_ci, (3|6)",
"Inputs": [
{
"OperatorType": "Route",
Expand Down Expand Up @@ -3063,3 +3063,56 @@ Gen4 plan same as above
]
}
}

# distinct on text column with collation
"select col, count(distinct textcol1) from user group by col"
{
"QueryType": "SELECT",
"Original": "select col, count(distinct textcol1) from user group by col",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "count_distinct(1) AS count(distinct textcol1)",
"GroupBy": "0",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select col, textcol1, weight_string(textcol1) from `user` where 1 != 1 group by col, textcol1, weight_string(textcol1)",
"OrderBy": "0 ASC, (1|2) ASC",
"Query": "select col, textcol1, weight_string(textcol1) from `user` group by col, textcol1, weight_string(textcol1) order by col asc, textcol1 asc",
"ResultColumns": 2,
"Table": "`user`"
}
]
}
}
{
"QueryType": "SELECT",
"Original": "select col, count(distinct textcol1) from user group by col",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "count_distinct(1|2 COLLATE latin1_swedish_ci) AS count(distinct textcol1)",
"GroupBy": "0",
"ResultColumns": 2,
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select col, textcol1, weight_string(textcol1) from `user` where 1 != 1 group by col, textcol1, weight_string(textcol1)",
"OrderBy": "0 ASC, (1|2) ASC COLLATE latin1_swedish_ci",
"Query": "select col, textcol1, weight_string(textcol1) from `user` group by col, textcol1, weight_string(textcol1) order by col asc, textcol1 asc",
"Table": "`user`"
}
]
}
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/testdata/memory_sort_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ Gen4 error: Expression of SELECT list is not in GROUP BY clause and contains non
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "count(1) AS k",
"GroupBy": "(2|3)",
"GroupBy": "(2|3) COLLATE latin1_swedish_ci",
"Inputs": [
{
"OperatorType": "Route",
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/testdata/oltp_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ Gen4 plan same as above
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Ordered",
"GroupBy": "(0|1)",
"GroupBy": "(0|1) COLLATE latin1_swedish_ci",
"ResultColumns": 1,
"Inputs": [
{
Expand Down

0 comments on commit af9ff45

Please sign in to comment.