Skip to content

Commit

Permalink
jobs: retry failed jobs with exponential-backoff
Browse files Browse the repository at this point in the history
Failed jobs were being retried with a constant interval in the previous
implementation. This commit enables jobs to be retried with exponentially
increasing delays with an upper bound. This change enables to retry the jobs
that are not currently retried when they fail due to transient problems.

Release note: None

Fixes: #44594
  • Loading branch information
Sajjad Rizvi committed Jul 13, 2021
1 parent a75c417 commit 3e0a028
Show file tree
Hide file tree
Showing 19 changed files with 1,350 additions and 54 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,4 @@ trace.datadog.project string CockroachDB the project under which traces will be
trace.debug.enable boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests
trace.lightstep.token string if set, traces go to Lightstep using this token
trace.zipkin.collector string if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.
version version 21.1-116 set the active cluster version in the format '<major>.<minor>'
version version 21.1-118 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://<ui>/debug/requests</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'). Only one tracer can be configured at a time.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-116</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.1-118</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ const (
// SQLStatsTable adds the system tables for storing persisted SQL statistics
// for statements and transactions.
SQLStatsTable
// RetryJobsWithExponentialBackoff retries failed jobs with exponential delays.
RetryJobsWithExponentialBackoff

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -530,6 +532,11 @@ var versionsSingleton = keyedVersions{
Key: SQLStatsTable,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 116},
},
{
Key: RetryJobsWithExponentialBackoff,
Version: roachpb.Version{Major: 21, Minor: 1, Internal: 118},
},

// Step (2): Add new versions here.
}

Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/roachpb",
Expand Down
58 changes: 48 additions & 10 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"sync"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -42,16 +43,16 @@ const (
// NonTerminalStatusTupleString is a sql tuple corresponding to statuses of
// non-terminal jobs.
NonTerminalStatusTupleString = `(` + nonTerminalStatusList + `)`
)

