Skip to content

Commit

Permalink
new unique jobs implementation that works on bulk insert
Browse files Browse the repository at this point in the history
  • Loading branch information
bgentry committed Sep 20, 2024
1 parent d48e600 commit 7426729
Show file tree
Hide file tree
Showing 30 changed files with 1,641 additions and 1,679 deletions.
43 changes: 32 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ type Config struct {
// only 32 bits of number space for advisory lock hashes, so it makes
// internally conflicting River-generated keys more likely.
//
// Advisory locks are currently only used for the fallback/slow path of
// unique job insertion where finalized states are included in a ByState
// configuration.
// Advisory locks are currently only used for the deprecated fallback/slow
// path of unique job insertion when pending, scheduled, available, or running
// are omitted from a customized ByState configuration.
AdvisoryLockPrefix int32

// CancelledJobRetentionPeriod is the amount of time to keep cancelled jobs
Expand Down Expand Up @@ -339,7 +339,7 @@ type Client[TTx any] struct {
stopped <-chan struct{}
subscriptionManager *subscriptionManager
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter
uniqueInserter *dbunique.UniqueInserter // deprecated fallback path for unique job insertion

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
Expand Down Expand Up @@ -1227,6 +1227,17 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
State: rivertype.JobStateAvailable,
Tags: tags,
}
var returnUniqueOpts *dbunique.UniqueOpts
if !uniqueOpts.isEmpty() {
if uniqueOpts.isV1() {
// TODO: block this path if we're within a multirow insert.
returnUniqueOpts = (*dbunique.UniqueOpts)(&uniqueOpts)
} else {
internalUniqueOpts := (*dbunique.UniqueOpts)(&uniqueOpts)
insertParams.UniqueKey = dbunique.UniqueKey(archetype.Time, internalUniqueOpts, insertParams)
insertParams.UniqueStates = internalUniqueOpts.StateBitmask()
}
}

switch {
case !insertOpts.ScheduledAt.IsZero():
Expand All @@ -1245,7 +1256,7 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf
insertParams.State = rivertype.JobStatePending
}

return insertParams, (*dbunique.UniqueOpts)(&uniqueOpts), nil
return insertParams, returnUniqueOpts, nil
}

var errNoDriverDBPool = errors.New("driver must have non-nil database pool to use non-transactional methods like Insert and InsertMany (try InsertTx or InsertManyTx instead")
Expand Down Expand Up @@ -1305,9 +1316,19 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
}
defer tx.Rollback(ctx)

jobInsertRes, err := c.uniqueInserter.JobInsert(ctx, tx, params, uniqueOpts)
if err != nil {
return nil, err
// TODO: consolidate insertion paths for single + multi, remove deprecated uniqueness design
var jobInsertRes *riverdriver.JobInsertFastResult
if uniqueOpts == nil {
jobInsertRes, err = tx.JobInsertFast(ctx, params)
if err != nil {
return nil, err
}
} else {
// Old deprecated advisory lock route
jobInsertRes, err = c.uniqueInserter.JobInsert(ctx, tx, params, uniqueOpts)
if err != nil {
return nil, err
}
}

if err := c.maybeNotifyInsert(ctx, tx, params.State, params.Queue); err != nil {
Expand All @@ -1317,7 +1338,7 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
return nil, err
}

return jobInsertRes, nil
return (*rivertype.JobInsertResult)(jobInsertRes), nil
}

// InsertManyParams encapsulates a single job combined with insert options for
Expand Down Expand Up @@ -1423,8 +1444,8 @@ func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx,
}

return sliceutil.Map(jobRows,
func(jobRow *rivertype.JobRow) *rivertype.JobInsertResult {
return &rivertype.JobInsertResult{Job: jobRow}
func(result *riverdriver.JobInsertFastResult) *rivertype.JobInsertResult {
return (*rivertype.JobInsertResult)(result)
},
), nil
}
Expand Down
148 changes: 133 additions & 15 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/require"

