Skip to content

Commit

Permalink
planner: refactor some code of Plan Cache (#57234)
Browse files Browse the repository at this point in the history
ref #54057
  • Loading branch information
qw4990 authored Nov 8, 2024
1 parent cec48bb commit 1770006
Show file tree
Hide file tree
Showing 12 changed files with 400 additions and 503 deletions.
1 change: 1 addition & 0 deletions pkg/planner/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ go_library(
"plan_cache_utils.go",
"plan_cacheable_checker.go",
"plan_clone_generated.go",
"plan_clone_utils.go",
"plan_cost_detail.go",
"plan_cost_ver1.go",
"plan_cost_ver2.go",
Expand Down
2 changes: 1 addition & 1 deletion pkg/planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ type InsertGeneratedColumns struct {

func (i InsertGeneratedColumns) cloneForPlanCache() InsertGeneratedColumns {
return InsertGeneratedColumns{
Exprs: cloneExpressionsForPlanCache(i.Exprs),
Exprs: cloneExpressionsForPlanCache(i.Exprs, nil),
OnDuplicates: util.CloneAssignments(i.OnDuplicates),
}
}
Expand Down
1 change: 0 additions & 1 deletion pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2680,7 +2680,6 @@ func convertToPointGet(ds *logicalop.DataSource, prop *property.PhysicalProperty

accessCnt := math.Min(candidate.path.CountAfterAccess, float64(1))
pointGetPlan := PointGetPlan{
ctx: ds.SCtx(),
AccessConditions: candidate.path.AccessConds,
schema: ds.Schema().Clone(),
dbName: ds.DBName.L,
Expand Down
140 changes: 1 addition & 139 deletions pkg/planner/core/generator/plan_cache/plan_clone_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func genPlanCloneForPlanCache(x any) ([]byte, error) {
case "[]expression.Expression", "[]*expression.Column",
"[]*expression.Constant", "[]*expression.ScalarFunction":
structureName := strings.Split(f.Type.String(), ".")[1] + "s"
c.write("cloned.%v = clone%vForPlanCache(op.%v)", f.Name, structureName, f.Name)
c.write("cloned.%v = clone%vForPlanCache(op.%v, nil)", f.Name, structureName, f.Name)
case "[][]*expression.Constant", "[][]expression.Expression":
structureName := strings.Split(f.Type.String(), ".")[1]
c.write("cloned.%v = clone%v2DForPlanCache(op.%v)", f.Name, structureName, f.Name)
Expand Down Expand Up @@ -239,144 +239,6 @@ import (
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/util"
)
func clonePhysicalPlansForPlanCache(newCtx base.PlanContext, plans []base.PhysicalPlan) ([]base.PhysicalPlan, bool) {
clonedPlans := make([]base.PhysicalPlan, len(plans))
for i, plan := range plans {
cloned, ok := plan.CloneForPlanCache(newCtx)
if !ok {
return nil, false
}
clonedPlans[i] = cloned.(base.PhysicalPlan)
}
return clonedPlans, true
}
func cloneExpressionsForPlanCache(exprs []expression.Expression) []expression.Expression {
if exprs == nil {
return nil
}
allSafe := true
for _, e := range exprs {
if !e.SafeToShareAcrossSession() {
allSafe = false
break
}
}
if allSafe {
return exprs
}
cloned := make([]expression.Expression, 0, len(exprs))
for _, e := range exprs {
if e.SafeToShareAcrossSession() {
cloned = append(cloned, e)
} else {
cloned = append(cloned, e.Clone())
}
}
return cloned
}
func cloneExpression2DForPlanCache(exprs [][]expression.Expression) [][]expression.Expression {
if exprs == nil {
return nil
}
cloned := make([][]expression.Expression, 0, len(exprs))
for _, e := range exprs {
cloned = append(cloned, cloneExpressionsForPlanCache(e))
}
return cloned
}
func cloneScalarFunctionsForPlanCache(scalarFuncs []*expression.ScalarFunction) []*expression.ScalarFunction {
if scalarFuncs == nil {
return nil
}
allSafe := true
for _, f := range scalarFuncs {
if !f.SafeToShareAcrossSession() {
allSafe = false
break
}
}
if allSafe {
return scalarFuncs
}
cloned := make([]*expression.ScalarFunction, 0, len(scalarFuncs))
for _, f := range scalarFuncs {
if f.SafeToShareAcrossSession() {
cloned = append(cloned, f)
} else {
cloned = append(cloned, f.Clone().(*expression.ScalarFunction))
}
}
return cloned
}
func cloneColumnsForPlanCache(cols []*expression.Column) []*expression.Column {
if cols == nil {
return nil
}
allSafe := true
for _, c := range cols {
if !c.SafeToShareAcrossSession() {
allSafe = false
break
}
}
if allSafe {
return cols
}
cloned := make([]*expression.Column, 0, len(cols))
for _, c := range cols {
if c == nil {
cloned = append(cloned, nil)
continue
}
if c.SafeToShareAcrossSession() {
cloned = append(cloned, c)
} else {
cloned = append(cloned, c.Clone().(*expression.Column))
}
}
return cloned
}
func cloneConstantsForPlanCache(constants []*expression.Constant) []*expression.Constant {
if constants == nil {
return nil
}
allSafe := true
for _, c := range constants {
if !c.SafeToShareAcrossSession() {
allSafe = false
break
}
}
if allSafe {
return constants
}
cloned := make([]*expression.Constant, 0, len(constants))
for _, c := range constants {
if c.SafeToShareAcrossSession() {
cloned = append(cloned, c)
} else {
cloned = append(cloned, c.Clone().(*expression.Constant))
}
}
return cloned
}
func cloneConstant2DForPlanCache(constants [][]*expression.Constant) [][]*expression.Constant {
if constants == nil {
return nil
}
cloned := make([][]*expression.Constant, 0, len(constants))
for _, c := range constants {
cloned = append(cloned, cloneConstantsForPlanCache(c))
}
return cloned
}
`

func main() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/planner/core/index_join_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,8 +655,8 @@ func (cwc *ColWithCmpFuncManager) cloneForPlanCache() *ColWithCmpFuncManager {
cloned.colLength = cwc.colLength
cloned.OpType = make([]string, len(cwc.OpType))
copy(cloned.OpType, cwc.OpType)
cloned.opArg = cloneExpressionsForPlanCache(cwc.opArg)
cloned.TmpConstant = cloneConstantsForPlanCache(cwc.TmpConstant)
cloned.opArg = cloneExpressionsForPlanCache(cwc.opArg, nil)
cloned.TmpConstant = cloneConstantsForPlanCache(cwc.TmpConstant, nil)
cloned.affectedColSchema = cwc.affectedColSchema
cloned.compareFuncs = cwc.compareFuncs
return cloned
Expand Down
22 changes: 11 additions & 11 deletions pkg/planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,9 @@ func (pi *PhysPlanPartInfo) cloneForPlanCache() *PhysPlanPartInfo {
return nil
}
cloned := new(PhysPlanPartInfo)
cloned.PruningConds = cloneExpressionsForPlanCache(pi.PruningConds)
cloned.PruningConds = cloneExpressionsForPlanCache(pi.PruningConds, nil)
cloned.PartitionNames = pi.PartitionNames
cloned.Columns = cloneColumnsForPlanCache(pi.Columns)
cloned.Columns = cloneColumnsForPlanCache(pi.Columns, nil)
cloned.ColumnNames = pi.ColumnNames
return cloned
}
Expand Down Expand Up @@ -1353,21 +1353,21 @@ func (p *basePhysicalJoin) cloneForPlanCacheWithSelf(newCtx base.PlanContext, ne
}
cloned.physicalSchemaProducer = *base
cloned.JoinType = p.JoinType
cloned.LeftConditions = cloneExpressionsForPlanCache(p.LeftConditions)
cloned.RightConditions = cloneExpressionsForPlanCache(p.RightConditions)
cloned.OtherConditions = cloneExpressionsForPlanCache(p.OtherConditions)
cloned.LeftConditions = cloneExpressionsForPlanCache(p.LeftConditions, nil)
cloned.RightConditions = cloneExpressionsForPlanCache(p.RightConditions, nil)
cloned.OtherConditions = cloneExpressionsForPlanCache(p.OtherConditions, nil)
cloned.InnerChildIdx = p.InnerChildIdx
cloned.OuterJoinKeys = cloneColumnsForPlanCache(p.OuterJoinKeys)
cloned.InnerJoinKeys = cloneColumnsForPlanCache(p.InnerJoinKeys)
cloned.LeftJoinKeys = cloneColumnsForPlanCache(p.LeftJoinKeys)
cloned.RightJoinKeys = cloneColumnsForPlanCache(p.RightJoinKeys)
cloned.OuterJoinKeys = cloneColumnsForPlanCache(p.OuterJoinKeys, nil)
cloned.InnerJoinKeys = cloneColumnsForPlanCache(p.InnerJoinKeys, nil)
cloned.LeftJoinKeys = cloneColumnsForPlanCache(p.LeftJoinKeys, nil)
cloned.RightJoinKeys = cloneColumnsForPlanCache(p.RightJoinKeys, nil)
cloned.IsNullEQ = make([]bool, len(p.IsNullEQ))
copy(cloned.IsNullEQ, p.IsNullEQ)
for _, d := range p.DefaultValues {
cloned.DefaultValues = append(cloned.DefaultValues, *d.Clone())
}
cloned.LeftNAJoinKeys = cloneColumnsForPlanCache(p.LeftNAJoinKeys)
cloned.RightNAJoinKeys = cloneColumnsForPlanCache(p.RightNAJoinKeys)
cloned.LeftNAJoinKeys = cloneColumnsForPlanCache(p.LeftNAJoinKeys, nil)
cloned.RightNAJoinKeys = cloneColumnsForPlanCache(p.RightNAJoinKeys, nil)
return cloned, true
}

Expand Down
43 changes: 27 additions & 16 deletions pkg/planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/util/chunk"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/pingcap/tidb/pkg/util/hint"
"github.com/pingcap/tidb/pkg/util/intest"
)

Expand Down Expand Up @@ -223,11 +224,11 @@ func GetPlanFromPlanCache(ctx context.Context, sctx sessionctx.Context,

paramTypes := parseParamTypes(sctx, params)
if stmtCtx.UseCache() {
cachedVal, hit := lookupPlanCache(ctx, sctx, cacheKey, paramTypes)
plan, outputCols, stmtHints, hit := lookupPlanCache(ctx, sctx, cacheKey, paramTypes)
skipPrivCheck := stmt.PointGet.Executor != nil // this case is specially handled
if hit {
if plan, names, ok, err := adjustCachedPlan(ctx, sctx, cachedVal, isNonPrepared, skipPrivCheck, binding, is, stmt); err != nil || ok {
return plan, names, err
if plan, ok, err := adjustCachedPlan(ctx, sctx, plan, stmtHints, isNonPrepared, skipPrivCheck, binding, is, stmt); err != nil || ok {
return plan, outputCols, err
}
}
}
Expand All @@ -243,7 +244,8 @@ func instancePlanCacheEnabled(ctx context.Context) bool {
return enableInstancePlanCache
}

func lookupPlanCache(ctx context.Context, sctx sessionctx.Context, cacheKey string, paramTypes []*types.FieldType) (cachedVal *PlanCacheValue, hit bool) {
func lookupPlanCache(ctx context.Context, sctx sessionctx.Context, cacheKey string,
paramTypes []*types.FieldType) (plan base.Plan, outputCols types.NameSlice, stmtHints *hint.StmtHints, hit bool) {
useInstanceCache := instancePlanCacheEnabled(ctx)
defer func(begin time.Time) {
if hit {
Expand All @@ -252,29 +254,38 @@ func lookupPlanCache(ctx context.Context, sctx sessionctx.Context, cacheKey stri
}(time.Now())
if useInstanceCache {
if v, hit := domain.GetDomain(sctx).GetInstancePlanCache().Get(cacheKey, paramTypes); hit {
cachedVal = v.(*PlanCacheValue)
return cachedVal.CloneForInstancePlanCache(ctx, sctx.GetPlanCtx()) // clone the value to solve concurrency problem
pcv := v.(*PlanCacheValue)
clonedPlan, ok := pcv.Plan.CloneForPlanCache(sctx.GetPlanCtx())
if !ok { // clone the value to solve concurrency problem
return nil, nil, nil, false
}
if intest.InTest && ctx.Value(PlanCacheKeyTestClone{}) != nil {
ctx.Value(PlanCacheKeyTestClone{}).(func(plan, cloned base.Plan))(pcv.Plan, clonedPlan)
}
return clonedPlan, pcv.OutputColumns, pcv.stmtHints, true
}
} else {
if v, hit := sctx.GetSessionPlanCache().Get(cacheKey, paramTypes); hit {
return v.(*PlanCacheValue), true
pcv := v.(*PlanCacheValue)
return pcv.Plan, pcv.OutputColumns, pcv.stmtHints, true
}
}
return nil, false
return nil, nil, nil, false
}

func adjustCachedPlan(ctx context.Context, sctx sessionctx.Context, cachedVal *PlanCacheValue, isNonPrepared, skipPrivCheck bool,
bindSQL string, is infoschema.InfoSchema, stmt *PlanCacheStmt) (base.Plan,
[]*types.FieldName, bool, error) {
func adjustCachedPlan(ctx context.Context, sctx sessionctx.Context,
plan base.Plan, stmtHints *hint.StmtHints, isNonPrepared, skipPrivCheck bool,
bindSQL string, is infoschema.InfoSchema, stmt *PlanCacheStmt) (
base.Plan, bool, error) {
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
if !skipPrivCheck { // keep the prior behavior
if err := checkPreparedPriv(ctx, sctx, stmt, is); err != nil {
return nil, nil, false, err
return nil, false, err
}
}
if !RebuildPlan4CachedPlan(cachedVal.Plan) {
return nil, nil, false, nil
if !RebuildPlan4CachedPlan(plan) {
return nil, false, nil
}
sessVars.FoundInPlanCache = true
if len(bindSQL) > 0 { // We're using binding, set this to true.
Expand All @@ -286,8 +297,8 @@ func adjustCachedPlan(ctx context.Context, sctx sessionctx.Context, cachedVal *P
core_metrics.GetPlanCacheHitCounter(isNonPrepared).Inc()
}
stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
stmtCtx.StmtHints = *cachedVal.stmtHints
return cachedVal.Plan, cachedVal.OutputColumns, true, nil
stmtCtx.StmtHints = *stmtHints
return plan, true, nil
}

// generateNewPlan call the optimizer to generate a new plan for current statement
Expand Down
Loading

0 comments on commit 1770006

Please sign in to comment.