Skip to content

Commit

Permalink
handle unique conflicts from scheduler by discarding jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
bgentry committed Sep 20, 2024
1 parent dc9d60d commit 32fe194
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 84 deletions.
12 changes: 6 additions & 6 deletions internal/maintenance/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,28 +151,28 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er
now := s.Time.NowUTC()
nowWithLookAhead := now.Add(s.config.Interval)

scheduledJobs, err := tx.JobSchedule(ctx, &riverdriver.JobScheduleParams{
scheduledJobResults, err := tx.JobSchedule(ctx, &riverdriver.JobScheduleParams{
Max: s.config.Limit,
Now: nowWithLookAhead,
})
if err != nil {
return 0, fmt.Errorf("error scheduling jobs: %w", err)
}

queues := make([]string, 0, len(scheduledJobs))
queues := make([]string, 0, len(scheduledJobResults))

// Notify about scheduled jobs with a scheduled_at in the past, or just
// slightly in the future (this loop, the notify, and tx commit will take
// a small amount of time). This isn't going to be perfect, but the goal
// is to roughly try to guess when the clients will attempt to fetch jobs.
notificationHorizon := s.Time.NowUTC().Add(5 * time.Millisecond)

for _, job := range scheduledJobs {
if job.ScheduledAt.After(notificationHorizon) {
for _, result := range scheduledJobResults {
if result.Job.ScheduledAt.After(notificationHorizon) {
continue
}

queues = append(queues, job.Queue)
queues = append(queues, result.Job.Queue)
}

if len(queues) > 0 {
Expand All @@ -182,7 +182,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er
s.TestSignals.NotifiedQueues.Signal(queues)
}

return len(scheduledJobs), tx.Commit(ctx)
return len(scheduledJobResults), tx.Commit(ctx)
}()
if err != nil {
return nil, err
Expand Down
53 changes: 53 additions & 0 deletions internal/maintenance/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/stretchr/testify/require"

"github.com/riverqueue/river/internal/dbunique"
"github.com/riverqueue/river/internal/riverinternaltest"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
Expand Down Expand Up @@ -77,6 +78,14 @@ func TestJobScheduler(t *testing.T) {
require.Equal(t, rivertype.JobStateAvailable, newJob.State)
return newJob
}
requireJobStateDiscarded := func(t *testing.T, exec riverdriver.Executor, job *rivertype.JobRow) *rivertype.JobRow {
t.Helper()
newJob, err := exec.JobGetByID(ctx, job.ID)
require.NoError(t, err)
require.Equal(t, rivertype.JobStateDiscarded, newJob.State)
require.NotNil(t, newJob.FinalizedAt)
return newJob
}

t.Run("Defaults", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -139,6 +148,50 @@ func TestJobScheduler(t *testing.T) {
requireJobStateUnchanged(t, bundle.exec, retryableJob3) // still retryable
})

t.Run("MovesUniqueKeyConflictingJobsToDiscarded", func(t *testing.T) {
t.Parallel()

scheduler, bundle := setupTx(t)
now := time.Now().UTC()

// The list of default states, but without retryable to allow for dupes in that state:
uniqueStates := []rivertype.JobState{
rivertype.JobStateAvailable,
rivertype.JobStateCompleted,
rivertype.JobStatePending,
rivertype.JobStateRunning,
rivertype.JobStateScheduled,
}
uniqueMap := dbunique.UniqueStatesToBitmask(uniqueStates)

retryableJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("1"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-1 * time.Hour))})
retryableJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("2"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))})
retryableJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("3"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) // dupe
retryableJob4 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("4"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) // dupe
retryableJob5 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("5"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) // dupe
retryableJob6 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("6"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) // dupe
retryableJob7 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("7"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRetryable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))}) // dupe

// Will cause conflicts with above jobs when retried:
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("3"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateAvailable)})
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("4"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateCompleted)})
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("5"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStatePending)})
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("6"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateRunning)})
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{UniqueKey: []byte("7"), UniqueStates: uniqueMap, State: ptrutil.Ptr(rivertype.JobStateScheduled)})

require.NoError(t, scheduler.Start(ctx))

scheduler.TestSignals.ScheduledBatch.WaitOrTimeout()

requireJobStateAvailable(t, bundle.exec, retryableJob1)
requireJobStateAvailable(t, bundle.exec, retryableJob2)
requireJobStateDiscarded(t, bundle.exec, retryableJob3)
requireJobStateDiscarded(t, bundle.exec, retryableJob4)
requireJobStateDiscarded(t, bundle.exec, retryableJob5)
requireJobStateDiscarded(t, bundle.exec, retryableJob6)
requireJobStateDiscarded(t, bundle.exec, retryableJob7)
})