"github.com/riverqueue/river/internal/dbunique"
"github.com/riverqueue/river/internal/maintenance"
"github.com/riverqueue/river/internal/notifier"
"github.com/riverqueue/river/internal/rivercommon"
Expand Down Expand Up @@ -4489,17 +4490,17 @@ func Test_Client_UnknownJobKindErrorsTheJob(t *testing.T) {

insertParams, _, err := insertParamsFromConfigArgsAndOptions(&client.baseService.Archetype, config, unregisteredJobArgs{}, nil)
require.NoError(err)
insertedJob, err := client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
insertedResult, err := client.driver.GetExecutor().JobInsertFast(ctx, insertParams)
require.NoError(err)

event := riversharedtest.WaitOrTimeout(t, subscribeChan)
require.Equal(insertedJob.ID, event.Job.ID)
require.Equal("RandomWorkerNameThatIsNeverRegistered", insertedJob.Kind)
require.Equal(insertedResult.Job.ID, event.Job.ID)
require.Equal("RandomWorkerNameThatIsNeverRegistered", insertedResult.Job.Kind)
require.Len(event.Job.Errors, 1)
require.Equal((&UnknownJobKindError{Kind: "RandomWorkerNameThatIsNeverRegistered"}).Error(), event.Job.Errors[0].Error)
require.Equal(rivertype.JobStateRetryable, event.Job.State)
// Ensure that ScheduledAt was updated with next run time:
require.True(event.Job.ScheduledAt.After(insertedJob.ScheduledAt))
require.True(event.Job.ScheduledAt.After(insertedResult.Job.ScheduledAt))
// It's the 1st attempt that failed. Attempt won't be incremented again until
// the job gets fetched a 2nd time.
require.Equal(1, event.Job.Attempt)
Expand Down Expand Up @@ -5098,7 +5099,9 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
require.Nil(t, insertParams.ScheduledAt)
require.Equal(t, []string{}, insertParams.Tags)

require.True(t, uniqueOpts.IsEmpty())
require.Nil(t, uniqueOpts)
require.Empty(t, insertParams.UniqueKey)
require.Zero(t, insertParams.UniqueStates)
})

t.Run("ConfigOverrides", func(t *testing.T) {
Expand Down Expand Up @@ -5178,22 +5181,98 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
}
})

t.Run("UniqueOpts", func(t *testing.T) {
t.Run("UniqueOptsDefaultStates", func(t *testing.T) {
t.Parallel()

archetype := riversharedtest.BaseServiceArchetype(t)
archetype.Time.StubNowUTC(time.Now().UTC())

uniqueOpts := UniqueOpts{
ByArgs: true,
ByPeriod: 10 * time.Second,
ByQueue: true,
ExcludeKind: true,
}

params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
require.NoError(t, err)
require.Nil(t, resultUniqueOpts)
internalUniqueOpts := &dbunique.UniqueOpts{
ByArgs: true,
ByPeriod: 10 * time.Second,
ByQueue: true,
ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted, rivertype.JobStatePending, rivertype.JobStateRunning, rivertype.JobStateRetryable, rivertype.JobStateScheduled},
ExcludeKind: true,
}

expectedKey := dbunique.UniqueKey(archetype.Time, internalUniqueOpts, params)

require.Equal(t, expectedKey, params.UniqueKey)
require.Equal(t, internalUniqueOpts.StateBitmask(), params.UniqueStates)
})

t.Run("UniqueOptsCustomStates", func(t *testing.T) {
t.Parallel()

archetype := riversharedtest.BaseServiceArchetype(t)
archetype.Time.StubNowUTC(time.Now().UTC())

states := []rivertype.JobState{
rivertype.JobStateAvailable,
rivertype.JobStatePending,
rivertype.JobStateRetryable,
rivertype.JobStateRunning,
rivertype.JobStateScheduled,
}

uniqueOpts := UniqueOpts{
ByPeriod: 10 * time.Second,
ByQueue: true,
ByState: states,
}

params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
require.NoError(t, err)
require.Nil(t, resultUniqueOpts)
internalUniqueOpts := &dbunique.UniqueOpts{
ByPeriod: 10 * time.Second,
ByQueue: true,
ByState: states,
}

expectedKey := dbunique.UniqueKey(archetype.Time, internalUniqueOpts, params)

require.Equal(t, expectedKey, params.UniqueKey)
require.Equal(t, internalUniqueOpts.StateBitmask(), params.UniqueStates)
})

t.Run("UniqueOptsV1", func(t *testing.T) {
t.Parallel()

archetype := riversharedtest.BaseServiceArchetype(t)
archetype.Time.StubNowUTC(time.Now().UTC())

uniqueOpts := UniqueOpts{
ByArgs: true,
ByPeriod: 10 * time.Second,
ByQueue: true,
ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted},
// This list of custom states (without pending, scheduled, running, etc.) is only valid for v1 unique opts:
ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted},
}

_, internalUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
params, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(archetype, config, noOpArgs{}, &InsertOpts{UniqueOpts: uniqueOpts})
require.NoError(t, err)
require.Equal(t, uniqueOpts.ByArgs, internalUniqueOpts.ByArgs)
require.Equal(t, uniqueOpts.ByPeriod, internalUniqueOpts.ByPeriod)
require.Equal(t, uniqueOpts.ByQueue, internalUniqueOpts.ByQueue)
require.Equal(t, uniqueOpts.ByState, internalUniqueOpts.ByState)
require.NotNil(t, resultUniqueOpts)
internalUniqueOpts := &dbunique.UniqueOpts{
ByArgs: true,
ByPeriod: 10 * time.Second,
ByQueue: true,
ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted},
}
require.Equal(t, internalUniqueOpts, resultUniqueOpts)

