From 2b9dd8373accff9f793b87b7707f3505d5c24639 Mon Sep 17 00:00:00 2001
From: Blake Gentry
Date: Thu, 26 Sep 2024 11:19:28 -0500
Subject: [PATCH] Fix unique scheduling issues (#619)

PR #590 introduced a more flexible unique job mechanism that allows the
unique states to be customized, all while still benefiting from the
performance improvement of using a unique index rather than an advisory
lock. The retryable state is included in the default list of states, but
can be removed if the user doesn't want an erroring job to block
duplicate inserts.

However, this created an issue: when scheduling a retryable job (moving
it to available), it could potentially conflict with a duplicate unique
job. To handle this, special logic was added for unique jobs, moving the
conflicting row to discarded rather than available.

Unfortunately this logic had issues and was insufficiently tested. A
couple of specific scenarios caused problems:

1. A unique job that was being scheduled, because it was either inserted
   as scheduled or had errored and become retryable, would be considered
   a conflict with itself, because the query didn't properly exclude the
   row being scheduled.

2. Attempting to schedule two duplicate retryable unique jobs at the
   same time would lead to a unique conflict, because there was no
   mechanism to prevent this.

The query changes in this PR address both of the above cases, along with
added test coverage. The increased complexity is unfortunate, and we're
probably nearing the limit of what should be dealt with in a single SQL
query. If this still isn't complete, I'm more inclined to fix the issues
by catching these conflicts at the application level, explicitly moving
the conflicting row(s) to discarded, and trying again. This can be
looped with a backoff or recursed to ensure that progress keeps being
made as individual conflicts get resolved. But hopefully that won't be
necessary.

Fixes #618.
---
 CHANGELOG.md                                  |   4 +
 .../riverdrivertest/riverdrivertest.go        | 213 +++++++++++++++---
 .../internal/dbsqlc/river_job.sql.go          |  76 +++++--
 .../riverpgxv5/internal/dbsqlc/river_job.sql  |  76 +++++--
 .../internal/dbsqlc/river_job.sql.go          |  76 +++++--
 5 files changed, 342 insertions(+), 103 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c6535146..830717d0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 - The `BatchCompleter` that marks jobs as completed can now batch database updates for _all_ states of jobs that have finished execution. Prior to this change, only `completed` jobs were batched into a single `UPDATE` call, while jobs moving to any other state used a single `UPDATE` per job. This change should significantly reduce database and pool contention on high volume systems when jobs get retried, snoozed, cancelled, or discarded following execution. [PR #617](https://github.com/riverqueue/river/pull/617).

+### Fixed
+
+- Unique job changes from v0.12.0 / [PR #590](https://github.com/riverqueue/river/pull/590) introduced a bug with scheduled or retryable unique jobs where they could be considered in conflict with themselves and moved to `discarded` by mistake. There was also a possibility of the job scheduler breaking if duplicate `retryable` unique jobs were scheduled at the same time. The job scheduling query was corrected to address these issues, along with the missing test coverage.
[PR #619](https://github.com/riverqueue/river/pull/619). + ## [0.12.0] - 2024-09-23 ⚠️ Version 0.12.0 contains a new database migration, version 6. See [documentation on running River migrations](https://riverqueue.com/docs/migrations). If migrating with the CLI, make sure to update it to its latest version: diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index a4cf1add..42695d33 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -10,9 +10,11 @@ import ( "time" "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" "golang.org/x/text/cases" "golang.org/x/text/language" + "github.com/riverqueue/river/internal/dbunique" "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/riverdriver" @@ -1503,54 +1505,191 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("JobSchedule", func(t *testing.T) { t.Parallel() - exec, _ := setup(ctx, t) + t.Run("BasicScheduling", func(t *testing.T) { + exec, _ := setup(ctx, t) - var ( - horizon = time.Now() - beforeHorizon = horizon.Add(-1 * time.Minute) - afterHorizon = horizon.Add(1 * time.Minute) - ) + var ( + horizon = time.Now() + beforeHorizon = horizon.Add(-1 * time.Minute) + afterHorizon = horizon.Add(1 * time.Minute) + ) + + job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)}) + job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) - job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)}) - job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) - job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + // States that aren't scheduled. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateDiscarded)}) - // States that aren't scheduled. - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateAvailable)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateDiscarded)}) + // Right state, but after horizon. 
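+			// With Now pinned to horizon below, the scheduler should leave both of
+			// these untouched until their scheduled_at arrives.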
+ _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) - // Right state, but after horizon. - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + // First two scheduled because of limit. + result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ + Max: 2, + Now: horizon, + }) + require.NoError(t, err) + require.Len(t, result, 2) + + // And then job3 scheduled. + result, err = exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ + Max: 2, + Now: horizon, + }) + require.NoError(t, err) + require.Len(t, result, 1) - // First two scheduled because of limit. - result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ - Max: 2, - Now: horizon, + updatedJob1, err := exec.JobGetByID(ctx, job1.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State) + + updatedJob2, err := exec.JobGetByID(ctx, job2.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, updatedJob2.State) + + updatedJob3, err := exec.JobGetByID(ctx, job3.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State) }) - require.NoError(t, err) - require.Len(t, result, 2) - // And then job3 scheduled. - result, err = exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ - Max: 2, - Now: horizon, + t.Run("HandlesUniqueConflicts", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + var ( + horizon = time.Now() + beforeHorizon = horizon.Add(-1 * time.Minute) + ) + + defaultUniqueStates := []rivertype.JobState{ + rivertype.JobStateAvailable, + rivertype.JobStatePending, + rivertype.JobStateRetryable, + rivertype.JobStateRunning, + rivertype.JobStateScheduled, + } + // The default unique state list, minus retryable to allow for these conflicts: + nonRetryableUniqueStates := []rivertype.JobState{ + rivertype.JobStateAvailable, + rivertype.JobStatePending, + rivertype.JobStateRunning, + rivertype.JobStateScheduled, + } + + job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key-1"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key-2"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + // job3 has no conflict (it's the only one with this key), so it should be + // scheduled. 
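+			// Its unique states still include retryable, but no other row shares
+			// unique-key-3, so there's nothing for it to conflict with.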
+ job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key-3"), + UniqueStates: dbunique.UniqueStatesToBitmask(defaultUniqueStates), + }) + + // This one is a conflict with job1 because it's already running and has + // the same unique properties: + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key-1"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + // This one is *not* a conflict with job2 because it's completed, which + // isn't in the unique states: + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateCompleted), + UniqueKey: []byte("unique-key-2"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + + result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ + Max: 100, + Now: horizon, + }) + require.NoError(t, err) + require.Len(t, result, 3) + + updatedJob1, err := exec.JobGetByID(ctx, job1.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, updatedJob1.State) + require.Equal(t, "scheduler_discarded", gjson.GetBytes(updatedJob1.Metadata, "unique_key_conflict").String()) + + updatedJob2, err := exec.JobGetByID(ctx, job2.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, updatedJob2.State) + require.False(t, gjson.GetBytes(updatedJob2.Metadata, "unique_key_conflict").Exists()) + + updatedJob3, err := exec.JobGetByID(ctx, job3.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State) + require.False(t, gjson.GetBytes(updatedJob3.Metadata, "unique_key_conflict").Exists()) }) - require.NoError(t, err) - require.Len(t, result, 1) - updatedJob1, err := exec.JobGetByID(ctx, job1.ID) - require.NoError(t, err) - require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State) + t.Run("SchedulingTwoRetryableJobsThatWillConflictWithEachOther", func(t *testing.T) { + t.Parallel() - updatedJob2, err := exec.JobGetByID(ctx, job2.ID) - require.NoError(t, err) - require.Equal(t, rivertype.JobStateAvailable, updatedJob2.State) + exec, _ := setup(ctx, t) - updatedJob3, err := exec.JobGetByID(ctx, job3.ID) - require.NoError(t, err) - require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State) + var ( + horizon = time.Now() + beforeHorizon = horizon.Add(-1 * time.Minute) + ) + + // The default unique state list, minus retryable to allow for these conflicts: + nonRetryableUniqueStates := []rivertype.JobState{ + rivertype.JobStateAvailable, + rivertype.JobStatePending, + rivertype.JobStateRunning, + rivertype.JobStateScheduled, + } + + job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key-1"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key-1"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + + result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ + Max: 100, + Now: horizon, + }) + require.NoError(t, err) + require.Len(t, result, 2) + + updatedJob1, err := 
exec.JobGetByID(ctx, job1.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State) + require.False(t, gjson.GetBytes(updatedJob1.Metadata, "unique_key_conflict").Exists()) + + updatedJob2, err := exec.JobGetByID(ctx, job2.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, updatedJob2.State) + require.Equal(t, "scheduler_discarded", gjson.GetBytes(updatedJob2.Metadata, "unique_key_conflict").String()) + }) }) t.Run("JobSetCompleteIfRunningMany", func(t *testing.T) { diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index d337907d..cb339c1a 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -1013,7 +1013,12 @@ func (q *Queries) JobRetry(ctx context.Context, db DBTX, id int64) (*RiverJob, e const jobSchedule = `-- name: JobSchedule :many WITH jobs_to_schedule AS ( - SELECT id, unique_key, unique_states + SELECT + id, + unique_key, + unique_states, + priority, + scheduled_at FROM river_job WHERE state IN ('retryable', 'scheduled') @@ -1027,32 +1032,59 @@ WITH jobs_to_schedule AS ( LIMIT $2::bigint FOR UPDATE ), -conflicting_jobs AS ( - SELECT DISTINCT unique_key +jobs_with_rownum AS ( + SELECT + id, unique_key, unique_states, priority, scheduled_at, + CASE + WHEN unique_key IS NOT NULL AND unique_states IS NOT NULL THEN + ROW_NUMBER() OVER ( + PARTITION BY unique_key + ORDER BY priority, scheduled_at, id + ) + ELSE NULL + END AS row_num + FROM jobs_to_schedule +), +unique_conflicts AS ( + SELECT river_job.unique_key FROM river_job - WHERE unique_key IN ( - SELECT unique_key - FROM jobs_to_schedule - WHERE unique_key IS NOT NULL - AND unique_states IS NOT NULL - ) - AND unique_states IS NOT NULL - AND river_job_state_in_bitmask(unique_states, state) + JOIN jobs_with_rownum + ON river_job.unique_key = jobs_with_rownum.unique_key + AND river_job.id != jobs_with_rownum.id + WHERE + river_job.unique_key IS NOT NULL + AND river_job.unique_states IS NOT NULL + AND river_job_state_in_bitmask(river_job.unique_states, river_job.state) +), +job_updates AS ( + SELECT + job.id, + job.unique_key, + job.unique_states, + CASE + WHEN job.row_num IS NULL THEN 'available'::river_job_state + WHEN uc.unique_key IS NOT NULL THEN 'discarded'::river_job_state + WHEN job.row_num = 1 THEN 'available'::river_job_state + ELSE 'discarded'::river_job_state + END AS new_state, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS finalized_at_do_update, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS metadata_do_update + FROM jobs_with_rownum job + LEFT JOIN unique_conflicts uc ON job.unique_key = uc.unique_key ), updated_jobs AS ( UPDATE river_job SET - state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state - ELSE 'discarded'::river_job_state END, - finalized_at = CASE WHEN cj.unique_key IS NULL THEN finalized_at - ELSE $1::timestamptz END, - -- Purely for debugging to understand when this code path was used: - metadata = CASE WHEN cj.unique_key IS NULL THEN metadata - ELSE metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb END - FROM jobs_to_schedule jts - LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key - WHERE river_job.id = jts.id - RETURNING river_job.id, state = 'discarded'::river_job_state AS conflict_discarded + state = job_updates.new_state, + finalized_at 
= CASE WHEN job_updates.finalized_at_do_update THEN $1::timestamptz + ELSE river_job.finalized_at END, + metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb + ELSE river_job.metadata END + FROM job_updates + WHERE river_job.id = job_updates.id + RETURNING + river_job.id, + job_updates.new_state = 'discarded'::river_job_state AS conflict_discarded ) SELECT river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 7703c15a..c1ac0bdf 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -406,7 +406,12 @@ FROM updated_job; -- name: JobSchedule :many WITH jobs_to_schedule AS ( - SELECT id, unique_key, unique_states + SELECT + id, + unique_key, + unique_states, + priority, + scheduled_at FROM river_job WHERE state IN ('retryable', 'scheduled') @@ -420,32 +425,59 @@ WITH jobs_to_schedule AS ( LIMIT @max::bigint FOR UPDATE ), -conflicting_jobs AS ( - SELECT DISTINCT unique_key +jobs_with_rownum AS ( + SELECT + *, + CASE + WHEN unique_key IS NOT NULL AND unique_states IS NOT NULL THEN + ROW_NUMBER() OVER ( + PARTITION BY unique_key + ORDER BY priority, scheduled_at, id + ) + ELSE NULL + END AS row_num + FROM jobs_to_schedule +), +unique_conflicts AS ( + SELECT river_job.unique_key FROM river_job - WHERE unique_key IN ( - SELECT unique_key - FROM jobs_to_schedule - WHERE unique_key IS NOT NULL - AND unique_states IS NOT NULL - ) - AND unique_states IS NOT NULL - AND river_job_state_in_bitmask(unique_states, state) + JOIN jobs_with_rownum + ON river_job.unique_key = jobs_with_rownum.unique_key + AND river_job.id != jobs_with_rownum.id + WHERE + river_job.unique_key IS NOT NULL + AND river_job.unique_states IS NOT NULL + AND river_job_state_in_bitmask(river_job.unique_states, river_job.state) +), +job_updates AS ( + SELECT + job.id, + job.unique_key, + job.unique_states, + CASE + WHEN job.row_num IS NULL THEN 'available'::river_job_state + WHEN uc.unique_key IS NOT NULL THEN 'discarded'::river_job_state + WHEN job.row_num = 1 THEN 'available'::river_job_state + ELSE 'discarded'::river_job_state + END AS new_state, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS finalized_at_do_update, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS metadata_do_update + FROM jobs_with_rownum job + LEFT JOIN unique_conflicts uc ON job.unique_key = uc.unique_key ), updated_jobs AS ( UPDATE river_job SET - state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state - ELSE 'discarded'::river_job_state END, - finalized_at = CASE WHEN cj.unique_key IS NULL THEN finalized_at - ELSE @now::timestamptz END, - -- Purely for debugging to understand when this code path was used: - metadata = CASE WHEN cj.unique_key IS NULL THEN metadata - ELSE metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb END - FROM jobs_to_schedule jts - LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key - WHERE river_job.id = jts.id - RETURNING river_job.id, state = 
'discarded'::river_job_state AS conflict_discarded + state = job_updates.new_state, + finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN @now::timestamptz + ELSE river_job.finalized_at END, + metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb + ELSE river_job.metadata END + FROM job_updates + WHERE river_job.id = job_updates.id + RETURNING + river_job.id, + job_updates.new_state = 'discarded'::river_job_state AS conflict_discarded ) SELECT sqlc.embed(river_job), diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 00317e53..1fff4859 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -997,7 +997,12 @@ func (q *Queries) JobRetry(ctx context.Context, db DBTX, id int64) (*RiverJob, e const jobSchedule = `-- name: JobSchedule :many WITH jobs_to_schedule AS ( - SELECT id, unique_key, unique_states + SELECT + id, + unique_key, + unique_states, + priority, + scheduled_at FROM river_job WHERE state IN ('retryable', 'scheduled') @@ -1011,32 +1016,59 @@ WITH jobs_to_schedule AS ( LIMIT $2::bigint FOR UPDATE ), -conflicting_jobs AS ( - SELECT DISTINCT unique_key +jobs_with_rownum AS ( + SELECT + id, unique_key, unique_states, priority, scheduled_at, + CASE + WHEN unique_key IS NOT NULL AND unique_states IS NOT NULL THEN + ROW_NUMBER() OVER ( + PARTITION BY unique_key + ORDER BY priority, scheduled_at, id + ) + ELSE NULL + END AS row_num + FROM jobs_to_schedule +), +unique_conflicts AS ( + SELECT river_job.unique_key FROM river_job - WHERE unique_key IN ( - SELECT unique_key - FROM jobs_to_schedule - WHERE unique_key IS NOT NULL - AND unique_states IS NOT NULL - ) - AND unique_states IS NOT NULL - AND river_job_state_in_bitmask(unique_states, state) + JOIN jobs_with_rownum + ON river_job.unique_key = jobs_with_rownum.unique_key + AND river_job.id != jobs_with_rownum.id + WHERE + river_job.unique_key IS NOT NULL + AND river_job.unique_states IS NOT NULL + AND river_job_state_in_bitmask(river_job.unique_states, river_job.state) +), +job_updates AS ( + SELECT + job.id, + job.unique_key, + job.unique_states, + CASE + WHEN job.row_num IS NULL THEN 'available'::river_job_state + WHEN uc.unique_key IS NOT NULL THEN 'discarded'::river_job_state + WHEN job.row_num = 1 THEN 'available'::river_job_state + ELSE 'discarded'::river_job_state + END AS new_state, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS finalized_at_do_update, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS metadata_do_update + FROM jobs_with_rownum job + LEFT JOIN unique_conflicts uc ON job.unique_key = uc.unique_key ), updated_jobs AS ( UPDATE river_job SET - state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state - ELSE 'discarded'::river_job_state END, - finalized_at = CASE WHEN cj.unique_key IS NULL THEN finalized_at - ELSE $1::timestamptz END, - -- Purely for debugging to understand when this code path was used: - metadata = CASE WHEN cj.unique_key IS NULL THEN metadata - ELSE metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb END - FROM jobs_to_schedule jts - LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key - WHERE river_job.id = jts.id - RETURNING river_job.id, state = 'discarded'::river_job_state AS conflict_discarded + state = job_updates.new_state, + 
finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN $1::timestamptz + ELSE river_job.finalized_at END, + metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb + ELSE river_job.metadata END + FROM job_updates + WHERE river_job.id = job_updates.id + RETURNING + river_job.id, + job_updates.new_state = 'discarded'::river_job_state AS conflict_discarded ) SELECT river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states,
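
The commit message above leaves open an application-level fallback should the single-query approach still prove insufficient: catch unique conflicts as they surface, explicitly discard the conflicting row(s), and retry with a backoff. The sketch below illustrates that loop; it is not part of this patch or of River's API. `scheduleBatch` and `discardConflicting` are hypothetical stand-ins for the driver's `JobSchedule` query and a targeted "move conflicting rows to discarded" update, and the unique-violation check assumes the Postgres `23505` error code as surfaced by pgx.

```go
package schedulerfallback

import (
	"context"
	"errors"
	"time"

	"github.com/jackc/pgx/v5/pgconn"
)

// scheduleWithConflictRecovery attempts to schedule a batch of jobs. If a
// unique conflict surfaces, it discards the conflicting row(s) and retries
// after a backoff, so progress keeps being made as individual conflicts get
// resolved.
func scheduleWithConflictRecovery(
	ctx context.Context,
	scheduleBatch func(ctx context.Context) error,
	discardConflicting func(ctx context.Context) error,
) error {
	const maxAttempts = 5
	backoff := 50 * time.Millisecond

	for attempt := 1; ; attempt++ {
		err := scheduleBatch(ctx)
		if err == nil {
			return nil
		}

		// Only unique violations (Postgres error 23505) are recoverable here;
		// anything else is returned to the caller as-is.
		var pgErr *pgconn.PgError
		if !errors.As(err, &pgErr) || pgErr.Code != "23505" || attempt >= maxAttempts {
			return err
		}

		// Explicitly resolve the conflicting row(s), then retry after a pause.
		if err := discardConflicting(ctx); err != nil {
			return err
		}

		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return ctx.Err()
		}
		backoff *= 2
	}
}
```

Bounding the attempts keeps a pathological conflict from looping forever, while each successful `discardConflicting` pass removes at least one blocking row, so under normal conditions the loop terminates well before the cap.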