const claimQuery = `
claimQuery = `
UPDATE system.jobs
SET claim_session_id = $1, claim_instance_id = $2
WHERE (claim_session_id IS NULL)
AND (status IN ` + claimableStatusTupleString + `)
WHERE ((claim_session_id IS NULL)
AND (status IN ` + claimableStatusTupleString + `))
ORDER BY created DESC
LIMIT $3
RETURNING id;`
)

// claimJobs places a claim with the given SessionID to job rows that are
// available.
Expand All @@ -64,8 +65,7 @@ func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error {
}
numRows, err := r.ex.Exec(
ctx, "claim-jobs", txn, claimQuery,
s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop,
)
s.ID().UnsafeBytes(), r.ID(), maxAdoptionsPerLoop)
if err != nil {
return errors.Wrap(err, "could not query jobs table")
}
Expand All @@ -76,14 +76,49 @@ func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error {
})
}

const (
processQuery = `
SELECT id FROM system.jobs
WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance_id = $4)
`

// Select only those jobs that have their next retry time before the current time.
// 2^62 is the max positive number.
// $5 is registry's current timestamp.
// $6 is the starting value in exponential backoff calculation.
// $7 is the max retry delay.
nextRunClause = `
AND
($5 > (COALESCE(last_run, created)
+
least(($6::INTERVAL * ((1 << least(62, COALESCE(num_runs, 0))) - 1)), $7::INTERVAL)))`
)

// getProcessQuery returns the query that selects the jobs that are claimed
// by this node.
func getProcessQuery(
ctx context.Context, s sqlliveness.Session, r *Registry,
) (string, []interface{}) {
// Select the running or reverting jobs that this node has claimed.
query := processQuery
args := []interface{}{StatusRunning, StatusReverting, s.ID().UnsafeBytes(), r.ID()}
// Gating the version that introduced job retries with exponential backoff.
if r.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
// Select only those jobs that can be executed right now.
query += nextRunClause
retryStartVal := retryStartValSetting.Get(&r.settings.SV).Seconds()
retryMax := retryMaxDelaySetting.Get(&r.settings.SV).Seconds()
args = append(args, []interface{}{r.clock.Now().GoTime(), retryStartVal, retryMax}...)
}
return query, args
}

// processClaimedJobs processes all jobs currently claimed by the registry.
func (r *Registry) processClaimedJobs(ctx context.Context, s sqlliveness.Session) error {
query, args := getProcessQuery(ctx, s, r)
it, err := r.ex.QueryIteratorEx(
ctx, "select-running/get-claimed-jobs", nil,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, `
SELECT id FROM system.jobs
WHERE (status = $1 OR status = $2) AND (claim_session_id = $3 AND claim_instance_id = $4)`,
StatusRunning, StatusReverting, s.ID().UnsafeBytes(), r.ID(),
sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, query, args...,
)
if err != nil {
return errors.Wrapf(err, "could not query for claimed jobs")
Expand Down Expand Up @@ -219,6 +254,9 @@ FROM system.jobs WHERE id = $1 AND claim_session_id = $2`,
if err := r.stopper.RunAsyncTask(ctx, job.taskName(), func(ctx context.Context) {
// Wait for the job to finish. No need to print the error because if there
// was one it's been set in the job status already.
// TODO(sajjad): To discuss: In some cases, the error may not be set. For
// example, errors in update() called in runJob() will result in
// silently dropping the error.
_ = r.runJob(resumeCtx, resumer, job, status, job.taskName())
}); err != nil {
r.removeAdoptedJob(jobID)
Expand Down
68 changes: 48 additions & 20 deletions pkg/jobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,45 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

const intervalBaseSettingKey = "jobs.registry.interval.base"
const adoptIntervalSettingKey = "jobs.registry.interval.adopt"
const cancelIntervalSettingKey = "jobs.registry.interval.cancel"
const gcIntervalSettingKey = "jobs.registry.interval.gc"
const retentionTimeSettingKey = "jobs.retention_time"
const cancelUpdateLimitKey = "jobs.cancel_update_limit"
const (
intervalBaseSettingKey = "jobs.registry.interval.base"
adoptIntervalSettingKey = "jobs.registry.interval.adopt"
cancelIntervalSettingKey = "jobs.registry.interval.cancel"
gcIntervalSettingKey = "jobs.registry.interval.gc"
retentionTimeSettingKey = "jobs.retention_time"
cancelUpdateLimitKey = "jobs.cancel_update_limit"
retryStartValSettingKey = "jobs.registry.retry.backoff_start"
retryMaxDelaySettingKey = "jobs.registry.retry.backoff_max"
)

var (
// defaultAdoptInterval is the default adopt interval.
defaultAdoptInterval = 30 * time.Second

// defaultAdoptInterval is the default adopt interval.
var defaultAdoptInterval = 30 * time.Second
// defaultCancelInterval is the default cancel interval.
defaultCancelInterval = 10 * time.Second

// defaultCancelInterval is the default cancel interval.
var defaultCancelInterval = 10 * time.Second
// defaultGcInterval is the default GC Interval.
defaultGcInterval = 1 * time.Hour

// defaultGcInterval is the default GC Interval.
var defaultGcInterval = 1 * time.Hour
// defaultIntervalBase is the default interval base.
defaultIntervalBase = 1.0

// defaultIntervalBase is the default interval base.
var defaultIntervalBase = 1.0
// defaultRetentionTime is the default duration for which terminal jobs are
// kept in the records.
defaultRetentionTime = 14 * 24 * time.Hour

// defaultRetentionTime is the default duration for which terminal jobs are
// kept in the records.
var defaultRetentionTime = 14 * 24 * time.Hour
// defaultCancellationsUpdateLimit is the default number of jobs that can be
// updated when canceling jobs concurrently from dead sessions.
defaultCancellationsUpdateLimit int64 = 1000

// defaultCancellationsUpdateLimit is the default number of jobs that can be
// updated when canceling jobs concurrently from dead sessions.
var defaultCancellationsUpdateLimit int64 = 1000
// defaultRetryStartVal is the starting value in the calculation of retry interval
// of a failed job using exponential backoff.
defaultRetryStartVal = 30 * time.Second

// defaultRetryMaxDelay is the maximum backoff delay to retry a failed job.
defaultRetryMaxDelay = 24 * time.Hour
)

var (
intervalBaseSetting = settings.RegisterFloatSetting(
Expand Down Expand Up @@ -93,6 +106,21 @@ var (
defaultCancellationsUpdateLimit,
settings.NonNegativeInt,
)

retryStartValSetting = settings.RegisterDurationSetting(
retryStartValSettingKey,
"the starting value of the exponential-backoff delay "+
" in the next retry of a failed job",
defaultRetryStartVal,
settings.NonNegativeDuration,
)

retryMaxDelaySetting = settings.RegisterDurationSetting(
retryMaxDelaySettingKey,
"the maximum delay to retry a failed job",
defaultRetryMaxDelay,
settings.PositiveDuration,
)
)

// jitter adds a small jitter in the given duration.
Expand Down
12 changes: 11 additions & 1 deletion pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"reflect"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security"
Expand Down Expand Up @@ -237,7 +238,14 @@ func (j *Job) started(ctx context.Context, txn *kv.Txn) error {
// TODO(spaskob): Remove this status change after we stop supporting
// pending job states.
ju.UpdateStatus(StatusRunning)
md.Payload.StartedMicros = timeutil.ToUnixMicros(j.registry.clock.Now().GoTime())
if md.Payload.StartedMicros == 0 {
md.Payload.StartedMicros = timeutil.ToUnixMicros(j.registry.clock.Now().GoTime())
}

if j.registry.settings.Version.IsActive(ctx, clusterversion.RetryJobsWithExponentialBackoff) {
ju.UpdateRunStats(md.RunStats.NumRuns+1, j.registry.clock.Now().GoTime())
}

ju.UpdatePayload(md.Payload)
return nil
})
Expand Down Expand Up @@ -514,6 +522,8 @@ func (j *Job) pauseRequested(ctx context.Context, txn *kv.Txn, fn onPauseRequest
})
}

