diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index ace5b91458746..e74fbdb333d54 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -655,46 +655,65 @@ func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(t if err != nil { return errors.Annotate(err, "enable pessimistic transaction failed") } - return exec.Transact(ctx, "check tasks exclusively", func(ctx context.Context, tx *sql.Tx) error { - rows, err := tx.QueryContext( - ctx, - fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName), - ) - if err != nil { - return errors.Annotate(err, "fetch task metas failed") - } - defer rows.Close() - var tasks []taskMeta - for rows.Next() { - var task taskMeta - var statusValue string - if err = rows.Scan(&task.taskID, &task.pdCfgs, &statusValue, &task.state, &task.sourceBytes, &task.clusterAvail); err != nil { + // there are cases where we have to restart the transaction like error code 1205. We are adding a retry with backoff + backOffTime := time.Second + retryCnt := 5 + for i := 0; i < retryCnt; i++ { + err = exec.Transact(ctx, "check tasks exclusively", func(ctx context.Context, tx *sql.Tx) error { + rows, err := tx.QueryContext( + ctx, + fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName), + ) + if err != nil { + return errors.Annotate(err, "fetch task metas failed") + } + defer rows.Close() + + var tasks []taskMeta + for rows.Next() { + var task taskMeta + var statusValue string + if err = rows.Scan(&task.taskID, &task.pdCfgs, &statusValue, &task.state, &task.sourceBytes, &task.clusterAvail); err != nil { + return errors.Trace(err) + } + status, err := parseTaskMetaStatus(statusValue) + if err != nil { + return err + } + task.status = status + tasks = append(tasks, task) + } + if err = rows.Err(); err != nil { return errors.Trace(err) } - status, err := parseTaskMetaStatus(statusValue) + newTasks, err := action(tasks) if err != nil { - return err + return errors.Trace(err) } - task.status = status - tasks = append(tasks, task) - } - if err = rows.Err(); err != nil { - return errors.Trace(err) + for _, task := range newTasks { + // nolint:gosec + query := fmt.Sprintf("REPLACE INTO %s (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)", m.tableName) + if _, err = tx.ExecContext(ctx, query, task.taskID, task.pdCfgs, task.status.String(), task.state, task.sourceBytes, task.clusterAvail); err != nil { + return errors.Trace(err) + } + } + return nil + }) + if err == nil { + break } - newTasks, err := action(tasks) - if err != nil { - return errors.Trace(err) + backOffTime *= 2 + if backOffTime > maxBackoffTime { + backOffTime = maxBackoffTime } - for _, task := range newTasks { - // nolint:gosec - query := fmt.Sprintf("REPLACE INTO %s (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)", m.tableName) - if _, err = tx.ExecContext(ctx, query, task.taskID, task.pdCfgs, task.status.String(), task.state, task.sourceBytes, task.clusterAvail); err != nil { - return errors.Trace(err) - } + select { + case <-time.After(backOffTime): + case <-ctx.Done(): + return errors.Trace(ctx.Err()) } - return nil - }) + } + return err } func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) {