require.Nil(t, params.UniqueKey)
require.Zero(t, params.UniqueStates)
})

t.Run("PriorityIsLimitedTo4", func(t *testing.T) {
Expand All @@ -5219,15 +5298,18 @@ func TestInsertParamsFromJobArgsAndOptions(t *testing.T) {
// Ensure that unique opts are validated. No need to be exhaustive here
// since we already have tests elsewhere for that. Just make sure validation
// is running.
insertParams, _, err := insertParamsFromConfigArgsAndOptions(
insertParams, resultUniqueOpts, err := insertParamsFromConfigArgsAndOptions(
archetype,
config,
noOpArgs{},
&InsertOpts{UniqueOpts: UniqueOpts{ByPeriod: 1 * time.Millisecond}},
)
require.EqualError(t, err, "JobUniqueOpts.ByPeriod should not be less than 1 second")
require.Nil(t, insertParams)
require.Nil(t, resultUniqueOpts)
})

// TODO NOW NEXT: validate unique opts for v1 unique opts w/ advisory lock and custom states:
}

func TestID(t *testing.T) {
Expand Down Expand Up @@ -5410,7 +5492,7 @@ func TestUniqueOpts(t *testing.T) {
// roughly in the middle of the hour and well clear of any period
// boundaries.
client.baseService.Time.StubNowUTC(
time.Now().Truncate(1 * time.Hour).Add(37*time.Minute + 23*time.Second + 123*time.Millisecond),
time.Now().Truncate(1 * time.Hour).Add(37*time.Minute + 23*time.Second + 123*time.Millisecond).UTC(),
)

return client, &testBundle{}
Expand Down Expand Up @@ -5441,7 +5523,43 @@ func TestUniqueOpts(t *testing.T) {
require.Equal(t, insertRes0.Job.ID, insertRes1.Job.ID)
})

t.Run("UniqueByState", func(t *testing.T) {
t.Run("UniqueByCustomStates", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

uniqueOpts := UniqueOpts{
ByPeriod: 24 * time.Hour,
ByState: rivertype.JobStates(),
ByQueue: true,
}

insertRes0, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{
UniqueOpts: uniqueOpts,
})
require.NoError(t, err)

insertRes1, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{
UniqueOpts: uniqueOpts,
})
require.NoError(t, err)

// Expect the same job to come back because we deduplicate from the original.
require.Equal(t, insertRes0.Job.ID, insertRes1.Job.ID)

insertRes2, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{
// Use another queue so the job can be inserted:
Queue: "other",
UniqueOpts: uniqueOpts,
})
require.NoError(t, err)

// This job however is _not_ the same because it's inserted into a
// different queue, and uniqueness here is scoped by queue (ByQueue).
require.NotEqual(t, insertRes0.Job.ID, insertRes2.Job.ID)
})

