Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move more horizon planning to the operators #13412

Merged
merged 27 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c2171b9
error out if we have aggregate gtid in handleAggr
frouioui Jun 29, 2023
da016f0
remove un-required for loop
frouioui Jun 29, 2023
2139472
add GetSelectExprs to the operator interface and remove horizon/derived
frouioui Jun 29, 2023
6ccbcc9
support distinct aggregations on the new horizon planner
systay Jun 29, 2023
d52f903
stop pushing of aggregation filtering into derived tables
systay Jun 21, 2023
3f84063
wip - enable derived tables in the new horizon planner
frouioui Jun 22, 2023
64aa0fb
Bring back support for union inside derived tables
frouioui Jun 22, 2023
3280d79
fix typo in error
frouioui Jun 22, 2023
c5d7eb8
update test expectations
systay Jun 22, 2023
2e31f83
use semantics.RewriteDerivedTableExpression instead of manual rewriti…
systay Jun 22, 2023
9a11f44
work on making derived tables with aggregation work
systay Jun 27, 2023
7796a6e
refactor code
systay Jun 27, 2023
b134c92
enable horizon planning in more situations
systay Jun 27, 2023
3da1903
update test expectations
systay Jun 29, 2023
c9fcec9
make sure to always use a method to create aggregate params
systay Jun 29, 2023
92d28de
cleanup and refactoring
systay Jun 30, 2023
3cb619e
use weight strings for min/max
systay Jun 30, 2023
77d630a
update list of error code
frouioui Jun 30, 2023
b320671
disallow aggregation on top of aggregation with a clearer error message
systay Jun 30, 2023
f016a89
fail min/max queries on types we cant compare
systay Jun 30, 2023
f6d57f0
test: remove pattern not used
systay Jun 30, 2023
0f9d320
spread table id through derived tables
systay Jul 1, 2023
8aeb0dd
add ordering bottom up so the order can be re-used
systay Jul 1, 2023
14c7bce
unify Derived and Horizon into a single struct
systay Jul 1, 2023
fb61542
refactor: aggregation-pushing
systay Jul 5, 2023
15453dc
add support handling sum(distinct x) and count(distinct x) on top of …
systay Jul 5, 2023
a1901f0
Merge remote-tracking branch 'upstream/main' into continue-horizon-pl…
systay Jul 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/vt/vterrors/code.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ var (
VT09012 = errorWithoutState("VT09012", vtrpcpb.Code_FAILED_PRECONDITION, "%s statement with %s tablet not allowed", "This type of statement is not allowed on the given tablet.")
VT09013 = errorWithoutState("VT09013", vtrpcpb.Code_FAILED_PRECONDITION, "semi-sync plugins are not loaded", "Durability policy wants Vitess to use semi-sync, but the MySQL instances don't have the semi-sync plugin loaded.")
VT09014 = errorWithoutState("VT09014", vtrpcpb.Code_FAILED_PRECONDITION, "vindex cannot be modified", "The vindex cannot be used as table in DML statement")
VT09015 = errorWithoutState("VT09015", vtrpcpb.Code_FAILED_PRECONDITION, "schema tracking required", "This query cannot be planned without more information on the SQL schema. Please turn on schema tracking or add authoritative columns information to your VSchema.")
systay marked this conversation as resolved.
Show resolved Hide resolved

VT10001 = errorWithoutState("VT10001", vtrpcpb.Code_ABORTED, "foreign key constraints are not allowed", "Foreign key constraints are not allowed, see https://vitess.io/blog/2021-06-15-online-ddl-why-no-fk/.")

Expand Down Expand Up @@ -136,6 +137,7 @@ var (
VT09012,
VT09013,
VT09014,
VT09015,
VT10001,
VT12001,
VT13001,
Expand Down
45 changes: 28 additions & 17 deletions go/vt/vtgate/engine/aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"fmt"
"strconv"

"vitess.io/vitess/go/vt/vterrors"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/mysql/collations"
Expand All @@ -41,7 +43,6 @@ type AggregateParams struct {
// These are used only for distinct opcodes.
KeyCol int
WCol int
WAssigned bool
CollationID collations.ID

Alias string `json:",omitempty"`
Expand All @@ -53,22 +54,26 @@ type AggregateParams struct {
OrigOpcode AggregateOpcode
}

func (ap *AggregateParams) isDistinct() bool {
return ap.Opcode == AggregateCountDistinct || ap.Opcode == AggregateSumDistinct
func NewAggregateParam(opcode AggregateOpcode, col int, alias string) *AggregateParams {
out := &AggregateParams{
Opcode: opcode,
Col: col,
Alias: alias,
WCol: -1,
}
if opcode.NeedsComparableValues() {
out.KeyCol = col
}
return out
}

func (ap *AggregateParams) preProcess() bool {
switch ap.Opcode {
case AggregateCountDistinct, AggregateSumDistinct, AggregateGtid, AggregateCount, AggregateGroupConcat:
return true
default:
return false
}
func (ap *AggregateParams) WAssigned() bool {
return ap.WCol >= 0
}

func (ap *AggregateParams) String() string {
keyCol := strconv.Itoa(ap.Col)
if ap.WAssigned {
if ap.WAssigned() {
keyCol = fmt.Sprintf("%s|%d", keyCol, ap.WCol)
}
if ap.CollationID != collations.Unknown {
Expand Down Expand Up @@ -161,7 +166,7 @@ func merge(
) ([]sqltypes.Value, []sqltypes.Value, error) {
result := sqltypes.CopyRow(row1)
for index, aggr := range aggregates {
if aggr.isDistinct() {
if aggr.Opcode.IsDistinct() {
if row2[aggr.KeyCol].IsNull() {
continue
}
Expand Down Expand Up @@ -194,8 +199,14 @@ func merge(
}
result[aggr.Col], err = evalengine.NullSafeAdd(value, v2, fields[aggr.Col].Type)
case AggregateMin:
if aggr.WAssigned() && !row2[aggr.Col].IsComparable() {
return minMaxWeightStringError()
}
result[aggr.Col], err = evalengine.Min(row1[aggr.Col], row2[aggr.Col], aggr.CollationID)
case AggregateMax:
if aggr.WAssigned() && !row2[aggr.Col].IsComparable() {
return minMaxWeightStringError()
}
result[aggr.Col], err = evalengine.Max(row1[aggr.Col], row2[aggr.Col], aggr.CollationID)
case AggregateCountDistinct:
result[aggr.Col], err = evalengine.NullSafeAdd(row1[aggr.Col], countOne, fields[aggr.Col].Type)
Expand Down Expand Up @@ -241,6 +252,10 @@ func merge(
return result, curDistincts, nil
}

func minMaxWeightStringError() ([]sqltypes.Value, []sqltypes.Value, error) {
return nil, nil, vterrors.VT12001("min/max on types that are not comparable is not supported")
}

func convertFinal(current []sqltypes.Value, aggregates []*AggregateParams) ([]sqltypes.Value, error) {
result := sqltypes.CopyRow(current)
for _, aggr := range aggregates {
Expand Down Expand Up @@ -270,17 +285,13 @@ func convertFields(fields []*querypb.Field, aggrs []*AggregateParams) []*querypb
if aggr.Alias != "" {
fields[aggr.Col].Name = aggr.Alias
}
if aggr.isDistinct() {
// TODO: this should move to plan time
aggr.KeyCol = aggr.Col
}
Comment on lines -273 to -276
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as discussed, changing this on plan time is a problem. we would need to have local KeyCol index and passed in while execution.

}
return fields
}

func findComparableCurrentDistinct(row []sqltypes.Value, aggr *AggregateParams) sqltypes.Value {
curDistinct := row[aggr.KeyCol]
if aggr.WAssigned && !curDistinct.IsComparable() {
if aggr.WAssigned() && !curDistinct.IsComparable() {
aggr.KeyCol = aggr.WCol
curDistinct = row[aggr.KeyCol]
}
Expand Down
18 changes: 18 additions & 0 deletions go/vt/vtgate/engine/opcode/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,21 @@ func (code AggregateOpcode) Type(typ *querypb.Type) (querypb.Type, bool) {
panic(code.String()) // we have a unit test checking we never reach here
}
}

func (code AggregateOpcode) NeedsComparableValues() bool {
switch code {
case AggregateCountDistinct, AggregateSumDistinct, AggregateMin, AggregateMax:
return true
default:
return false
}
}

func (code AggregateOpcode) IsDistinct() bool {
switch code {
case AggregateCountDistinct, AggregateSumDistinct:
return true
default:
return false
}
}
Loading