diff --git a/CHANGELOG.md b/CHANGELOG.md index c6535146..830717d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - The `BatchCompleter` that marks jobs as completed can now batch database updates for _all_ states of jobs that have finished execution. Prior to this change, only `completed` jobs were batched into a single `UPDATE` call, while jobs moving to any other state used a single `UPDATE` per job. This change should significantly reduce database and pool contention on high volume system when jobs get retried, snoozed, cancelled, or discarded following execution. [PR #617](https://github.com/riverqueue/river/pull/617). +### Fixed + +- Unique job changes from v0.12.0 / [PR #590](https://github.com/riverqueue/river/pull/590) introduced a bug with scheduled or retryable unique jobs where they could be considered in conflict with themselves and moved to `discarded` by mistake. There was also a possibility of a broken job scheduler if the scheduler attempted to schedule duplicate `retryable` unique jobs at the same time. The job scheduling query was corrected to address these issues, and the missing test coverage was added. [PR #619](https://github.com/riverqueue/river/pull/619). + ## [0.12.0] - 2024-09-23 ⚠️ Version 0.12.0 contains a new database migration, version 6. See [documentation on running River migrations](https://riverqueue.com/docs/migrations). 
If migrating with the CLI, make sure to update it to its latest version: diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index a4cf1add..42695d33 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -10,9 +10,11 @@ import ( "time" "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" "golang.org/x/text/cases" "golang.org/x/text/language" + "github.com/riverqueue/river/internal/dbunique" "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/riverdriver" @@ -1503,54 +1505,191 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("JobSchedule", func(t *testing.T) { t.Parallel() - exec, _ := setup(ctx, t) + t.Run("BasicScheduling", func(t *testing.T) { + exec, _ := setup(ctx, t) - var ( - horizon = time.Now() - beforeHorizon = horizon.Add(-1 * time.Minute) - afterHorizon = horizon.Add(1 * time.Minute) - ) + var ( + horizon = time.Now() + beforeHorizon = horizon.Add(-1 * time.Minute) + afterHorizon = horizon.Add(1 * time.Minute) + ) + + job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)}) + job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) - job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)}) - job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) - job3 := testfactory.Job(ctx, t, exec, 
&testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + // States that aren't scheduled. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateDiscarded)}) - // States that aren't scheduled. - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateAvailable)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateCompleted)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{FinalizedAt: &beforeHorizon, ScheduledAt: &beforeHorizon, State: ptrutil.Ptr(rivertype.JobStateDiscarded)}) + // Right state, but after horizon. + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)}) + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) - // Right state, but after horizon. - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateRetryable)}) - _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ScheduledAt: &afterHorizon, State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + // First two scheduled because of limit. + result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ + Max: 2, + Now: horizon, + }) + require.NoError(t, err) + require.Len(t, result, 2) + + // And then job3 scheduled. 
+ result, err = exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ + Max: 2, + Now: horizon, + }) + require.NoError(t, err) + require.Len(t, result, 1) - // First two scheduled because of limit. - result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ - Max: 2, - Now: horizon, + updatedJob1, err := exec.JobGetByID(ctx, job1.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State) + + updatedJob2, err := exec.JobGetByID(ctx, job2.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, updatedJob2.State) + + updatedJob3, err := exec.JobGetByID(ctx, job3.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State) }) - require.NoError(t, err) - require.Len(t, result, 2) - // And then job3 scheduled. - result, err = exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ - Max: 2, - Now: horizon, + t.Run("HandlesUniqueConflicts", func(t *testing.T) { + t.Parallel() + + exec, _ := setup(ctx, t) + + var ( + horizon = time.Now() + beforeHorizon = horizon.Add(-1 * time.Minute) + ) + + defaultUniqueStates := []rivertype.JobState{ + rivertype.JobStateAvailable, + rivertype.JobStatePending, + rivertype.JobStateRetryable, + rivertype.JobStateRunning, + rivertype.JobStateScheduled, + } + // The default unique state list, minus retryable to allow for these conflicts: + nonRetryableUniqueStates := []rivertype.JobState{ + rivertype.JobStateAvailable, + rivertype.JobStatePending, + rivertype.JobStateRunning, + rivertype.JobStateScheduled, + } + + job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key-1"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: 
[]byte("unique-key-2"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + // job3 has no conflict (it's the only one with this key), so it should be + // scheduled. + job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key-3"), + UniqueStates: dbunique.UniqueStatesToBitmask(defaultUniqueStates), + }) + + // This one is a conflict with job1 because it's already running and has + // the same unique properties: + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRunning), + UniqueKey: []byte("unique-key-1"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + // This one is *not* a conflict with job2 because it's completed, which + // isn't in the unique states: + _ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateCompleted), + UniqueKey: []byte("unique-key-2"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + + result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ + Max: 100, + Now: horizon, + }) + require.NoError(t, err) + require.Len(t, result, 3) + + updatedJob1, err := exec.JobGetByID(ctx, job1.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, updatedJob1.State) + require.Equal(t, "scheduler_discarded", gjson.GetBytes(updatedJob1.Metadata, "unique_key_conflict").String()) + + updatedJob2, err := exec.JobGetByID(ctx, job2.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, updatedJob2.State) + require.False(t, gjson.GetBytes(updatedJob2.Metadata, "unique_key_conflict").Exists()) + + updatedJob3, err := exec.JobGetByID(ctx, job3.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State) + require.False(t, 
gjson.GetBytes(updatedJob3.Metadata, "unique_key_conflict").Exists()) }) - require.NoError(t, err) - require.Len(t, result, 1) - updatedJob1, err := exec.JobGetByID(ctx, job1.ID) - require.NoError(t, err) - require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State) + t.Run("SchedulingTwoRetryableJobsThatWillConflictWithEachOther", func(t *testing.T) { + t.Parallel() - updatedJob2, err := exec.JobGetByID(ctx, job2.ID) - require.NoError(t, err) - require.Equal(t, rivertype.JobStateAvailable, updatedJob2.State) + exec, _ := setup(ctx, t) - updatedJob3, err := exec.JobGetByID(ctx, job3.ID) - require.NoError(t, err) - require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State) + var ( + horizon = time.Now() + beforeHorizon = horizon.Add(-1 * time.Minute) + ) + + // The default unique state list, minus retryable to allow for these conflicts: + nonRetryableUniqueStates := []rivertype.JobState{ + rivertype.JobStateAvailable, + rivertype.JobStatePending, + rivertype.JobStateRunning, + rivertype.JobStateScheduled, + } + + job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key-1"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{ + ScheduledAt: &beforeHorizon, + State: ptrutil.Ptr(rivertype.JobStateRetryable), + UniqueKey: []byte("unique-key-1"), + UniqueStates: dbunique.UniqueStatesToBitmask(nonRetryableUniqueStates), + }) + + result, err := exec.JobSchedule(ctx, &riverdriver.JobScheduleParams{ + Max: 100, + Now: horizon, + }) + require.NoError(t, err) + require.Len(t, result, 2) + + updatedJob1, err := exec.JobGetByID(ctx, job1.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, updatedJob1.State) + require.False(t, gjson.GetBytes(updatedJob1.Metadata, "unique_key_conflict").Exists()) + + updatedJob2, err := exec.JobGetByID(ctx, 
job2.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, updatedJob2.State) + require.Equal(t, "scheduler_discarded", gjson.GetBytes(updatedJob2.Metadata, "unique_key_conflict").String()) + }) }) t.Run("JobSetCompleteIfRunningMany", func(t *testing.T) { diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index d337907d..cb339c1a 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -1013,7 +1013,12 @@ func (q *Queries) JobRetry(ctx context.Context, db DBTX, id int64) (*RiverJob, e const jobSchedule = `-- name: JobSchedule :many WITH jobs_to_schedule AS ( - SELECT id, unique_key, unique_states + SELECT + id, + unique_key, + unique_states, + priority, + scheduled_at FROM river_job WHERE state IN ('retryable', 'scheduled') @@ -1027,32 +1032,59 @@ WITH jobs_to_schedule AS ( LIMIT $2::bigint FOR UPDATE ), -conflicting_jobs AS ( - SELECT DISTINCT unique_key +jobs_with_rownum AS ( + SELECT + id, unique_key, unique_states, priority, scheduled_at, + CASE + WHEN unique_key IS NOT NULL AND unique_states IS NOT NULL THEN + ROW_NUMBER() OVER ( + PARTITION BY unique_key + ORDER BY priority, scheduled_at, id + ) + ELSE NULL + END AS row_num + FROM jobs_to_schedule +), +unique_conflicts AS ( + SELECT river_job.unique_key FROM river_job - WHERE unique_key IN ( - SELECT unique_key - FROM jobs_to_schedule - WHERE unique_key IS NOT NULL - AND unique_states IS NOT NULL - ) - AND unique_states IS NOT NULL - AND river_job_state_in_bitmask(unique_states, state) + JOIN jobs_with_rownum + ON river_job.unique_key = jobs_with_rownum.unique_key + AND river_job.id != jobs_with_rownum.id + WHERE + river_job.unique_key IS NOT NULL + AND river_job.unique_states IS NOT NULL + AND river_job_state_in_bitmask(river_job.unique_states, river_job.state) +), +job_updates AS ( + SELECT + job.id, + 
job.unique_key, + job.unique_states, + CASE + WHEN job.row_num IS NULL THEN 'available'::river_job_state + WHEN uc.unique_key IS NOT NULL THEN 'discarded'::river_job_state + WHEN job.row_num = 1 THEN 'available'::river_job_state + ELSE 'discarded'::river_job_state + END AS new_state, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS finalized_at_do_update, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS metadata_do_update + FROM jobs_with_rownum job + LEFT JOIN unique_conflicts uc ON job.unique_key = uc.unique_key ), updated_jobs AS ( UPDATE river_job SET - state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state - ELSE 'discarded'::river_job_state END, - finalized_at = CASE WHEN cj.unique_key IS NULL THEN finalized_at - ELSE $1::timestamptz END, - -- Purely for debugging to understand when this code path was used: - metadata = CASE WHEN cj.unique_key IS NULL THEN metadata - ELSE metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb END - FROM jobs_to_schedule jts - LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key - WHERE river_job.id = jts.id - RETURNING river_job.id, state = 'discarded'::river_job_state AS conflict_discarded + state = job_updates.new_state, + finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN $1::timestamptz + ELSE river_job.finalized_at END, + metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb + ELSE river_job.metadata END + FROM job_updates + WHERE river_job.id = job_updates.id + RETURNING + river_job.id, + job_updates.new_state = 'discarded'::river_job_state AS conflict_discarded ) SELECT river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, 
river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 7703c15a..c1ac0bdf 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -406,7 +406,12 @@ FROM updated_job; -- name: JobSchedule :many WITH jobs_to_schedule AS ( - SELECT id, unique_key, unique_states + SELECT + id, + unique_key, + unique_states, + priority, + scheduled_at FROM river_job WHERE state IN ('retryable', 'scheduled') @@ -420,32 +425,59 @@ WITH jobs_to_schedule AS ( LIMIT @max::bigint FOR UPDATE ), -conflicting_jobs AS ( - SELECT DISTINCT unique_key +jobs_with_rownum AS ( + SELECT + *, + CASE + WHEN unique_key IS NOT NULL AND unique_states IS NOT NULL THEN + ROW_NUMBER() OVER ( + PARTITION BY unique_key + ORDER BY priority, scheduled_at, id + ) + ELSE NULL + END AS row_num + FROM jobs_to_schedule +), +unique_conflicts AS ( + SELECT river_job.unique_key FROM river_job - WHERE unique_key IN ( - SELECT unique_key - FROM jobs_to_schedule - WHERE unique_key IS NOT NULL - AND unique_states IS NOT NULL - ) - AND unique_states IS NOT NULL - AND river_job_state_in_bitmask(unique_states, state) + JOIN jobs_with_rownum + ON river_job.unique_key = jobs_with_rownum.unique_key + AND river_job.id != jobs_with_rownum.id + WHERE + river_job.unique_key IS NOT NULL + AND river_job.unique_states IS NOT NULL + AND river_job_state_in_bitmask(river_job.unique_states, river_job.state) +), +job_updates AS ( + SELECT + job.id, + job.unique_key, + job.unique_states, + CASE + WHEN job.row_num IS NULL THEN 'available'::river_job_state + WHEN uc.unique_key IS NOT NULL THEN 'discarded'::river_job_state + WHEN job.row_num = 1 THEN 'available'::river_job_state + ELSE 'discarded'::river_job_state + END AS new_state, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT 
NULL OR job.row_num > 1)) AS finalized_at_do_update, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS metadata_do_update + FROM jobs_with_rownum job + LEFT JOIN unique_conflicts uc ON job.unique_key = uc.unique_key ), updated_jobs AS ( UPDATE river_job SET - state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state - ELSE 'discarded'::river_job_state END, - finalized_at = CASE WHEN cj.unique_key IS NULL THEN finalized_at - ELSE @now::timestamptz END, - -- Purely for debugging to understand when this code path was used: - metadata = CASE WHEN cj.unique_key IS NULL THEN metadata - ELSE metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb END - FROM jobs_to_schedule jts - LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key - WHERE river_job.id = jts.id - RETURNING river_job.id, state = 'discarded'::river_job_state AS conflict_discarded + state = job_updates.new_state, + finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN @now::timestamptz + ELSE river_job.finalized_at END, + metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb + ELSE river_job.metadata END + FROM job_updates + WHERE river_job.id = job_updates.id + RETURNING + river_job.id, + job_updates.new_state = 'discarded'::river_job_state AS conflict_discarded ) SELECT sqlc.embed(river_job), diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 00317e53..1fff4859 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -997,7 +997,12 @@ func (q *Queries) JobRetry(ctx context.Context, db DBTX, id int64) (*RiverJob, e const jobSchedule = `-- name: JobSchedule :many WITH jobs_to_schedule AS ( - SELECT id, unique_key, unique_states + SELECT + id, + unique_key, + unique_states, + priority, + 
scheduled_at FROM river_job WHERE state IN ('retryable', 'scheduled') @@ -1011,32 +1016,59 @@ WITH jobs_to_schedule AS ( LIMIT $2::bigint FOR UPDATE ), -conflicting_jobs AS ( - SELECT DISTINCT unique_key +jobs_with_rownum AS ( + SELECT + id, unique_key, unique_states, priority, scheduled_at, + CASE + WHEN unique_key IS NOT NULL AND unique_states IS NOT NULL THEN + ROW_NUMBER() OVER ( + PARTITION BY unique_key + ORDER BY priority, scheduled_at, id + ) + ELSE NULL + END AS row_num + FROM jobs_to_schedule +), +unique_conflicts AS ( + SELECT river_job.unique_key FROM river_job - WHERE unique_key IN ( - SELECT unique_key - FROM jobs_to_schedule - WHERE unique_key IS NOT NULL - AND unique_states IS NOT NULL - ) - AND unique_states IS NOT NULL - AND river_job_state_in_bitmask(unique_states, state) + JOIN jobs_with_rownum + ON river_job.unique_key = jobs_with_rownum.unique_key + AND river_job.id != jobs_with_rownum.id + WHERE + river_job.unique_key IS NOT NULL + AND river_job.unique_states IS NOT NULL + AND river_job_state_in_bitmask(river_job.unique_states, river_job.state) +), +job_updates AS ( + SELECT + job.id, + job.unique_key, + job.unique_states, + CASE + WHEN job.row_num IS NULL THEN 'available'::river_job_state + WHEN uc.unique_key IS NOT NULL THEN 'discarded'::river_job_state + WHEN job.row_num = 1 THEN 'available'::river_job_state + ELSE 'discarded'::river_job_state + END AS new_state, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS finalized_at_do_update, + (job.row_num IS NOT NULL AND (uc.unique_key IS NOT NULL OR job.row_num > 1)) AS metadata_do_update + FROM jobs_with_rownum job + LEFT JOIN unique_conflicts uc ON job.unique_key = uc.unique_key ), updated_jobs AS ( UPDATE river_job SET - state = CASE WHEN cj.unique_key IS NULL THEN 'available'::river_job_state - ELSE 'discarded'::river_job_state END, - finalized_at = CASE WHEN cj.unique_key IS NULL THEN finalized_at - ELSE $1::timestamptz END, - -- Purely for debugging to 
understand when this code path was used: - metadata = CASE WHEN cj.unique_key IS NULL THEN metadata - ELSE metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb END - FROM jobs_to_schedule jts - LEFT JOIN conflicting_jobs cj ON jts.unique_key = cj.unique_key - WHERE river_job.id = jts.id - RETURNING river_job.id, state = 'discarded'::river_job_state AS conflict_discarded + state = job_updates.new_state, + finalized_at = CASE WHEN job_updates.finalized_at_do_update THEN $1::timestamptz + ELSE river_job.finalized_at END, + metadata = CASE WHEN job_updates.metadata_do_update THEN river_job.metadata || '{"unique_key_conflict": "scheduler_discarded"}'::jsonb + ELSE river_job.metadata END + FROM job_updates + WHERE river_job.id = job_updates.id + RETURNING + river_job.id, + job_updates.new_state = 'discarded'::river_job_state AS conflict_discarded ) SELECT river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states,