Skip to content

Commit

Permalink
ddl: record get owner TS and compare it before runReorgJob quit (#55049
Browse files Browse the repository at this point in the history
…) (#55641)

close #54897
  • Loading branch information
ti-chi-bot authored Aug 29, 2024
1 parent b53c06e commit ebf59d6
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
16 changes: 15 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ type ddlCtx struct {
sync.RWMutex
// reorgCtxMap maps job ID to reorg context.
reorgCtxMap map[int64]*reorgCtx
beOwnerTS int64
}

jobCtx struct {
Expand Down Expand Up @@ -488,6 +489,18 @@ func (dc *ddlCtx) jobContext(job *model.Job) *JobContext {
return NewJobContext()
}

// getOwnerTS returns the beOwnerTS value stored in dc.reorgCtx.
// The field is read while holding the read lock of dc.reorgCtx.
func (dc *ddlCtx) getOwnerTS() int64 {
	dc.reorgCtx.RLock()
	ts := dc.reorgCtx.beOwnerTS
	dc.reorgCtx.RUnlock()
	return ts
}

// setOwnerTS stores ts into dc.reorgCtx.beOwnerTS.
// The assignment is performed while holding the write lock of dc.reorgCtx.
func (dc *ddlCtx) setOwnerTS(ts int64) {
	dc.reorgCtx.Lock()
	defer dc.reorgCtx.Unlock()
	dc.reorgCtx.beOwnerTS = ts
}

func (dc *ddlCtx) getReorgCtx(job *model.Job) *reorgCtx {
dc.reorgCtx.RLock()
defer dc.reorgCtx.RUnlock()
Expand All @@ -496,7 +509,7 @@ func (dc *ddlCtx) getReorgCtx(job *model.Job) *reorgCtx {

func (dc *ddlCtx) newReorgCtx(r *reorgInfo) *reorgCtx {
rc := &reorgCtx{}
rc.doneCh = make(chan error, 1)
rc.doneCh = make(chan reorgFnResult, 1)
// initial reorgCtx
rc.setRowCount(r.Job.GetRowCount())
rc.setNextKey(r.StartKey)
Expand Down Expand Up @@ -727,6 +740,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
if err != nil {
logutil.BgLogger().Error("error when getting the ddl history count", zap.Error(err))
}
d.ddlCtx.setOwnerTS(time.Now().Unix())
})

d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)
Expand Down
24 changes: 21 additions & 3 deletions ddl/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type reorgCtx struct {
// If the reorganization job is done, we will use this channel to notify outer.
// TODO: Now we use goroutine to simulate reorganization jobs, later we may
// use a persistent job list.
doneCh chan error
doneCh chan reorgFnResult
// rowCount is used to simulate a job's row count.
rowCount int64
// notifyCancelReorgJob is used to notify the backfilling goroutine if the DDL job is cancelled.
Expand All @@ -79,6 +79,13 @@ type reorgCtx struct {
}
}

// reorgFnResult is the value sent on reorgCtx.doneCh when a reorg function finishes.
// It pairs the function's error with the DDL owner TS that was read before the reorg
// function was started, so that the receiver can determine whether the result comes
// from a reorg function launched by a previous DDL owner in this instance.
type reorgFnResult struct {
// ownerTS is the DDL owner TS captured before the reorg function was executed.
ownerTS int64
// err is the error returned by the reorg function.
err error
}

// nullableKey can store <nil> kv.Key.
// Storing a nil object to atomic.Value can lead to panic. This is a workaround.
type nullableKey struct {
Expand Down Expand Up @@ -205,11 +212,13 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo
if job.IsCancelling() {
return dbterror.ErrCancelledDDLJob
}
beOwnerTS := w.ddlCtx.getOwnerTS()
rc = w.newReorgCtx(reorgInfo)
w.wg.Add(1)
go func() {
defer w.wg.Done()
rc.doneCh <- f()
err := f()
rc.doneCh <- reorgFnResult{ownerTS: beOwnerTS, err: err}
}()
}

Expand All @@ -225,7 +234,16 @@ func (w *worker) runReorgJob(rh *reorgHandler, reorgInfo *reorgInfo, tblInfo *mo

// wait reorganization job done or timeout
select {
case err := <-rc.doneCh:
case res := <-rc.doneCh:
err := res.err
curTS := w.ddlCtx.getOwnerTS()
if res.ownerTS != curTS {
d.removeReorgCtx(job)
logutil.BgLogger().Warn("owner ts mismatch, return timeout error and retry",
zap.Int64("prevTS", res.ownerTS),
zap.Int64("curTS", curTS))
return dbterror.ErrWaitReorgTimeout
}
// Since the job is cancelled, we don't care about its partial counts.
if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) {
d.removeReorgCtx(job)
Expand Down

0 comments on commit ebf59d6

Please sign in to comment.