Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: init global vars from system tables before start domain #54913

Merged
merged 8 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 49 additions & 32 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,12 @@ type Domain struct {
// TODO: use Run for each process in future pr
wg *util.WaitGroupEnhancedWrapper
statsUpdating atomicutil.Int32
cancelFns struct {
// this is the parent context of DDL, and also used by other loops such as closestReplicaReadCheckLoop.
// there are other top level contexts in the domain, such as the ones used in
// InitDistTaskLoop and loadStatsWorker, domain only stores the cancelFns of them.
// TODO unify top level context.
ctx context.Context
cancelFns struct {
mu sync.Mutex
fns []context.CancelFunc
}
Expand Down Expand Up @@ -1260,12 +1265,19 @@ func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) {
return cli, err
}

// Init initializes a domain.
// Init initializes a domain. after return, session can be used to do DMLs but not
// DDLs which can be used after domain Start.
func (do *Domain) Init(
ddlLease time.Duration,
sysExecutorFactory func(*Domain) (pools.Resource, error),
ddlInjector func(ddl.DDL, ddl.Executor, *infoschema.InfoCache) *schematracker.Checker,
) error {
// TODO there are many place set ddlLease to 0, remove them completely, we want
// UT and even local uni-store to run similar code path as normal.
if ddlLease == 0 {
ddlLease = time.Second
}

do.sysExecutorFactory = sysExecutorFactory
perfschema.Init()
if ebd, ok := do.store.(kv.EtcdBackend); ok {
Expand Down Expand Up @@ -1294,18 +1306,8 @@ func (do *Domain) Init(
}
}

// TODO: Here we create new sessions with sysFac in DDL,
// which will use `do` as Domain instead of call `domap.Get`.
// That's because `domap.Get` requires a lock, but before
// we initialize Domain finish, we can't require that again.
// After we remove the lazy logic of creating Domain, we
// can simplify code here.
sysFac := func() (pools.Resource, error) {
return sysExecutorFactory(do)
}
sysCtxPool := pools.NewResourcePool(sysFac, 512, 512, resourceIdleTimeout)

ctx, cancelFunc := context.WithCancel(context.Background())
do.ctx = ctx
do.cancelFns.mu.Lock()
do.cancelFns.fns = append(do.cancelFns.fns, cancelFunc)
do.cancelFns.mu.Unlock()
Expand Down Expand Up @@ -1371,9 +1373,6 @@ func (do *Domain) Init(
}
do.isLostConnectionToPD.Store(0)
}

do.wg.Add(1)
go do.serverIDKeeper()
} else {
// set serverID for standalone deployment to enable 'KILL'.
atomic.StoreUint64(&do.serverID, serverIDForStandalone)
Expand All @@ -1395,19 +1394,6 @@ func (do *Domain) Init(
return err
}

// step 4: start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction.
err = do.ddl.Start(sysCtxPool)
if err != nil {
return err
}
do.minJobIDRefresher = do.ddl.GetMinJobIDRefresher()

// TODO there are many place set ddlLease to 0, remove them completely, we want
// UT and even local uni-store to run similar code path as normal.
if ddlLease == 0 {
ddlLease = time.Second
}

sub := time.Since(startReloadTime)
// The reload(in step 2) operation takes more than ddlLease and a new reload operation was not performed,
// the next query will respond by ErrInfoSchemaExpired error. So we do a new reload to update schemaValidator.latestSchemaExpire.
Expand All @@ -1418,27 +1404,58 @@ func (do *Domain) Init(
return err
}
}
return nil
}

// Start starts the domain. After start, DDLs can be executed using session, see
// Init also.
func (do *Domain) Start() error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about renaming it to StartBackgroundLoops and adding some comments that some SQL can still be executed before domain started.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0006970

i think Start is clear enough

gCfg := config.GetGlobalConfig()
if gCfg.EnableGlobalKill && do.etcdClient != nil {
do.wg.Add(1)
go do.serverIDKeeper()
}

// TODO: Here we create new sessions with sysFac in DDL,
// which will use `do` as Domain instead of call `domap.Get`.
// That's because `domap.Get` requires a lock, but before
// we initialize Domain finish, we can't require that again.
// After we remove the lazy logic of creating Domain, we
// can simplify code here.
sysFac := func() (pools.Resource, error) {
return do.sysExecutorFactory(do)
}
sysCtxPool := pools.NewResourcePool(sysFac, 512, 512, resourceIdleTimeout)

// start the ddl after the domain reload, avoiding some internal sql running before infoSchema construction.
err := do.ddl.Start(sysCtxPool)
if err != nil {
return err
}
do.minJobIDRefresher = do.ddl.GetMinJobIDRefresher()

// Local store needs to get the change information for every DDL state in each session.
do.wg.Run(func() {
do.loadSchemaInLoop(ctx, ddlLease)
do.loadSchemaInLoop(do.ctx, do.ddl.GetLease())
}, "loadSchemaInLoop")
do.wg.Run(do.mdlCheckLoop, "mdlCheckLoop")
do.wg.Run(do.topNSlowQueryLoop, "topNSlowQueryLoop")
do.wg.Run(do.infoSyncerKeeper, "infoSyncerKeeper")
do.wg.Run(do.globalConfigSyncerKeeper, "globalConfigSyncerKeeper")
do.wg.Run(do.runawayStartLoop, "runawayStartLoop")
do.wg.Run(do.requestUnitsWriterLoop, "requestUnitsWriterLoop")
skipRegisterToDashboard := gCfg.SkipRegisterToDashboard
if !skipRegisterToDashboard {
do.wg.Run(do.topologySyncerKeeper, "topologySyncerKeeper")
}
pdCli := do.GetPDClient()
if pdCli != nil {
do.wg.Run(func() {
do.closestReplicaReadCheckLoop(ctx, pdCli)
do.closestReplicaReadCheckLoop(do.ctx, pdCli)
}, "closestReplicaReadCheckLoop")
}

err = do.initLogBackup(ctx, pdCli)
err = do.initLogBackup(do.ctx, pdCli)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func TestInfo(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL", `return(true)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop", `return(true)`))
require.NoError(t, dom.Init(ddlLease, sysMockFactory, nil))
require.NoError(t, dom.Start())
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/NoDDLDispatchLoop"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/MockReplaceDDL"))

Expand Down
2 changes: 1 addition & 1 deletion pkg/expression/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,6 @@ func AssertLocationWithSessionVars(ctxLoc *time.Location, vars *variable.Session
stmtLocStr := vars.StmtCtx.TimeZone().String()
intest.Assert(ctxLocStr == varsLocStr && ctxLocStr == stmtLocStr,
"location mismatch, ctxLoc: %s, varsLoc: %s, stmtLoc: %s",
ctxLoc.String(), ctxLocStr, stmtLocStr,
ctxLocStr, varsLocStr, stmtLocStr,
)
}
Loading