diff --git a/dm/syncer/dml_worker.go b/dm/syncer/dml_worker.go index 34e9f190e9f..8ee3beca7db 100644 --- a/dm/syncer/dml_worker.go +++ b/dm/syncer/dml_worker.go @@ -249,7 +249,9 @@ func (w *DMLWorker) executeBatchJobs(queueID int, jobs []*job) { time.Sleep(time.Duration(t) * time.Second) }) // use background context to execute sqls as much as possible - ctx, cancel := w.syncCtx.WithTimeout(maxDMLExecutionDuration) + // set timeout to maxDMLConnectionDuration to make sure dmls can be replicated to downstream event if the latency is high + // if users need to quit this asap, we can support pause-task/stop-task --force in the future + ctx, cancel := w.syncCtx.WithTimeout(maxDMLConnectionDuration) defer cancel() affect, err = db.ExecuteSQL(ctx, queries, args...) failpoint.Inject("SafeModeExit", func(val failpoint.Value) { diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index e6131b68caa..ab942aeafda 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -86,7 +86,6 @@ var ( maxDDLConnectionTimeout = fmt.Sprintf("%dm", MaxDDLConnectionTimeoutMinute) maxDMLConnectionDuration, _ = time.ParseDuration(maxDMLConnectionTimeout) - maxDMLExecutionDuration = 30 * time.Second defaultMaxPauseOrStopWaitTime = 10 * time.Second