// TODO(sajjad): Update runInfo in reverted

// reverted sets the status of the tracked job to reverted.
func (j *Job) reverted(
ctx context.Context, txn *kv.Txn, err error, fn func(context.Context, *kv.Txn) error,
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/jobspb/jobs.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ enum Type {

message Job {
int64 id = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "JobID"];
// Keep progress first as it may bre more relevant to see when looking at a
// Keep progress first as it may be more relevant to see when looking at a
// running job.
Progress progress = 2;
Payload payload = 3;
Expand Down
7 changes: 7 additions & 0 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ func MakeRegistry(
}
if knobs != nil {
r.knobs = *knobs
if knobs.TimeSource != nil {
r.clock = knobs.TimeSource
}
}
r.mu.adoptedJobs = make(map[jobspb.JobID]*adoptedJob)
r.metrics.init(histogramWindowInterval)
Expand Down Expand Up @@ -1006,6 +1009,7 @@ func (r *Registry) stepThroughStateMachine(
jobType := payload.Type()
log.Infof(ctx, "%s job %d: stepping through state %s with error: %+v", jobType, job.ID(), status, jobErr)
jm := r.metrics.JobMetrics[jobType]

switch status {
case StatusRunning:
if jobErr != nil {
Expand All @@ -1022,6 +1026,9 @@ func (r *Registry) stepThroughStateMachine(
func() {
jm.CurrentlyRunning.Inc(1)
defer jm.CurrentlyRunning.Dec(1)
if err != nil {
return
}
err = resumer.Resume(resumeCtx, execCtx)
}()
if err == nil {
Expand Down
Loading

0 comments on commit 3e0a028

Please sign in to comment.