Skip to content

Commit

Permalink
ddl: use static contexts in NewReorgCopContext
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Sep 3, 2024
1 parent 0816166 commit b0a35ab
Show file tree
Hide file tree
Showing 8 changed files with 343 additions and 114 deletions.
4 changes: 4 additions & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ go_test(
"//pkg/ddl/session",
"//pkg/ddl/testutil",
"//pkg/ddl/util",
"//pkg/distsql/context",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",
Expand All @@ -290,6 +291,7 @@ go_test(
"//pkg/errno",
"//pkg/executor",
"//pkg/expression",
"//pkg/expression/contextstatic",
"//pkg/infoschema",
"//pkg/keyspace",
"//pkg/kv",
Expand All @@ -304,6 +306,7 @@ go_test(
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/parser/types",
"//pkg/privilege",
"//pkg/server",
"//pkg/session",
"//pkg/session/types",
Expand All @@ -329,6 +332,7 @@ go_test(
"//pkg/util/context",
"//pkg/util/dbterror",
"//pkg/util/dbterror/plannererrors",
"//pkg/util/deeptest",
"//pkg/util/domainutil",
"//pkg/util/gcutil",
"//pkg/util/generic",
Expand Down
7 changes: 4 additions & 3 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ type backfillCtx struct {

func newBackfillCtx(id int, rInfo *reorgInfo,
schemaName string, tbl table.Table, jobCtx *ReorgContext, label string, isDistributed bool) (*backfillCtx, error) {
sessCtx, err := newSessCtx(rInfo.jobCtx.store, rInfo.ReorgMeta)
if err != nil {
return nil, err
// TODO: remove newReorgSessCtx
sessCtx := newReorgSessCtx(rInfo.jobCtx.store)
if err := initSessCtx(sessCtx, rInfo.ReorgMeta); err != nil {
return nil, errors.Trace(err)
}

if isDistributed {
Expand Down
47 changes: 33 additions & 14 deletions pkg/ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ import (
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/resourcegroup"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/mock"
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
Expand Down Expand Up @@ -125,30 +127,34 @@ func NewReorgCopContext(
allIdxInfo []*model.IndexInfo,
requestSource string,
) (copr.CopContext, error) {
sessCtx, err := newSessCtx(store, reorgMeta)
warnHandler := contextutil.NewStaticWarnHandler(0)
distSQLCtx, err := newReorgDistSQLCtxWithReorgMeta(store.GetClient(), reorgMeta, warnHandler)
if err != nil {
return nil, err
}

exprCtx, err := newReorgExprCtxWithReorgMeta(reorgMeta, warnHandler)
if err != nil {
return nil, err
}

evalCtx := exprCtx.GetEvalCtx()
tc, ec := evalCtx.TypeCtx(), evalCtx.ErrCtx()
pushDownFlags := stmtctx.PushDownFlagsWithTypeFlagsAndErrLevels(tc.Flags(), ec.LevelMap())

return copr.NewCopContext(
sessCtx.GetExprCtx(),
sessCtx.GetDistSQLCtx(),
sessCtx.GetSessionVars().StmtCtx.PushDownFlags(),
exprCtx,
distSQLCtx,
pushDownFlags,
tblInfo,
allIdxInfo,
requestSource,
)
}

func newSessCtx(store kv.Storage, reorgMeta *model.DDLReorgMeta) (sessionctx.Context, error) {
sessCtx := newReorgSessCtx(store)
if err := initSessCtx(sessCtx, reorgMeta); err != nil {
return nil, errors.Trace(err)
}
return sessCtx, nil
}

func newDefaultReorgDistSQLCtx(kvClient kv.Client) *distsqlctx.DistSQLContext {
warnHandler := contextutil.NewStaticWarnHandler(0)
func newDefaultReorgDistSQLCtx(kvClient kv.Client, warnHandler contextutil.WarnAppender) *distsqlctx.DistSQLContext {
intest.AssertNotNil(kvClient)
intest.AssertNotNil(warnHandler)
var sqlKiller sqlkiller.SQLKiller
var execDetails execdetails.SyncExecDetails
return &distsqlctx.DistSQLContext{
Expand All @@ -168,10 +174,23 @@ func newDefaultReorgDistSQLCtx(kvClient kv.Client) *distsqlctx.DistSQLContext {
TiFlashMaxBytesBeforeExternalSort: variable.DefTiFlashMaxBytesBeforeExternalSort,
TiFlashMaxQueryMemoryPerNode: variable.DefTiFlashMemQuotaQueryPerNode,
TiFlashQuerySpillRatio: variable.DefTiFlashQuerySpillRatio,
ResourceGroupName: resourcegroup.DefaultResourceGroupName,
ExecDetails: &execDetails,
}
}

func newReorgDistSQLCtxWithReorgMeta(kvClient kv.Client, reorgMeta *model.DDLReorgMeta, warnHandler contextutil.WarnAppender) (*distsqlctx.DistSQLContext, error) {
loc, err := reorgTimeZoneWithTzLoc(reorgMeta.Location)
if err != nil {
return nil, errors.Trace(err)
}
ctx := newDefaultReorgDistSQLCtx(kvClient, warnHandler)
ctx.Location = loc
ctx.ErrCtx = errctx.NewContextWithLevels(reorgErrLevelsWithSQLMode(reorgMeta.SQLMode), ctx.WarnHandler)
ctx.ResourceGroupName = reorgMeta.ResourceGroupName
return ctx, nil
}

// initSessCtx initializes the session context. Be careful to the timezone.
func initSessCtx(sessCtx sessionctx.Context, reorgMeta *model.DDLReorgMeta) error {
// Correct the initial timezone.
Expand Down
Loading

0 comments on commit b0a35ab

Please sign in to comment.