Skip to content

Commit

Permalink
add retry if transaction failed while fetching task metas (pingcap#35)
Browse files (browse the repository at this point in the history)
Co-authored-by: rishabh_mittal <[email protected]>
  • Loading branch information
Rishabh Mittal and mittalrishabh committed May 6, 2024
1 parent 4ffedcd commit a431738
Showing 1 changed file with 51 additions and 32 deletions.
83 changes: 51 additions & 32 deletions br/pkg/lightning/importer/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,46 +655,65 @@ func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(t
if err != nil {
return errors.Annotate(err, "enable pessimistic transaction failed")
}
return exec.Transact(ctx, "check tasks exclusively", func(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(
ctx,
fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail from %s FOR UPDATE", m.tableName),
)
if err != nil {
return errors.Annotate(err, "fetch task metas failed")
}
defer rows.Close()

var tasks []taskMeta
for rows.Next() {
var task taskMeta
var statusValue string
if err = rows.Scan(&task.taskID, &task.pdCfgs, &statusValue, &task.state, &task.tikvSourceBytes, &task.tiflashSourceBytes, &task.tikvAvail, &task.tiflashAvail); err != nil {
// Some failures (for example error code 1205) require restarting the whole transaction, so the transaction is retried with backoff.
backOffTime := time.Second
retryCnt := 5
for i := 0; i < retryCnt; i++ {
err = exec.Transact(ctx, "check tasks exclusively", func(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(
ctx,
fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName),
)
if err != nil {
return errors.Annotate(err, "fetch task metas failed")
}
defer rows.Close()

var tasks []taskMeta
for rows.Next() {
var task taskMeta
var statusValue string
if err = rows.Scan(&task.taskID, &task.pdCfgs, &statusValue, &task.state, &task.sourceBytes, &task.clusterAvail); err != nil {
return errors.Trace(err)
}
status, err := parseTaskMetaStatus(statusValue)
if err != nil {
return err
}
task.status = status
tasks = append(tasks, task)
}
if err = rows.Err(); err != nil {
return errors.Trace(err)
}
status, err := parseTaskMetaStatus(statusValue)
newTasks, err := action(tasks)
if err != nil {
return err
return errors.Trace(err)
}
task.status = status
tasks = append(tasks, task)
}
if err = rows.Err(); err != nil {
return errors.Trace(err)
for _, task := range newTasks {
// nolint:gosec
query := fmt.Sprintf("REPLACE INTO %s (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)", m.tableName)
if _, err = tx.ExecContext(ctx, query, task.taskID, task.pdCfgs, task.status.String(), task.state, task.sourceBytes, task.clusterAvail); err != nil {
return errors.Trace(err)
}
}
return nil
})
if err == nil {
break
}
newTasks, err := action(tasks)
if err != nil {
return errors.Trace(err)
backOffTime *= 2
if backOffTime > maxBackoffTime {
backOffTime = maxBackoffTime
}
for _, task := range newTasks {
// nolint:gosec
query := fmt.Sprintf("REPLACE INTO %s (task_id, pd_cfgs, status, state, tikv_source_bytes, tiflash_source_bytes, tikv_avail, tiflash_avail) VALUES(?, ?, ?, ?, ?, ?, ?, ?)", m.tableName)
if _, err = tx.ExecContext(ctx, query, task.taskID, task.pdCfgs, task.status.String(), task.state, task.tikvSourceBytes, task.tiflashSourceBytes, task.tikvAvail, task.tiflashAvail); err != nil {
return errors.Trace(err)
}
select {
case <-time.After(backOffTime):
case <-ctx.Done():
return errors.Trace(ctx.Err())
}
return nil
})
}
return err
}

func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) {
Expand Down

0 comments on commit a431738

Please sign in to comment.