Consider per-worker timeout overrides when rescuing jobs
This one came up when I was thinking about the job-specific rescue
threshold floated in [1].

I was going to suggest the possible workaround of setting an aggressive
rescue threshold combined with a low job timeout globally, and then
overriding the timeout on any specific job workers that need to run
longer than the new low global job timeout. But then I realized this
wouldn't work because the job rescuer doesn't account for job-specific
timeouts -- it just rescues or discards everything it finds beyond the
configured rescue threshold.

Here, add new logic to address that problem. Luckily, we were already
pulling worker information to check for a possible custom retry
schedule, so we just have to piggyback onto that to also examine a
possible custom work timeout.

[1] #347
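
To make the workaround concrete, here's a rough sketch (not part of this
commit's diff) of the setup it enables: a low global job timeout plus an
aggressive rescue threshold, with one worker overriding its timeout. It
assumes River's public API (WorkerDefaults with a Timeout override, and
the JobTimeout / RescueStuckJobsAfter client options); the SlowArgs /
SlowWorker names and the durations are illustrative.

package main

import (
	"context"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

type SlowArgs struct{}

func (SlowArgs) Kind() string { return "slow" }

type SlowWorker struct {
	river.WorkerDefaults[SlowArgs]
}

// Per-worker override: jobs of this kind may run much longer than the
// low global JobTimeout configured below.
func (w *SlowWorker) Timeout(*river.Job[SlowArgs]) time.Duration { return 30 * time.Minute }

func (w *SlowWorker) Work(ctx context.Context, job *river.Job[SlowArgs]) error {
	// ... long-running work ...
	return nil
}

func newClient(dbPool *pgxpool.Pool) (*river.Client[pgx.Tx], error) {
	workers := river.NewWorkers()
	river.AddWorker(workers, &SlowWorker{})

	return river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		JobTimeout:           time.Minute,     // low global timeout
		RescueStuckJobsAfter: 5 * time.Minute, // aggressive rescue threshold
		Queues:               map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 100}},
		Workers:              workers,
	})
}

With the fix here, the rescuer leaves a running "slow" job alone until
its 30-minute worker timeout has elapsed, instead of rescuing or
discarding it as soon as the 5-minute rescue threshold passes.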
brandur committed May 11, 2024
1 parent 4def8bc commit bec5206
Showing 3 changed files with 112 additions and 54 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

+### Fixed
+
+- River now considers per-worker timeout overrides when rescuing jobs so that jobs with a long custom timeout won't be rescued prematurely. [PR #350](https://github.com/riverqueue/river/pull/350).
+
## [0.6.0] - 2024-05-08

### Added
84 changes: 57 additions & 27 deletions internal/maintenance/job_rescuer.go
@@ -11,6 +11,7 @@ import (
"github.com/riverqueue/river/internal/baseservice"
"github.com/riverqueue/river/internal/maintenance/startstop"
"github.com/riverqueue/river/internal/rivercommon"
+"github.com/riverqueue/river/internal/util/ptrutil"
"github.com/riverqueue/river/internal/util/timeutil"
"github.com/riverqueue/river/internal/util/valutil"
"github.com/riverqueue/river/internal/workunit"
@@ -164,22 +165,20 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
now := time.Now().UTC()

rescueManyParams := riverdriver.JobRescueManyParams{
-ID: make([]int64, len(stuckJobs)),
-Error: make([][]byte, len(stuckJobs)),
-FinalizedAt: make([]time.Time, len(stuckJobs)),
-ScheduledAt: make([]time.Time, len(stuckJobs)),
-State: make([]string, len(stuckJobs)),
+ID: make([]int64, 0, len(stuckJobs)),
+Error: make([][]byte, 0, len(stuckJobs)),
+FinalizedAt: make([]time.Time, 0, len(stuckJobs)),
+ScheduledAt: make([]time.Time, 0, len(stuckJobs)),
+State: make([]string, 0, len(stuckJobs)),
}

-for i, job := range stuckJobs {
-rescueManyParams.ID[i] = job.ID
-
+for _, job := range stuckJobs {
var metadata metadataWithCancelAttemptedAt
if err := json.Unmarshal(job.Metadata, &metadata); err != nil {
return nil, fmt.Errorf("error unmarshaling job metadata: %w", err)
}

-rescueManyParams.Error[i], err = json.Marshal(rivertype.AttemptError{
+errorData, err := json.Marshal(rivertype.AttemptError{
At: now,
Attempt: max(job.Attempt, 0),
Error: "Stuck job rescued by Rescuer",
@@ -189,29 +188,41 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
return nil, fmt.Errorf("error marshaling error JSON: %w", err)
}

+addRescueParam := func(state rivertype.JobState, finalizedAt *time.Time, scheduledAt time.Time) {
+rescueManyParams.ID = append(rescueManyParams.ID, job.ID)
+rescueManyParams.Error = append(rescueManyParams.Error, errorData)
+rescueManyParams.FinalizedAt = append(rescueManyParams.FinalizedAt, ptrutil.ValOrDefault(finalizedAt, time.Time{}))
+rescueManyParams.ScheduledAt = append(rescueManyParams.ScheduledAt, scheduledAt)
+rescueManyParams.State = append(rescueManyParams.State, string(state))
+}
+
if !metadata.CancelAttemptedAt.IsZero() {
res.NumJobsCancelled++
-rescueManyParams.FinalizedAt[i] = now
-rescueManyParams.ScheduledAt[i] = job.ScheduledAt // reuse previous value
-rescueManyParams.State[i] = string(rivertype.JobStateCancelled)
+addRescueParam(rivertype.JobStateCancelled, &now, job.ScheduledAt) // reused previous scheduled value
continue
}
-shouldRetry, retryAt := s.makeRetryDecision(ctx, job)
-if shouldRetry {
-res.NumJobsRetried++
-rescueManyParams.ScheduledAt[i] = retryAt
-rescueManyParams.State[i] = string(rivertype.JobStateRetryable)
-} else {
+
+retryDecision, retryAt := s.makeRetryDecision(ctx, job, now)
+
+switch retryDecision {
+case jobRetryDecisionDiscard:
res.NumJobsDiscarded++
-rescueManyParams.FinalizedAt[i] = now
-rescueManyParams.ScheduledAt[i] = job.ScheduledAt // reuse previous value
-rescueManyParams.State[i] = string(rivertype.JobStateDiscarded)
+addRescueParam(rivertype.JobStateDiscarded, &now, job.ScheduledAt) // reused previous scheduled value
+
+case jobRetryDecisionIgnore:
+// job not timed out yet due to kind-specific timeout value; ignore
+
+case jobRetryDecisionRetry:
+res.NumJobsRetried++
+addRescueParam(rivertype.JobStateRetryable, nil, retryAt)
}
}

-_, err = s.exec.JobRescueMany(ctx, &rescueManyParams)
-if err != nil {
-return nil, fmt.Errorf("error rescuing stuck jobs: %w", err)
+if len(rescueManyParams.ID) > 0 {
+_, err = s.exec.JobRescueMany(ctx, &rescueManyParams)
+if err != nil {
+return nil, fmt.Errorf("error rescuing stuck jobs: %w", err)
+}
}

s.TestSignals.UpdatedBatch.Signal(struct{}{})
@@ -245,14 +256,24 @@ func (s *JobRescuer) getStuckJobs(ctx context.Context) ([]*rivertype.JobRow, err
})
}

+// jobRetryDecision is a signal from makeRetryDecision as to what to do with a
+// particular job that appears to be eligible for rescue.
+type jobRetryDecision int
+
+const (
+jobRetryDecisionDiscard jobRetryDecision = iota // discard the job
+jobRetryDecisionIgnore // don't retry or discard the job
+jobRetryDecisionRetry // retry the job
+)
+
// makeRetryDecision decides whether or not a rescued job should be retried, and if so,
// when.
-func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRow) (bool, time.Time) {
+func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRow, now time.Time) (jobRetryDecision, time.Time) {
workUnitFactory := s.Config.WorkUnitFactoryFunc(job.Kind)
if workUnitFactory == nil {
s.Logger.ErrorContext(ctx, s.Name+": Attempted to rescue unhandled job kind, discarding",
slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
-return false, time.Time{}
+return jobRetryDecisionDiscard, time.Time{}
}

workUnit := workUnitFactory.MakeUnit(job)
@@ -261,9 +282,18 @@ func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRo
slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
}

+if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() {
+return jobRetryDecisionIgnore, time.Time{}
+}
+
nextRetry := workUnit.NextRetry()
if nextRetry.IsZero() {
nextRetry = s.Config.ClientRetryPolicy.NextRetry(job)
}
-return job.Attempt < max(job.MaxAttempts, 0), nextRetry
+
+if job.Attempt < max(job.MaxAttempts, 0) {
+return jobRetryDecisionRetry, nextRetry
+}
+
+return jobRetryDecisionDiscard, time.Time{}
}
78 changes: 51 additions & 27 deletions internal/maintenance/job_rescuer_test.go
@@ -23,20 +23,22 @@ import (
// callbackWorkUnitFactory wraps a Worker to implement workUnitFactory.
type callbackWorkUnitFactory struct {
Callback func(ctx context.Context, jobRow *rivertype.JobRow) error
+timeout time.Duration // defaults to 0, which signals default timeout
}

func (w *callbackWorkUnitFactory) MakeUnit(jobRow *rivertype.JobRow) workunit.WorkUnit {
-return &callbackWorkUnit{callback: w.Callback, jobRow: jobRow}
+return &callbackWorkUnit{callback: w.Callback, jobRow: jobRow, timeout: w.timeout}
}

// callbackWorkUnit implements workUnit for a job and Worker.
type callbackWorkUnit struct {
callback func(ctx context.Context, jobRow *rivertype.JobRow) error
jobRow *rivertype.JobRow
+timeout time.Duration // defaults to 0, which signals default timeout
}

func (w *callbackWorkUnit) NextRetry() time.Time { return time.Now().Add(30 * time.Second) }
-func (w *callbackWorkUnit) Timeout() time.Duration { return 0 }
+func (w *callbackWorkUnit) Timeout() time.Duration { return w.timeout }
func (w *callbackWorkUnit) Work(ctx context.Context) error { return w.callback(ctx, w.jobRow) }
func (w *callbackWorkUnit) UnmarshalJob() error { return nil }

@@ -51,10 +53,13 @@ func (p *SimpleClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time {
func TestJobRescuer(t *testing.T) {
t.Parallel()

-const rescuerJobKind = "rescuer"
-
ctx := context.Background()

+const (
+rescuerJobKind = "rescuer"
+rescuerJobKindLongTimeout = "rescuer_long_timeout"
+)
+
type testBundle struct {
exec riverdriver.Executor
rescueHorizon time.Time
@@ -76,8 +81,13 @@ func TestJobRescuer(t *testing.T) {
Interval: JobRescuerIntervalDefault,
RescueAfter: JobRescuerRescueAfterDefault,
WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
-if kind == rescuerJobKind {
-return &callbackWorkUnitFactory{Callback: func(ctx context.Context, jobRow *rivertype.JobRow) error { return nil }}
+emptyCallback := func(ctx context.Context, jobRow *rivertype.JobRow) error { return nil }
+
+switch kind {
+case rescuerJobKind:
+return &callbackWorkUnitFactory{Callback: emptyCallback}
+case rescuerJobKindLongTimeout:
+return &callbackWorkUnitFactory{Callback: emptyCallback, timeout: JobRescuerRescueAfterDefault + 5*time.Minute}
}
panic("unhandled kind: " + kind)
},
@@ -135,11 +145,18 @@ func TestJobRescuer(t *testing.T) {
stuckToCancelJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)})
stuckToCancelJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(1 * time.Minute)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)}) // won't be rescued

-// these aren't touched:
+// these aren't touched because they're in ineligible states
notRunningJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateCompleted), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
notRunningJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateDiscarded), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
notRunningJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateCancelled), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})

+// Jobs with worker-specific long timeouts. The first isn't rescued
+// because the difference between its `attempted_at` and now is still
+// within the timeout threshold. The second _is_ rescued because it
+// started earlier and even with the longer timeout, has still timed out.
+longTimeOutJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
+longTimeOutJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-6 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
+
require.NoError(cleaner.Start(ctx))

cleaner.TestSignals.FetchedBatch.WaitOrTimeout()
@@ -158,37 +175,44 @@ require.NoError(err)
require.NoError(err)
require.Equal(stuckToRetryJob3.State, job3After.State) // not rescued

-discard1After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob1.ID)
+discardJob1After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob1.ID)
require.NoError(err)
-require.Equal(rivertype.JobStateDiscarded, discard1After.State)
-require.WithinDuration(time.Now(), *discard1After.FinalizedAt, 5*time.Second)
-require.Len(discard1After.Errors, 1)
+require.Equal(rivertype.JobStateDiscarded, discardJob1After.State)
+require.WithinDuration(time.Now(), *discardJob1After.FinalizedAt, 5*time.Second)
+require.Len(discardJob1After.Errors, 1)

-discard2After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob2.ID)
+discardJob2After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob2.ID)
require.NoError(err)
-require.Equal(rivertype.JobStateRunning, discard2After.State)
-require.Nil(discard2After.FinalizedAt)
+require.Equal(rivertype.JobStateRunning, discardJob2After.State)
+require.Nil(discardJob2After.FinalizedAt)

-cancel1After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob1.ID)
+cancelJob1After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob1.ID)
require.NoError(err)
-require.Equal(rivertype.JobStateCancelled, cancel1After.State)
-require.WithinDuration(time.Now(), *cancel1After.FinalizedAt, 5*time.Second)
-require.Len(cancel1After.Errors, 1)
+require.Equal(rivertype.JobStateCancelled, cancelJob1After.State)
+require.WithinDuration(time.Now(), *cancelJob1After.FinalizedAt, 5*time.Second)
+require.Len(cancelJob1After.Errors, 1)

-cancel2After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob2.ID)
+cancelJob2After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob2.ID)
require.NoError(err)
-require.Equal(rivertype.JobStateRunning, cancel2After.State)
-require.Nil(cancel2After.FinalizedAt)
+require.Equal(rivertype.JobStateRunning, cancelJob2After.State)
+require.Nil(cancelJob2After.FinalizedAt)

-notRunning1After, err := bundle.exec.JobGetByID(ctx, notRunningJob1.ID)
+notRunningJob1After, err := bundle.exec.JobGetByID(ctx, notRunningJob1.ID)
require.NoError(err)
-require.Equal(notRunning1After.State, notRunningJob1.State)
-notRunning2After, err := bundle.exec.JobGetByID(ctx, notRunningJob2.ID)
+require.Equal(notRunningJob1.State, notRunningJob1After.State)
+notRunningJob2After, err := bundle.exec.JobGetByID(ctx, notRunningJob2.ID)
require.NoError(err)
+require.Equal(notRunningJob2.State, notRunningJob2After.State)
+notRunningJob3After, err := bundle.exec.JobGetByID(ctx, notRunningJob3.ID)
+require.NoError(err)
+require.Equal(notRunningJob3.State, notRunningJob3After.State)
+
+notTimedOutJob1After, err := bundle.exec.JobGetByID(ctx, longTimeOutJob1.ID)
require.NoError(err)
-require.Equal(notRunning2After.State, notRunningJob2.State)
-notRunning3After, err := bundle.exec.JobGetByID(ctx, notRunningJob3.ID)
+require.Equal(rivertype.JobStateRunning, notTimedOutJob1After.State)
+notTimedOutJob2After, err := bundle.exec.JobGetByID(ctx, longTimeOutJob2.ID)
require.NoError(err)
-require.Equal(notRunning3After.State, notRunningJob3.State)
+require.Equal(rivertype.JobStateRetryable, notTimedOutJob2After.State)
})

t.Run("RescuesInBatches", func(t *testing.T) {
