
planner: add 2pb logic and fix some bugs for partitionTopN #42334

Merged (13 commits) on Mar 17, 2023
@@ -371,7 +371,7 @@
" └─Window 1.00 root row_number()->Column#4 over(partition by test.t.b rows between current row and current row)",
" └─Sort 1.00 root test.t.b",
" └─TableReader 1.00 root data:Limit",
" └─Limit 1.00 cop[tikv] offset:0, count:1",
" └─Limit 1.00 cop[tikv] partition by test.t.b, offset:0, count:1",
" └─TableFullScan 1.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
"Res": [
@@ -403,7 +403,7 @@
" └─Window 3.00 root row_number()->Column#4 over(partition by test.t.b rows between current row and current row)",
" └─Sort 3.00 root test.t.b",
" └─TableReader 3.00 root data:Limit",
" └─Limit 3.00 cop[tikv] offset:0, count:3",
" └─Limit 3.00 cop[tikv] partition by test.t.b, offset:0, count:3",
" └─Selection 3.00 cop[tikv] ge(test.t.a, 2)",
" └─TableFullScan 9.00 cop[tikv] table:t keep order:false, stats:pseudo"
],
@@ -452,7 +452,7 @@
" └─Window 1.00 root row_number()->Column#4 over(partition by test.td.b rows between current row and current row)",
" └─Sort 1.00 root test.td.b",
" └─TableReader 1.00 root data:Limit",
" └─Limit 1.00 cop[tikv] offset:0, count:1",
" └─Limit 1.00 cop[tikv] partition by test.td.b, offset:0, count:1",
" └─TableFullScan 1.00 cop[tikv] table:td keep order:false, stats:pseudo"
],
"Res": [
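For context, the "partition by" suffix in these pushed-down Limit rows comes from window-function queries whose row_number() result is capped per partition. The exact test queries are not shown in this diff; the following is a hypothetical testkit sketch (standard TiDB mock-store setup and imports assumed; the query text is illustrative only, not taken from the PR):

func TestPartitionLimitExplain(t *testing.T) {
	store := testkit.CreateMockStore(t)
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("create table t(a int, b int)")
	// A per-partition row_number() cap is what allows the planner to push
	// a Limit carrying "partition by test.t.b" down to the coprocessor.
	tk.MustQuery("explain format = 'brief' select * from " +
		"(select a, b, row_number() over (partition by b) as rn from t) dt " +
		"where rn < 2")
}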
28 changes: 16 additions & 12 deletions planner/core/explain.go
@@ -364,12 +364,14 @@ func (p *PhysicalSort) ExplainInfo() string {

// ExplainInfo implements Plan interface.
func (p *PhysicalLimit) ExplainInfo() string {
-	var str strings.Builder
-	str.WriteString("offset:")
-	str.WriteString(strconv.FormatUint(p.Offset, 10))
-	str.WriteString(", count:")
-	str.WriteString(strconv.FormatUint(p.Count, 10))
-	return str.String()
+	buffer := bytes.NewBufferString("")
[Contributor] I think it would be better to distinguish a Limit that has partition by: display it as WindowLimit and do not include the partition by fields.

[Contributor Author] Do you mean adding a new PhysicalWindowLimit, or just displaying it as WindowLimit in the explain info? Adding a new PhysicalWindowLimit needs a lot of changes, and I think we would miss the deadline if we went that way :(

+	if len(p.GetPartitionBy()) > 0 {
+		buffer = explainPartitionBy(buffer, p.GetPartitionBy(), false)
+		fmt.Fprintf(buffer, ", offset:%v, count:%v", p.Offset, p.Count)
+	} else {
+		fmt.Fprintf(buffer, "offset:%v, count:%v", p.Offset, p.Count)
+	}
+	return buffer.String()
}

// ExplainInfo implements Plan interface.
@@ -960,12 +962,14 @@ func (lt *LogicalTopN) ExplainInfo() string {

// ExplainInfo implements Plan interface.
func (p *LogicalLimit) ExplainInfo() string {
-	var str strings.Builder
-	str.WriteString("offset:")
-	str.WriteString(strconv.FormatUint(p.Offset, 10))
-	str.WriteString(", count:")
-	str.WriteString(strconv.FormatUint(p.Count, 10))
-	return str.String()
+	buffer := bytes.NewBufferString("")
[Contributor] Same as above. I think it would be better to distinguish a TopN that has partition by: display it as WindowTopN and do not include the partition by fields.

+	if len(p.GetPartitionBy()) > 0 {
+		buffer = explainPartitionBy(buffer, p.GetPartitionBy(), false)
+		fmt.Fprintf(buffer, ", offset:%v, count:%v", p.Offset, p.Count)
+	} else {
+		fmt.Fprintf(buffer, "offset:%v, count:%v", p.Offset, p.Count)
+	}
+	return buffer.String()
}

// ExplainInfo implements Plan interface.
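The explainPartitionBy helper called in both hunks above is defined elsewhere in explain.go and is not part of this diff. A minimal sketch of what such a helper plausibly looks like, assuming the partition-by items are property.SortItem values and that the boolean selects normalized explain output (both assumptions, not taken from the PR):

// Sketch only; the real helper lives elsewhere in planner/core/explain.go.
// It renders "partition by col1, col2, ..." into buffer and returns it.
// Callers guard with len(partitionBy) > 0, so no empty check is needed here.
func explainPartitionBy(buffer *bytes.Buffer, partitionBy []property.SortItem, normalized bool) *bytes.Buffer {
	buffer.WriteString("partition by ")
	for i, item := range partitionBy {
		if i != 0 {
			buffer.WriteString(", ")
		}
		if normalized {
			buffer.WriteString(item.Col.ExplainNormalizedInfo())
		} else {
			buffer.WriteString(item.Col.String())
		}
	}
	return buffer
}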
1 change: 1 addition & 0 deletions planner/core/logical_plans.go
@@ -163,6 +163,7 @@ type LogicalJoin struct {
rightPreferJoinType uint

EqualConditions []*expression.ScalarFunction
+	// NAEQConditions means null aware equal conditions, which is used for null aware semi joins.
[Contributor] Is this related to this PR?

[Contributor Author] No, this is a change carried over from #41763; I can remove it.

NAEQConditions []*expression.ScalarFunction
LeftConditions expression.CNFExprs
RightConditions expression.CNFExprs
8 changes: 8 additions & 0 deletions planner/core/plan_to_pb.go
@@ -180,6 +180,9 @@ func (p *PhysicalTopN) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*ti
for _, item := range p.ByItems {
topNExec.OrderBy = append(topNExec.OrderBy, expression.SortByItemToPB(sc, client, item.Expr, item.Desc))
}
+	for _, item := range p.PartitionBy {
+		topNExec.PartitionBy = append(topNExec.PartitionBy, expression.SortByItemToPB(sc, client, item.Col.Clone(), item.Desc))
+	}
executorID := ""
if storeType == kv.TiFlash {
var err error
@@ -194,10 +197,15 @@

// ToPB implements PhysicalPlan ToPB interface.
func (p *PhysicalLimit) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) {
+	sc := ctx.GetSessionVars().StmtCtx
+	client := ctx.GetClient()
limitExec := &tipb.Limit{
Limit: p.Count,
}
executorID := ""
+	for _, item := range p.PartitionBy {
+		limitExec.PartitionBy = append(limitExec.PartitionBy, expression.SortByItemToPB(sc, client, item.Col.Clone(), item.Desc))
+	}
if storeType == kv.TiFlash {
var err error
limitExec.Child, err = p.children[0].ToPB(ctx, storeType)
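The store-side executor that consumes the new PartitionBy field is not part of this diff. Conceptually, a coprocessor that understands the field applies the limit within each partition-by group instead of globally. A self-contained toy sketch of that semantics (hypothetical illustration, not TiKV code; real rows are chunk-encoded and the keys travel as tipb expressions):

package main

import "fmt"

// partitionLimit keeps at most count rows per partition key, which is the
// per-group semantics the PartitionBy field added to tipb.Limit conveys.
func partitionLimit(rows []map[string]int, keyCol string, count int) []map[string]int {
	seen := make(map[int]int)
	var out []map[string]int
	for _, r := range rows {
		k := r[keyCol]
		if seen[k] < count {
			out = append(out, r)
			seen[k]++
		}
	}
	return out
}

func main() {
	rows := []map[string]int{{"b": 1, "a": 10}, {"b": 1, "a": 11}, {"b": 2, "a": 12}}
	// One row per b-group: prints 2.
	fmt.Println(len(partitionLimit(rows, "b", 1)))
}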
23 changes: 23 additions & 0 deletions planner/core/resolve_indices.go
@@ -584,6 +584,29 @@ func (p *PhysicalTopN) ResolveIndices() (err error) {
return err
}
}
+	for i, item := range p.PartitionBy {
+		newCol, err := item.Col.ResolveIndices(p.children[0].Schema())
+		if err != nil {
+			return err
+		}
+		p.PartitionBy[i].Col = newCol.(*expression.Column)
+	}
return
}

+// ResolveIndices implements Plan interface.
+func (p *PhysicalLimit) ResolveIndices() (err error) {
+	err = p.basePhysicalPlan.ResolveIndices()
+	if err != nil {
+		return err
+	}
+	for i, item := range p.PartitionBy {
+		newCol, err := item.Col.ResolveIndices(p.children[0].Schema())
+		if err != nil {
+			return err
+		}
+		p.PartitionBy[i].Col = newCol.(*expression.Column)
+	}
+	return
+}
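ResolveIndices rebinds each PartitionBy column to its offset in the child plan's output schema, so the later ToPB step encodes the correct column ordinal. A toy illustration of that idea (hypothetical types, not TiDB's):

// Toy version of index resolution: locate the column's position in the
// child schema and record it, failing if the column is missing.
type toyColumn struct {
	name  string
	index int
}

func resolveToy(col toyColumn, childSchema []string) (toyColumn, error) {
	for i, name := range childSchema {
		if name == col.name {
			col.index = i
			return col, nil
		}
	}
	return col, fmt.Errorf("column %q not found in child schema", col.name)
}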

4 changes: 2 additions & 2 deletions planner/core/task.go
@@ -836,7 +836,7 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task {
// Strictly speaking, for the row count of stats, we should multiply newCount with "regionNum",
// but "regionNum" is unknown since the copTask can be a double read, so we ignore it now.
stats := deriveLimitStats(childProfile, float64(newCount))
-	pushedDownLimit := PhysicalLimit{Count: newCount}.Init(p.ctx, stats, p.blockOffset)
+	pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.ctx, stats, p.blockOffset)
cop = attachPlan2Task(pushedDownLimit, cop).(*copTask)
// Don't use clone() so that Limit and its children share the same schema. Otherwise the virtual generated column may not be resolved right.
pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
@@ -851,7 +851,7 @@
for _, partialScan := range cop.idxMergePartPlans {
childProfile := partialScan.statsInfo()
stats := deriveLimitStats(childProfile, float64(newCount))
-	pushedDownLimit := PhysicalLimit{Count: newCount}.Init(p.ctx, stats, p.blockOffset)
+	pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.ctx, stats, p.blockOffset)
pushedDownLimit.SetChildren(partialScan)
pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
limitChildren = append(limitChildren, pushedDownLimit)
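The newPartitionBy slice used in both hunks is built earlier in attach2Task, outside this diff. A plausible sketch of that construction, assuming property.SortItem provides a Clone method (an assumption based on common TiDB patterns), so the pushed-down Limit does not share partition-by items with the root-task plan:

// Sketch; the real construction is outside this diff.
newPartitionBy := make([]property.SortItem, 0, len(p.GetPartitionBy()))
for _, item := range p.GetPartitionBy() {
	// Clone so that later index resolution on the pushed-down copy
	// cannot mutate the original items. (Clone is an assumption.)
	newPartitionBy = append(newPartitionBy, item.Clone())
}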