diff --git a/dm/worker/server.go b/dm/worker/server.go index 8c7a752173d..3c79237f31d 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -72,10 +72,14 @@ type Server struct { wg sync.WaitGroup kaWg sync.WaitGroup httpWg sync.WaitGroup + runWg sync.WaitGroup ctx context.Context cancel context.CancelFunc + runCtx context.Context + runCancel context.CancelFunc + kaCtx context.Context kaCancel context.CancelFunc @@ -107,6 +111,8 @@ func (s *Server) Start() error { var m cmux.CMux + s.runCtx, s.runCancel = context.WithCancel(s.ctx) + // protect member from data race. some functions below like GetRelayConfig, // GetSourceBoundConfig has a built-in timeout so it will not be stuck for a // long time. @@ -143,10 +149,10 @@ func (s *Server) Start() error { s.setWorker(nil, true) - s.wg.Add(1) + s.runWg.Add(1) go func() { - s.runBackgroundJob(s.ctx) - s.wg.Done() + s.runBackgroundJob(s.runCtx) + s.runWg.Done() }() s.startKeepAlive() @@ -162,13 +168,13 @@ func (s *Server) Start() error { } } - s.wg.Add(1) + s.runWg.Add(1) go func(ctx context.Context) { - defer s.wg.Done() + defer s.runWg.Done() // TODO: handle fatal error from observeRelayConfig //nolint:errcheck s.observeRelayConfig(ctx, revRelay) - }(s.ctx) + }(s.runCtx) bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name) if err != nil { @@ -182,9 +188,9 @@ func (s *Server) Start() error { log.L().Info("started to handle mysql source", zap.String("sourceCfg", sourceCfg.String())) } - s.wg.Add(1) + s.runWg.Add(1) go func(ctx context.Context) { - defer s.wg.Done() + defer s.runWg.Done() for { err1 := s.observeSourceBound(ctx, revBound) if err1 == nil { @@ -192,7 +198,7 @@ func (s *Server) Start() error { } s.restartKeepAlive() } - }(s.ctx) + }(s.runCtx) // create a cmux m = cmux.New(s.rootLis) @@ -467,8 +473,8 @@ func (s *Server) doClose() { return } // stop server in advance, stop receiving source bound and relay bound - s.cancel() - s.wg.Wait() + s.runCancel() + s.runWg.Wait() // stop worker and wait for return(we already lock the whole Sever, so no need use lock to get source worker) if w := s.getSourceWorker(true); w != nil { @@ -494,6 +500,9 @@ func (s *Server) Close() { s.doClose() // we should stop current sync first, otherwise master may schedule task on new worker while we are closing s.stopKeepAlive() + s.cancel() + s.wg.Wait() + if s.etcdClient != nil { s.etcdClient.Close() }