t.Run("SchedulesInBatches", func(t *testing.T) {
t.Parallel()

Expand Down
6 changes: 3 additions & 3 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type Executor interface {
JobListFields() string
JobRescueMany(ctx context.Context, params *JobRescueManyParams) (*struct{}, error)
JobRetry(ctx context.Context, id int64) (*rivertype.JobRow, error)
JobSchedule(ctx context.Context, params *JobScheduleParams) ([]*rivertype.JobRow, error)
JobSchedule(ctx context.Context, params *JobScheduleParams) ([]*JobScheduleResult, error)
JobSetCompleteIfRunningMany(ctx context.Context, params *JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error)
JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*rivertype.JobRow, error)
JobUpdate(ctx context.Context, params *JobUpdateParams) (*rivertype.JobRow, error)
Expand Down Expand Up @@ -304,8 +304,8 @@ type JobScheduleParams struct {
}

type JobScheduleResult struct {
Queue string
ScheduledAt time.Time
Job rivertype.JobRow
ConflictDiscarded bool
}

// JobSetCompleteIfRunningManyParams are parameters to set many running jobs to
Expand Down
85 changes: 55 additions & 30 deletions riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions riverdriver/riverdatabasesql/river_database_sql_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,15 +524,21 @@ func (e *Executor) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow, e
return jobRowFromInternal(job)
}

func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*rivertype.JobRow, error) {
jobs, err := dbsqlc.New().JobSchedule(ctx, e.dbtx, &dbsqlc.JobScheduleParams{
func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobScheduleParams) ([]*riverdriver.JobScheduleResult, error) {
scheduleResults, err := dbsqlc.New().JobSchedule(ctx, e.dbtx, &dbsqlc.JobScheduleParams{
Max: int64(params.Max),
Now: params.Now,
})
if err != nil {
return nil, interpretError(err)
}
return mapSliceError(jobs, jobRowFromInternal)
return mapSliceError(scheduleResults, func(result *dbsqlc.JobScheduleRow) (*riverdriver.JobScheduleResult, error) {
job, err := jobRowFromInternal(&result.RiverJob)
if err != nil {
return nil, err
}
return &riverdriver.JobScheduleResult{ConflictDiscarded: result.ConflictDiscarded, Job: *job}, nil
})
}

func (e *Executor) JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) {
Expand Down
37 changes: 28 additions & 9 deletions riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
Original file line number Diff line number Diff line change
Expand Up @@ -406,30 +406,49 @@ FROM updated_job;

-- name: JobSchedule :many
WITH jobs_to_schedule AS (
SELECT id
SELECT id, unique_key, unique_states
FROM river_job
WHERE
state IN ('retryable', 'scheduled')
AND queue IS NOT NULL
AND priority >= 0
AND river_job.scheduled_at <= @now::timestamptz
AND scheduled_at <= @now::timestamptz
ORDER BY
priority,
scheduled_at,
id
LIMIT @max::bigint
FOR UPDATE
),
river_job_scheduled AS (
conflicting_jobs AS (
SELECT DISTINCT unique_key
FROM river_job
WHERE unique_key IN (
SELECT unique_key
FROM jobs_to_schedule
WHERE unique_key IS NOT NULL
AND unique_states IS NOT NULL
)
AND unique_states IS NOT NULL
AND river_job_state_in_bitmask(unique_states, state)
),
updated_jobs AS (
UPDATE river_job
SET state = 'available'
FROM jobs_to_schedule
WHERE river_job.id = jobs_to_schedule.id
RETURNING river_job.id
SET
state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state
ELSE 'discarded'::river_job_state END,
finalized_at = CASE WHEN cj.unique_key IS NOT NULL THEN @now::timestamptz
ELSE finalized_at END
FROM jobs_to_schedule jts
LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key
WHERE river_job.id = jts.id
RETURNING river_job.id, state = 'discarded'::river_job_state AS conflict_discarded
)
SELECT *
SELECT
sqlc.embed(river_job),
updated_jobs.conflict_discarded
FROM river_job
WHERE id IN (SELECT id FROM river_job_scheduled);
JOIN updated_jobs ON river_job.id = updated_jobs.id;

-- name: JobSetCompleteIfRunningMany :many
WITH job_to_finalized_at AS (
Expand Down
Loading

0 comments on commit 32fe194

Please sign in to comment.