Skip to content

Commit

Permalink
dm-worker: use run ctx instead global ctx to fix double write (#7661)
Browse files Browse the repository at this point in the history
close #7658
  • Loading branch information
GMHDBJD authored Nov 24, 2022
1 parent 7d9a0cb commit 30225db
Showing 1 changed file with 20 additions and 11 deletions.
31 changes: 20 additions & 11 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ type Server struct {
wg sync.WaitGroup
kaWg sync.WaitGroup
httpWg sync.WaitGroup
runWg sync.WaitGroup

ctx context.Context
cancel context.CancelFunc

runCtx context.Context
runCancel context.CancelFunc

kaCtx context.Context
kaCancel context.CancelFunc

Expand Down Expand Up @@ -107,6 +111,8 @@ func (s *Server) Start() error {

var m cmux.CMux

s.runCtx, s.runCancel = context.WithCancel(s.ctx)

// protect member from data race. some functions below like GetRelayConfig,
// GetSourceBoundConfig has a built-in timeout so it will not be stuck for a
// long time.
Expand Down Expand Up @@ -143,10 +149,10 @@ func (s *Server) Start() error {

s.setWorker(nil, true)

s.wg.Add(1)
s.runWg.Add(1)
go func() {
s.runBackgroundJob(s.ctx)
s.wg.Done()
s.runBackgroundJob(s.runCtx)
s.runWg.Done()
}()

s.startKeepAlive()
Expand All @@ -162,13 +168,13 @@ func (s *Server) Start() error {
}
}

s.wg.Add(1)
s.runWg.Add(1)
go func(ctx context.Context) {
defer s.wg.Done()
defer s.runWg.Done()
// TODO: handle fatal error from observeRelayConfig
//nolint:errcheck
s.observeRelayConfig(ctx, revRelay)
}(s.ctx)
}(s.runCtx)

bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
if err != nil {
Expand All @@ -182,17 +188,17 @@ func (s *Server) Start() error {
log.L().Info("started to handle mysql source", zap.String("sourceCfg", sourceCfg.String()))
}

s.wg.Add(1)
s.runWg.Add(1)
go func(ctx context.Context) {
defer s.wg.Done()
defer s.runWg.Done()
for {
err1 := s.observeSourceBound(ctx, revBound)
if err1 == nil {
return
}
s.restartKeepAlive()
}
}(s.ctx)
}(s.runCtx)

// create a cmux
m = cmux.New(s.rootLis)
Expand Down Expand Up @@ -467,8 +473,8 @@ func (s *Server) doClose() {
return
}
// stop server in advance, stop receiving source bound and relay bound
s.cancel()
s.wg.Wait()
s.runCancel()
s.runWg.Wait()

// stop worker and wait for return(we already lock the whole Sever, so no need use lock to get source worker)
if w := s.getSourceWorker(true); w != nil {
Expand All @@ -494,6 +500,9 @@ func (s *Server) Close() {
s.doClose() // we should stop current sync first, otherwise master may schedule task on new worker while we are closing
s.stopKeepAlive()

s.cancel()
s.wg.Wait()

if s.etcdClient != nil {
s.etcdClient.Close()
}
Expand Down

0 comments on commit 30225db

Please sign in to comment.