t.Run("UniqueV1ByCustomStates", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)
Expand Down
21 changes: 11 additions & 10 deletions driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"runtime"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -233,19 +234,19 @@ func BenchmarkDriverRiverPgxV5Insert(b *testing.B) {
}
})

b.Run("InsertUnique", func(b *testing.B) {
b.Run("InsertFast_WithUnique", func(b *testing.B) {
_, bundle := setup(b)

for n := 0; n < b.N; n++ {
_, err := bundle.exec.JobInsertUnique(ctx, &riverdriver.JobInsertUniqueParams{
JobInsertFastParams: &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
State: rivertype.JobStateAvailable,
},
_, err := bundle.exec.JobInsertFast(ctx, &riverdriver.JobInsertFastParams{
EncodedArgs: []byte(`{"encoded": "args"}`),
Kind: "test_kind",
MaxAttempts: rivercommon.MaxAttemptsDefault,
Priority: rivercommon.PriorityDefault,
Queue: rivercommon.QueueDefault,
State: rivertype.JobStateAvailable,
UniqueKey: []byte("test_unique_key_" + strconv.Itoa(n)),
UniqueStates: 0xFB,
})
require.NoError(b, err)
}
Expand Down
27 changes: 26 additions & 1 deletion insert_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ type UniqueOpts struct {
// Unlike other unique options, ByState gets a default when it's not set for
// user convenience. The default is equivalent to:
//
// ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted, rivertype.JobStateRunning, rivertype.JobStateRetryable, rivertype.JobStateScheduled}
// ByState: []rivertype.JobState{rivertype.JobStateAvailable, rivertype.JobStateCompleted, rivertype.JobStatePending, rivertype.JobStateRunning, rivertype.JobStateRetryable, rivertype.JobStateScheduled}
//
// With this setting, any jobs of the same kind that have been completed or
// discarded, but not yet cleaned out by the system, won't count towards the
Expand All @@ -142,6 +142,11 @@ type UniqueOpts struct {
// lock and performs a look up before insertion. For best performance, it's
// recommended that the default set of states is used.
ByState []rivertype.JobState

// ExcludeKind indicates that the job kind should not be included in the
// uniqueness check. This is useful when you want to enforce uniqueness
// across all jobs regardless of kind.
ExcludeKind bool
}

// isEmpty returns true for an empty, uninitialized options struct.
Expand All @@ -157,6 +162,26 @@ func (o *UniqueOpts) isEmpty() bool {
o.ByState == nil
}

// isV1 reports whether these options have to go through the deprecated v1
// (advisory lock) unique insertion path rather than the unique key path.
//
// That is the case only when ByState has been customized and leaves out at
// least one of pending, scheduled, available, or running. An empty ByState
// never selects the v1 path.
func (o *UniqueOpts) isV1() bool {
	// TODO(bgentry): add tests as part of PR
	if len(o.ByState) == 0 {
		return false
	}

	// States that a customized ByState must include to stay off the v1 path.
	mandatoryStates := []rivertype.JobState{
		rivertype.JobStatePending,
		rivertype.JobStateScheduled,
		rivertype.JobStateAvailable,
		rivertype.JobStateRunning,
	}

	// v1 as soon as any one mandatory state is absent from ByState.
	return slices.ContainsFunc(mandatoryStates, func(mandatory rivertype.JobState) bool {
		return !slices.Contains(o.ByState, mandatory)
	})
}

var jobStateAll = rivertype.JobStates() //nolint:gochecknoglobals

func (o *UniqueOpts) validate() error {
Expand Down
Loading

0 comments on commit 7426729

Please sign in to comment.