Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] executor: inline projection for sort #15399

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
byItems = append(byItems, &core.ByItems{Expr: col, Desc: false})
}
sort := &core.PhysicalSort{ByItems: byItems}
sort.SetSchema(src.Schema().Clone())
sort.SetChildren(src)
win.SetChildren(sort)
tail = sort
Expand Down
11 changes: 11 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1473,10 +1473,21 @@ func (b *executorBuilder) buildSort(v *plannercore.PhysicalSort) Executor {
if b.err != nil {
return nil
}
childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema())
var used []int
if childrenUsedSchema != nil {
used = make([]int, 0, len(childrenUsedSchema[0]))
for i, usedSchema := range childrenUsedSchema[0] {
if usedSchema {
used = append(used, i)
}
}
}
sortExec := SortExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
ByItems: v.ByItems,
schema: v.Schema(),
used: used,
}
executorCounterSortExec.Inc()
return &sortExec
Expand Down
12 changes: 12 additions & 0 deletions executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ type SortExec struct {
Idx int
fetched bool
schema *expression.Schema
// used show which columns are used by father for child
// NOTE:
// 1. every columns are used if used is nil.
// 2. no columns are used if used is not nil but the size of used is 0.
used []int

keyExprs []expression.Expression
keyTypes []*types.FieldType
Expand Down Expand Up @@ -167,6 +172,13 @@ func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error {
func (e *SortExec) generatePartition() {
e.initPointers()
sort.Slice(e.rowPtrs, e.keyColumnsLess)
if e.used != nil {
numChunks := e.rowChunks.NumChunks()
for chkIdx := 0; chkIdx < numChunks; chkIdx++ {
rowChk := e.rowChunks.GetChunk(chkIdx)
rowChk.Prune(e.used)
}
}
e.partitionList = append(e.partitionList, e.rowChunks)
e.partitionRowPtrs = append(e.partitionRowPtrs, e.rowPtrs)
}
Expand Down
1 change: 1 addition & 0 deletions planner/cascades/implementation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func (r *ImplSort) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalP
ls.SelectBlockOffset(),
&property.PhysicalProperty{ExpectedCnt: math.MaxFloat64},
)
ps.SetSchema(ls.Schema())
return []memo.Implementation{impl.NewSortImpl(ps)}, nil
}

Expand Down
2 changes: 2 additions & 0 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1747,6 +1747,7 @@ func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty)

func (ls *LogicalSort) getPhysicalSort(prop *property.PhysicalProperty) *PhysicalSort {
ps := PhysicalSort{ByItems: ls.ByItems}.Init(ls.ctx, ls.stats.ScaleByExpectCnt(prop.ExpectedCnt), ls.blockOffset, &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64})
ps.SetSchema(ls.Schema().Clone())
return ps
}

Expand All @@ -1757,6 +1758,7 @@ func (ls *LogicalSort) getNominalSort(reqProp *property.PhysicalProperty) *Nomin
}
prop.ExpectedCnt = reqProp.ExpectedCnt
ps := NominalSort{OnlyColumn: onlyColumn, ByItems: ls.ByItems}.Init(ls.ctx, ls.blockOffset, prop)
ps.SetSchema(ls.Schema().Clone())
return ps
}

Expand Down
2 changes: 2 additions & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,8 @@ func (b *PlanBuilder) buildSort(ctx context.Context, p LogicalPlan, byItems []*a
exprs = append(exprs, &ByItems{Expr: it, Desc: item.Desc})
}
sort.ByItems = exprs
sort.SetSchema(p.Schema().Clone())
sort.names = p.OutputNames()
sort.SetChildren(p)
return sort, nil
}
Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ type LogicalUnionAll struct {

// LogicalSort stands for the order by plan.
type LogicalSort struct {
baseLogicalPlan
logicalSchemaProducer

ByItems []*ByItems
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,15 +486,15 @@ type PhysicalStreamAgg struct {

// PhysicalSort is the physical operator of sort, which implements a memory sort.
type PhysicalSort struct {
basePhysicalPlan
physicalSchemaProducer

ByItems []*ByItems
}

// NominalSort asks sort properties for its child. It is a fake operator that will not
// appear in final physical operator tree. It will be eliminated or converted to Projection.
type NominalSort struct {
basePhysicalPlan
physicalSchemaProducer

// These two fields are used to switch ScalarFunctions to Constants. For these
// NominalSorts, we need to converted to Projections check if the ScalarFunctions
Expand Down
1 change: 1 addition & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,7 @@ func (b *PlanBuilder) buildShow(ctx context.Context, show *ast.ShowStmt) (Plan,
sort := LogicalSort{
ByItems: []*ByItems{{Expr: orderByCol}},
}.Init(b.ctx, b.getSelectOffset())
sort.SetSchema(np.Schema().Clone())
sort.SetChildren(np)
np = sort
}
Expand Down
2 changes: 1 addition & 1 deletion planner/core/resolve_indices.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (p *basePhysicalAgg) ResolveIndices() (err error) {

// ResolveIndices implements Plan interface.
func (p *PhysicalSort) ResolveIndices() (err error) {
err = p.basePhysicalPlan.ResolveIndices()
err = p.physicalSchemaProducer.ResolveIndices()
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (ls *LogicalSort) PruneColumns(parentUsedCols []*expression.Column) error {
parentUsedCols = append(parentUsedCols, cols...)
}
}
ls.inlineProjection(parentUsedCols)
return child.PruneColumns(parentUsedCols)
}

Expand Down
1 change: 1 addition & 0 deletions planner/core/rule_max_min_eliminate.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func (a *maxMinEliminator) eliminateSingleMaxMin(agg *LogicalAggregation) *Logic
// Compose Sort operator.
sort := LogicalSort{}.Init(ctx, agg.blockOffset)
sort.ByItems = append(sort.ByItems, &ByItems{f.Args[0], desc})
sort.SetSchema(child.Schema().Clone())
sort.SetChildren(child)
child = sort
}
Expand Down