From 6bb7e88c25dc9d53e457422ff22a62f659295363 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Tue, 10 Sep 2024 12:17:53 -0500 Subject: [PATCH 1/4] add JobInsertManyReturning to drivers This new query allows many rows to be inserted (up to ~7280 as of now) while also allowing the new records to be returned. While this is not quite as fast as the `COPY FROM` option when loading many rows, it provides a better UX for the vast majority of use cases. It _does_ require that we ditch sqlc for this one query, because sqlc does not support the multirow values insert syntax due to the dynamic nature of the param placeholders. --- .../riverdrivertest/riverdrivertest.go | 79 +++++++++++++++- riverdriver/river_driver_interface.go | 1 + .../internal/dbsqlc/river_job.sql.go | 92 +++++++++++++++++++ .../river_database_sql_driver.go | 48 ++++++++++ .../riverpgxv5/internal/dbsqlc/river_job.sql | 28 ++++++ .../internal/dbsqlc/river_job.sql.go | 89 ++++++++++++++++++ riverdriver/riverpgxv5/river_pgx_v5_driver.go | 48 ++++++++++ rivershared/util/sliceutil/slice_util.go | 9 ++ rivershared/util/sliceutil/slice_util_test.go | 15 +++ 9 files changed, 408 insertions(+), 1 deletion(-) diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 55ef1b91..06f30c52 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -16,7 +16,7 @@ import ( "github.com/riverqueue/river/internal/notifier" "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/riverdriver" - "github.com/riverqueue/river/rivershared/testfactory" //nolint:depguard + "github.com/riverqueue/river/rivershared/testfactory" "github.com/riverqueue/river/rivershared/util/ptrutil" "github.com/riverqueue/river/rivershared/util/sliceutil" "github.com/riverqueue/river/rivertype" @@ -1152,6 +1152,83 @@ func Exercise[TTx any](ctx 
context.Context, t *testing.T, }) }) + t.Run("JobInsertManyReturning", func(t *testing.T) { + t.Parallel() + + t.Run("AllArgs", func(t *testing.T) { + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + insertParams := make([]*riverdriver.JobInsertFastParams, 10) + for i := 0; i < len(insertParams); i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Metadata: []byte(`{"meta": "data"}`), + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + ScheduledAt: ptrutil.Ptr(now.Add(time.Duration(i) * time.Minute)), + State: rivertype.JobStateAvailable, + Tags: []string{"tag"}, + } + } + + jobRows, err := exec.JobInsertManyReturning(ctx, insertParams) + require.NoError(t, err) + require.Len(t, jobRows, len(insertParams)) + + for i, job := range jobRows { + require.Equal(t, 0, job.Attempt) + require.Nil(t, job.AttemptedAt) + require.Empty(t, job.AttemptedBy) + require.WithinDuration(t, now, job.CreatedAt, 2*time.Second) + require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs) + require.Empty(t, job.Errors) + require.Nil(t, job.FinalizedAt) + require.Equal(t, "test_kind", job.Kind) + require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts) + require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata) + require.Equal(t, rivercommon.PriorityDefault, job.Priority) + require.Equal(t, rivercommon.QueueDefault, job.Queue) + requireEqualTime(t, now.Add(time.Duration(i)*time.Minute), job.ScheduledAt) + require.Equal(t, rivertype.JobStateAvailable, job.State) + require.Equal(t, []string{"tag"}, job.Tags) + } + }) + + t.Run("MissingScheduledAtDefaultsToNow", func(t *testing.T) { + exec, _ := setup(ctx, t) + + insertParams := make([]*riverdriver.JobInsertFastParams, 10) + for i := 0; i < len(insertParams); i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: 
"test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Metadata: []byte(`{"meta": "data"}`), + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + ScheduledAt: nil, // explicit nil + State: rivertype.JobStateAvailable, + Tags: []string{"tag"}, + } + } + + results, err := exec.JobInsertManyReturning(ctx, insertParams) + require.NoError(t, err) + require.Len(t, results, len(insertParams)) + + jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"}) + require.NoError(t, err) + require.Len(t, jobsAfter, len(insertParams)) + for _, job := range jobsAfter { + require.WithinDuration(t, time.Now().UTC(), job.ScheduledAt, 2*time.Second) + } + }) + }) + t.Run("JobInsertUnique", func(t *testing.T) { t.Parallel() diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index d47b3eee..8609b450 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -118,6 +118,7 @@ type Executor interface { JobInsertFast(ctx context.Context, params *JobInsertFastParams) (*rivertype.JobRow, error) JobInsertFastMany(ctx context.Context, params []*JobInsertFastParams) (int, error) JobInsertFull(ctx context.Context, params *JobInsertFullParams) (*rivertype.JobRow, error) + JobInsertManyReturning(ctx context.Context, params []*JobInsertFastParams) ([]*rivertype.JobRow, error) JobInsertUnique(ctx context.Context, params *JobInsertUniqueParams) (*JobInsertUniqueResult, error) JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error) JobListFields() string diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index 61eebd1d..81de9241 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -745,6 +745,98 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg 
*JobInsertFull return &i, err } +const jobInsertManyReturning = `-- name: JobInsertManyReturning :many +INSERT INTO river_job( + args, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags +) SELECT + unnest($1::jsonb[]), + unnest($2::text[]), + unnest($3::smallint[]), + unnest($4::jsonb[]), + unnest($5::smallint[]), + unnest($6::text[]), + unnest($7::timestamptz[]), + -- To avoid requiring pgx users to register the OID of the river_job_state[] + -- type, we cast the array to text[] and then to river_job_state. + unnest($8::text[])::river_job_state, + -- Unnest on a multi-dimensional array will fully flatten the array, so we + -- encode the tag list as a comma-separated string and split it in the + -- query. + string_to_array(unnest($9::text[]), ',') +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key +` + +type JobInsertManyReturningParams struct { + Args []string + Kind []string + MaxAttempts []int16 + Metadata []string + Priority []int16 + Queue []string + ScheduledAt []time.Time + State []string + Tags []string +} + +func (q *Queries) JobInsertManyReturning(ctx context.Context, db DBTX, arg *JobInsertManyReturningParams) ([]*RiverJob, error) { + rows, err := db.QueryContext(ctx, jobInsertManyReturning, + pq.Array(arg.Args), + pq.Array(arg.Kind), + pq.Array(arg.MaxAttempts), + pq.Array(arg.Metadata), + pq.Array(arg.Priority), + pq.Array(arg.Queue), + pq.Array(arg.ScheduledAt), + pq.Array(arg.State), + pq.Array(arg.Tags), + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*RiverJob + for rows.Next() { + var i RiverJob + if err := rows.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + pq.Array(&i.AttemptedBy), + &i.CreatedAt, + pq.Array(&i.Errors), + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + 
pq.Array(&i.Tags), + &i.UniqueKey, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const jobInsertUnique = `-- name: JobInsertUnique :one INSERT INTO river_job( args, diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index ba7599ab..7f5d0fa1 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -286,6 +286,54 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns return jobRowFromInternal(job) } +func (e *Executor) JobInsertManyReturning(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*rivertype.JobRow, error) { + insertJobsParams := &dbsqlc.JobInsertManyReturningParams{ + Args: make([]string, len(params)), + Kind: make([]string, len(params)), + MaxAttempts: make([]int16, len(params)), + Metadata: make([]string, len(params)), + Priority: make([]int16, len(params)), + Queue: make([]string, len(params)), + ScheduledAt: make([]time.Time, len(params)), + State: make([]string, len(params)), + Tags: make([]string, len(params)), + } + now := time.Now() + + for i := 0; i < len(params); i++ { + params := params[i] + + scheduledAt := now + if params.ScheduledAt != nil { + scheduledAt = *params.ScheduledAt + } + + tags := params.Tags + if tags == nil { + tags = []string{} + } + + defaultObject := "{}" + + insertJobsParams.Args[i] = valutil.ValOrDefault(string(params.EncodedArgs), defaultObject) + insertJobsParams.Kind[i] = params.Kind + insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec + insertJobsParams.Metadata[i] = valutil.ValOrDefault(string(params.Metadata), defaultObject) + insertJobsParams.Priority[i] = int16(min(params.Priority, math.MaxInt16)) 
//nolint:gosec + insertJobsParams.Queue[i] = params.Queue + insertJobsParams.ScheduledAt[i] = scheduledAt + insertJobsParams.State[i] = string(params.State) + insertJobsParams.Tags[i] = strings.Join(tags, ",") + } + + items, err := dbsqlc.New().JobInsertManyReturning(ctx, e.dbtx, insertJobsParams) + if err != nil { + return nil, interpretError(err) + } + + return mapSliceError(items, jobRowFromInternal) +} + func (e *Executor) JobInsertUnique(ctx context.Context, params *riverdriver.JobInsertUniqueParams) (*riverdriver.JobInsertUniqueResult, error) { insertRes, err := dbsqlc.New().JobInsertUnique(ctx, e.dbtx, &dbsqlc.JobInsertUniqueParams{ Args: string(params.EncodedArgs), diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index ff3decf2..df20035d 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -285,6 +285,34 @@ INSERT INTO river_job( @unique_key ) RETURNING *; +-- name: JobInsertManyReturning :many +INSERT INTO river_job( + args, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags +) SELECT + unnest(@args::jsonb[]), + unnest(@kind::text[]), + unnest(@max_attempts::smallint[]), + unnest(@metadata::jsonb[]), + unnest(@priority::smallint[]), + unnest(@queue::text[]), + unnest(@scheduled_at::timestamptz[]), + -- To avoid requiring pgx users to register the OID of the river_job_state[] + -- type, we cast the array to text[] and then to river_job_state. + unnest(@state::text[])::river_job_state, + -- Unnest on a multi-dimensional array will fully flatten the array, so we + -- encode the tag list as a comma-separated string and split it in the + -- query. 
+ string_to_array(unnest(@tags::text[]), ',') +RETURNING *; + -- name: JobInsertUnique :one INSERT INTO river_job( args, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 5beabbe6..55138fe0 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -731,6 +731,95 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull return &i, err } +const jobInsertManyReturning = `-- name: JobInsertManyReturning :many +INSERT INTO river_job( + args, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags +) SELECT + unnest($1::jsonb[]), + unnest($2::text[]), + unnest($3::smallint[]), + unnest($4::jsonb[]), + unnest($5::smallint[]), + unnest($6::text[]), + unnest($7::timestamptz[]), + -- To avoid requiring pgx users to register the OID of the river_job_state[] + -- type, we cast the array to text[] and then to river_job_state. + unnest($8::text[])::river_job_state, + -- Unnest on a multi-dimensional array will fully flatten the array, so we + -- encode the tag list as a comma-separated string and split it in the + -- query. 
+ string_to_array(unnest($9::text[]), ',') +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key +` + +type JobInsertManyReturningParams struct { + Args [][]byte + Kind []string + MaxAttempts []int16 + Metadata [][]byte + Priority []int16 + Queue []string + ScheduledAt []time.Time + State []string + Tags []string +} + +func (q *Queries) JobInsertManyReturning(ctx context.Context, db DBTX, arg *JobInsertManyReturningParams) ([]*RiverJob, error) { + rows, err := db.Query(ctx, jobInsertManyReturning, + arg.Args, + arg.Kind, + arg.MaxAttempts, + arg.Metadata, + arg.Priority, + arg.Queue, + arg.ScheduledAt, + arg.State, + arg.Tags, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*RiverJob + for rows.Next() { + var i RiverJob + if err := rows.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + &i.AttemptedBy, + &i.CreatedAt, + &i.Errors, + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + &i.Tags, + &i.UniqueKey, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const jobInsertUnique = `-- name: JobInsertUnique :one INSERT INTO river_job( args, diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index 36da4d81..b60e9671 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -278,6 +278,54 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns return jobRowFromInternal(job) } +func (e *Executor) JobInsertManyReturning(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*rivertype.JobRow, error) { + insertJobsParams := &dbsqlc.JobInsertManyReturningParams{ + Args: make([][]byte, 
len(params)), + Kind: make([]string, len(params)), + MaxAttempts: make([]int16, len(params)), + Metadata: make([][]byte, len(params)), + Priority: make([]int16, len(params)), + Queue: make([]string, len(params)), + ScheduledAt: make([]time.Time, len(params)), + State: make([]string, len(params)), + Tags: make([]string, len(params)), + } + now := time.Now() + + for i := 0; i < len(params); i++ { + params := params[i] + + scheduledAt := now + if params.ScheduledAt != nil { + scheduledAt = *params.ScheduledAt + } + + tags := params.Tags + if tags == nil { + tags = []string{} + } + + defaultObject := []byte("{}") + + insertJobsParams.Args[i] = sliceutil.DefaultIfEmpty(params.EncodedArgs, defaultObject) + insertJobsParams.Kind[i] = params.Kind + insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec + insertJobsParams.Metadata[i] = sliceutil.DefaultIfEmpty(params.Metadata, defaultObject) + insertJobsParams.Priority[i] = int16(min(params.Priority, math.MaxInt16)) //nolint:gosec + insertJobsParams.Queue[i] = params.Queue + insertJobsParams.ScheduledAt[i] = scheduledAt + insertJobsParams.State[i] = string(params.State) + insertJobsParams.Tags[i] = strings.Join(tags, ",") + } + + items, err := dbsqlc.New().JobInsertManyReturning(ctx, e.dbtx, insertJobsParams) + if err != nil { + return nil, interpretError(err) + } + + return mapSliceError(items, jobRowFromInternal) +} + func (e *Executor) JobInsertUnique(ctx context.Context, params *riverdriver.JobInsertUniqueParams) (*riverdriver.JobInsertUniqueResult, error) { insertRes, err := dbsqlc.New().JobInsertUnique(ctx, e.dbtx, &dbsqlc.JobInsertUniqueParams{ Args: params.EncodedArgs, diff --git a/rivershared/util/sliceutil/slice_util.go b/rivershared/util/sliceutil/slice_util.go index 400c9d40..8088e05a 100644 --- a/rivershared/util/sliceutil/slice_util.go +++ b/rivershared/util/sliceutil/slice_util.go @@ -4,6 +4,15 @@ // therefore omitted from the utilities in `slices`. 
package sliceutil +// DefaultIfEmpty returns the default slice if the input slice is nil or empty, +// otherwise it returns the input slice. +func DefaultIfEmpty[T any](input []T, defaultSlice []T) []T { + if len(input) == 0 { + return defaultSlice + } + return input +} + // GroupBy returns an object composed of keys generated from the results of // running each element of collection through keyFunc. func GroupBy[T any, U comparable](collection []T, keyFunc func(T) U) map[U][]T { diff --git a/rivershared/util/sliceutil/slice_util_test.go b/rivershared/util/sliceutil/slice_util_test.go index 4b2bc49c..4eb57e56 100644 --- a/rivershared/util/sliceutil/slice_util_test.go +++ b/rivershared/util/sliceutil/slice_util_test.go @@ -8,6 +8,21 @@ import ( "github.com/stretchr/testify/require" ) +func TestDefaultIfEmpty(t *testing.T) { + t.Parallel() + + result1 := DefaultIfEmpty([]int{1, 2, 3}, []int{4, 5, 6}) + result2 := DefaultIfEmpty([]int{}, []int{4, 5, 6}) + result3 := DefaultIfEmpty(nil, []int{4, 5, 6}) + + require.Len(t, result1, 3) + require.Len(t, result2, 3) + require.Len(t, result3, 3) + require.Equal(t, []int{1, 2, 3}, result1) + require.Equal(t, []int{4, 5, 6}, result2) + require.Equal(t, []int{4, 5, 6}, result3) +} + func TestGroupBy(t *testing.T) { t.Parallel() From f93246edb194744e8389828e39e179047ae94661 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Tue, 10 Sep 2024 13:14:41 -0500 Subject: [PATCH 2/4] rename InsertMany to InsertManyFast --- client.go | 18 ++++++++-------- client_test.go | 42 ++++++++++++++++++------------------ example_batch_insert_test.go | 2 +- rivertest/rivertest_test.go | 14 ++++++------ 4 files changed, 38 insertions(+), 38 deletions(-) diff --git a/client.go b/client.go index 4aa69df9..a73d35fd 100644 --- a/client.go +++ b/client.go @@ -1326,7 +1326,7 @@ type InsertManyParams struct { InsertOpts *InsertOpts } -// InsertMany inserts many jobs at once using Postgres' `COPY FROM` mechanism, +// InsertManyFast inserts many jobs at once 
using Postgres' `COPY FROM` mechanism, // making the operation quite fast and memory efficient. Each job is inserted as // an InsertManyParams tuple, which takes job args along with an optional set of // insert options, which override insert options provided by an @@ -1345,12 +1345,12 @@ type InsertManyParams struct { // Job uniqueness is not respected when using InsertMany due to unique inserts // using an internal transaction and advisory lock that might lead to // significant lock contention. Insert unique jobs using Insert instead. -func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int, error) { +func (c *Client[TTx]) InsertManyFast(ctx context.Context, params []InsertManyParams) (int, error) { if !c.driver.HasPool() { return 0, errNoDriverDBPool } - insertParams, err := c.insertManyParams(params) + insertParams, err := c.insertManyFastParams(params) if err != nil { return 0, err } @@ -1362,7 +1362,7 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) } defer tx.Rollback(ctx) - inserted, err := c.insertFastMany(ctx, tx, insertParams) + inserted, err := c.insertManyFast(ctx, tx, insertParams) if err != nil { return 0, err } @@ -1395,17 +1395,17 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) // This variant lets a caller insert jobs atomically alongside other database // changes. An inserted job isn't visible to be worked until the transaction // commits, and if the transaction rolls back, so too is the inserted job. 
-func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) (int, error) { - insertParams, err := c.insertManyParams(params) +func (c *Client[TTx]) InsertManyFastTx(ctx context.Context, tx TTx, params []InsertManyParams) (int, error) { + insertParams, err := c.insertManyFastParams(params) if err != nil { return 0, err } exec := c.driver.UnwrapExecutor(tx) - return c.insertFastMany(ctx, exec, insertParams) + return c.insertManyFast(ctx, exec, insertParams) } -func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) { +func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) { inserted, err := tx.JobInsertFastMany(ctx, insertParams) if err != nil { return inserted, err @@ -1425,7 +1425,7 @@ func (c *Client[TTx]) insertFastMany(ctx context.Context, tx riverdriver.Executo // Validates input parameters for an a batch insert operation and generates a // set of batch insert parameters. 
-func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) { +func (c *Client[TTx]) insertManyFastParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) { if len(params) < 1 { return nil, errors.New("no jobs to insert") } diff --git a/client_test.go b/client_test.go index ff7c1e80..993da3b0 100644 --- a/client_test.go +++ b/client_test.go @@ -1568,7 +1568,7 @@ func Test_Client_InsertTx(t *testing.T) { }) } -func Test_Client_InsertMany(t *testing.T) { +func Test_Client_InsertManyFast(t *testing.T) { t.Parallel() ctx := context.Background() @@ -1592,7 +1592,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "foo", Priority: 2}}, {Args: noOpArgs{}}, }) @@ -1627,7 +1627,7 @@ func Test_Client_InsertMany(t *testing.T) { startClient(ctx, t, client) riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: callbackArgs{}}, {Args: callbackArgs{}}, }) @@ -1643,7 +1643,7 @@ func Test_Client_InsertMany(t *testing.T) { // // Note: we specifically use a different queue to ensure that the notify // limiter is immediately to fire on this queue. 
- count, err = client.InsertMany(ctx, []InsertManyParams{ + count, err = client.InsertManyFast(ctx, []InsertManyParams{ {Args: callbackArgs{}, InsertOpts: &InsertOpts{Queue: "another_queue"}}, }) require.NoError(t, err) @@ -1675,7 +1675,7 @@ func Test_Client_InsertMany(t *testing.T) { startClient(ctx, t, client) riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "a", ScheduledAt: time.Now().Add(1 * time.Hour)}}, {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "b"}}, }) @@ -1695,7 +1695,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: &noOpArgs{}, InsertOpts: &InsertOpts{ScheduledAt: time.Time{}}}, }) require.NoError(t, err) @@ -1713,7 +1713,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: &noOpArgs{}, InsertOpts: &InsertOpts{Queue: "invalid*queue"}}, }) require.ErrorContains(t, err, "queue name is invalid") @@ -1730,7 +1730,7 @@ func Test_Client_InsertMany(t *testing.T) { }) require.NoError(t, err) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: noOpArgs{}}, }) require.ErrorIs(t, err, errNoDriverDBPool) @@ -1742,7 +1742,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{}) + count, err := client.InsertManyFast(ctx, []InsertManyParams{}) require.EqualError(t, err, "no jobs to insert") require.Equal(t, 0, count) }) @@ -1752,7 +1752,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := 
client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: unregisteredJobArgs{}}, }) var unknownJobKindErr *UnknownJobKindError @@ -1768,7 +1768,7 @@ func Test_Client_InsertMany(t *testing.T) { client.config.Workers = nil - _, err := client.InsertMany(ctx, []InsertManyParams{ + _, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: unregisteredJobArgs{}}, }) require.NoError(t, err) @@ -1779,7 +1779,7 @@ func Test_Client_InsertMany(t *testing.T) { client, _ := setup(t) - count, err := client.InsertMany(ctx, []InsertManyParams{ + count, err := client.InsertManyFast(ctx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}}, }) require.EqualError(t, err, "UniqueOpts are not supported for batch inserts") @@ -1787,7 +1787,7 @@ func Test_Client_InsertMany(t *testing.T) { }) } -func Test_Client_InsertManyTx(t *testing.T) { +func Test_Client_InsertManyFastTx(t *testing.T) { t.Parallel() ctx := context.Background() @@ -1817,7 +1817,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "foo", Priority: 2}}, {Args: noOpArgs{}}, }) @@ -1841,7 +1841,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - _, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, nil}}) + _, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, nil}}) require.NoError(t, err) insertedJobs, err := client.driver.UnwrapExecutor(bundle.tx).JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) @@ -1858,7 +1858,7 @@ func Test_Client_InsertManyTx(t *testing.T) { startClient(ctx, t, client) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, &InsertOpts{ScheduledAt: 
time.Now().Add(time.Minute)}}}) + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Minute)}}}) require.NoError(t, err) require.Equal(t, 1, count) @@ -1881,7 +1881,7 @@ func Test_Client_InsertManyTx(t *testing.T) { }) require.NoError(t, err) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: noOpArgs{}}, }) require.NoError(t, err) @@ -1893,7 +1893,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{}) + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{}) require.EqualError(t, err, "no jobs to insert") require.Equal(t, 0, count) }) @@ -1903,7 +1903,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: unregisteredJobArgs{}}, }) var unknownJobKindErr *UnknownJobKindError @@ -1919,7 +1919,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client.config.Workers = nil - _, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + _, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: unregisteredJobArgs{}}, }) require.NoError(t, err) @@ -1930,7 +1930,7 @@ func Test_Client_InsertManyTx(t *testing.T) { client, bundle := setup(t) - count, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + count, err := client.InsertManyFastTx(ctx, bundle.tx, []InsertManyParams{ {Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}}, }) require.EqualError(t, err, "UniqueOpts are not supported for batch inserts") diff --git a/example_batch_insert_test.go b/example_batch_insert_test.go index a4d3f817..a3b8274d 100644 --- 
a/example_batch_insert_test.go +++ b/example_batch_insert_test.go @@ -67,7 +67,7 @@ func Example_batchInsert() { panic(err) } - count, err := riverClient.InsertMany(ctx, []river.InsertManyParams{ + count, err := riverClient.InsertManyFast(ctx, []river.InsertManyParams{ {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}}, diff --git a/rivertest/rivertest_test.go b/rivertest/rivertest_test.go index c51a59db..7201ed0a 100644 --- a/rivertest/rivertest_test.go +++ b/rivertest/rivertest_test.go @@ -168,7 +168,7 @@ func TestRequireInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -440,7 +440,7 @@ func TestRequireNotInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -738,7 +738,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -758,7 +758,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, {Args: Job2Args{Int: 123}}, @@ -847,7 +847,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := 
riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -867,7 +867,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job2Args{Int: 123}}, {Args: Job1Args{String: "foo"}}, }) @@ -888,7 +888,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, {Args: Job2Args{Int: 123}}, From 62140381ee1888bcb33acb7730d3e91b6a595c2b Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Tue, 10 Sep 2024 15:17:19 -0500 Subject: [PATCH 3/4] new InsertMany/InsertManyTx with return values This adds new implementations of `InsertMany` / `InsertManyTx` that use the multirow `VALUES` syntax to allow the new rows to be returned upon insert. The alternative `COPY FROM ` implementation has been renamed to `InsertManyFast` / `InsertManyFastTx`. The expectation is that these forms will only be needed in cases where an extremely large number of records is being inserted simultaneously, whereas the new form is more user-friendly for the vast majority of other cases. 
--- CHANGELOG.md | 6 +- client.go | 131 ++++++++++ client_test.go | 450 +++++++++++++++++++++++++++++++++++ example_batch_insert_test.go | 4 +- rivertest/rivertest_test.go | 14 +- 5 files changed, 595 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d50d0a88..45609fdf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -⚠️ Version 0.12.0 has a small breaking change in `rivermigrate`. As before, we try never to make breaking changes, but this one was deemed worth it because it's quite small and may help avoid panics. +⚠️ Version 0.12.0 has two small breaking changes, one for `InsertMany` and one in `rivermigrate`. As before, we try never to make breaking changes, but these ones were deemed worth it because of minimal impact and to help avoid panics. + +- **Breaking change:** `Client.InsertMany` / `InsertManyTx` now return the inserted rows rather than merely returning a count of the inserted rows. The new implementations no longer use Postgres' `COPY FROM` protocol in order to facilitate return values. + + Users who relied on the return count can merely wrap the returned rows in a `len()` to return to that behavior, or you can continue using the old APIs using their new names `InsertManyFast` and `InsertManyFastTx`. [PR #589](https://github.com/riverqueue/river/pull/589). - **Breaking change:** `rivermigrate.New` now returns a possible error along with a migrator. An error may be returned, for example, when a migration line is configured that doesn't exist. [PR #558](https://github.com/riverqueue/river/pull/558). diff --git a/client.go b/client.go index a73d35fd..47fe1113 100644 --- a/client.go +++ b/client.go @@ -1326,6 +1326,137 @@ type InsertManyParams struct { InsertOpts *InsertOpts } +// InsertMany inserts many jobs at once. 
Each job is inserted as an
+// InsertManyParams tuple, which takes job args along with an optional set of
+// insert options, which override insert options provided by an
+// JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
+// The provided context is used for the underlying Postgres inserts and can be
+// used to cancel the operation or apply a timeout.
+//
+//	results, err := client.InsertMany(ctx, []river.InsertManyParams{
+//		{Args: BatchInsertArgs{}},
+//		{Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
+//	})
+//	if err != nil {
+//		// handle error
+//	}
+//
+// Job uniqueness is not respected when using InsertMany due to unique inserts
+// using an internal transaction and advisory lock that might lead to
+// significant lock contention. Insert unique jobs using Insert instead.
+//
+// Unlike the `COPY FROM`-based InsertManyFast variant, this function also
+// returns the inserted rows to the caller, one JobInsertResult per inserted
+// job.
+func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
+	if !c.driver.HasPool() {
+		return nil, errNoDriverDBPool
+	}
+
+	tx, err := c.driver.GetExecutor().Begin(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback(ctx)
+
+	inserted, err := c.insertMany(ctx, tx, params)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := tx.Commit(ctx); err != nil {
+		return nil, err
+	}
+	return inserted, nil
+}
+
+// InsertManyTx inserts many jobs at once. Each job is inserted as an
+// InsertManyParams tuple, which takes job args along with an optional set of
+// insert options, which override insert options provided by an
+// JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
+// The provided context is used for the underlying Postgres inserts and can be
+// used to cancel the operation or apply a timeout.
+//
+//	results, err := client.InsertManyTx(ctx, tx, []river.InsertManyParams{
+//		{Args: BatchInsertArgs{}},
+//		{Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
+//	})
+//	if err != nil {
+//		// handle error
+//	}
+//
+// Job uniqueness is not respected when using InsertManyTx due to unique inserts
+// using an internal transaction and advisory lock that might lead to
+// significant lock contention. Insert unique jobs using Insert instead.
+//
+// This variant lets a caller insert jobs atomically alongside other database
+// changes. An inserted job isn't visible to be worked until the transaction
+// commits, and if the transaction rolls back, so too is the inserted job.
+func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
+	exec := c.driver.UnwrapExecutor(tx)
+	return c.insertMany(ctx, exec, params)
+}
+
+func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
+	insertParams, err := c.insertManyParams(params)
+	if err != nil {
+		return nil, err
+	}
+
+	jobRows, err := tx.JobInsertManyReturning(ctx, insertParams)
+	if err != nil {
+		return nil, err
+	}
+
+	queues := make([]string, 0, 10)
+	for _, params := range insertParams {
+		if params.State == rivertype.JobStateAvailable {
+			queues = append(queues, params.Queue)
+		}
+	}
+	if err := c.maybeNotifyInsertForQueues(ctx, tx, queues); err != nil {
+		return nil, err
+	}
+
+	return sliceutil.Map(jobRows,
+		func(jobRow *rivertype.JobRow) *rivertype.JobInsertResult {
+			return &rivertype.JobInsertResult{Job: jobRow}
+		},
+	), nil
+}
+
+// Validates input parameters for a batch insert operation and generates a set
+// of batch insert parameters.
+func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) { + if len(params) < 1 { + return nil, errors.New("no jobs to insert") + } + + insertParams := make([]*riverdriver.JobInsertFastParams, len(params)) + for i, param := range params { + if err := c.validateJobArgs(param.Args); err != nil { + return nil, err + } + + if param.InsertOpts != nil { + // UniqueOpts aren't supported for batch inserts because they use PG + // advisory locks to work, and taking many locks simultaneously could + // easily lead to contention and deadlocks. + if !param.InsertOpts.UniqueOpts.isEmpty() { + return nil, errors.New("UniqueOpts are not supported for batch inserts") + } + } + + var err error + insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts) + if err != nil { + return nil, err + } + } + + return insertParams, nil +} + // InsertManyFast inserts many jobs at once using Postgres' `COPY FROM` mechanism, // making the operation quite fast and memory efficient. 
Each job is inserted as // an InsertManyParams tuple, which takes job args along with an optional set of diff --git a/client_test.go b/client_test.go index 993da3b0..e38cbd24 100644 --- a/client_test.go +++ b/client_test.go @@ -1938,6 +1938,456 @@ func Test_Client_InsertManyFastTx(t *testing.T) { }) } +func Test_Client_InsertMany(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + dbPool *pgxpool.Pool + } + + setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) { + t.Helper() + + dbPool := riverinternaltest.TestDB(ctx, t) + config := newTestConfig(t, nil) + client := newTestClient(t, dbPool, config) + + return client, &testBundle{dbPool: dbPool} + } + + t.Run("SucceedsWithMultipleJobs", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + now := time.Now().UTC() + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: noOpArgs{Name: "Foo"}, InsertOpts: &InsertOpts{Metadata: []byte(`{"a": "b"}`), Queue: "foo", Priority: 2}}, + {Args: noOpArgs{}, InsertOpts: &InsertOpts{ScheduledAt: now.Add(time.Minute)}}, + }) + require.NoError(t, err) + require.Len(t, results, 2) + + require.False(t, results[0].UniqueSkippedAsDuplicate) + require.Equal(t, 0, results[0].Job.Attempt) + require.Nil(t, results[0].Job.AttemptedAt) + require.WithinDuration(t, now, results[0].Job.CreatedAt, 2*time.Second) + require.Empty(t, results[0].Job.AttemptedBy) + require.Positive(t, results[0].Job.ID) + require.JSONEq(t, `{"name": "Foo"}`, string(results[0].Job.EncodedArgs)) + require.Empty(t, results[0].Job.Errors) + require.Nil(t, results[0].Job.FinalizedAt) + require.Equal(t, "noOp", results[0].Job.Kind) + require.Equal(t, 25, results[0].Job.MaxAttempts) + require.JSONEq(t, `{"a": "b"}`, string(results[0].Job.Metadata)) + require.Equal(t, 2, results[0].Job.Priority) + require.Equal(t, "foo", results[0].Job.Queue) + require.WithinDuration(t, now, results[0].Job.ScheduledAt, 2*time.Second) + require.Equal(t, 
rivertype.JobStateAvailable, results[0].Job.State) + require.Empty(t, results[0].Job.Tags) + require.Empty(t, results[0].Job.UniqueKey) + + require.False(t, results[1].UniqueSkippedAsDuplicate) + require.Equal(t, 0, results[1].Job.Attempt) + require.Nil(t, results[1].Job.AttemptedAt) + require.WithinDuration(t, now, results[1].Job.CreatedAt, 2*time.Second) + require.Empty(t, results[1].Job.AttemptedBy) + require.Positive(t, results[1].Job.ID) + require.JSONEq(t, `{"name": ""}`, string(results[1].Job.EncodedArgs)) + require.Empty(t, results[1].Job.Errors) + require.Nil(t, results[1].Job.FinalizedAt) + require.Equal(t, "noOp", results[1].Job.Kind) + require.Equal(t, 25, results[1].Job.MaxAttempts) + require.JSONEq(t, `{}`, string(results[1].Job.Metadata)) + require.Equal(t, 1, results[1].Job.Priority) + require.Equal(t, "default", results[1].Job.Queue) + require.WithinDuration(t, now.Add(time.Minute), results[1].Job.ScheduledAt, time.Millisecond) + require.Equal(t, rivertype.JobStateScheduled, results[1].Job.State) + require.Empty(t, results[1].Job.Tags) + require.Empty(t, results[1].Job.UniqueKey) + + require.NotEqual(t, results[0].Job.ID, results[1].Job.ID) + + jobs, err := client.driver.GetExecutor().JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) + require.NoError(t, err) + require.Len(t, jobs, 2, "Expected to find exactly two jobs of kind: "+(noOpArgs{}).Kind()) + }) + + t.Run("TriggersImmediateWork", func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + _, bundle := setup(t) + + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + t.Cleanup(cancel) + + doneCh := make(chan struct{}) + close(doneCh) // don't need to block any jobs from completing + startedCh := make(chan int64) + + config := newTestConfig(t, makeAwaitCallback(startedCh, doneCh)) + config.FetchCooldown = 20 * time.Millisecond + config.FetchPollInterval = 20 * time.Second // essentially disable polling + config.Queues = map[string]QueueConfig{QueueDefault: {MaxWorkers: 2}, 
"another_queue": {MaxWorkers: 1}} + + client := newTestClient(t, bundle.dbPool, config) + + startClient(ctx, t, client) + riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: callbackArgs{}}, + {Args: callbackArgs{}}, + }) + require.NoError(t, err) + require.Len(t, results, 2) + + // Wait for the client to be ready by waiting for a job to be executed: + riversharedtest.WaitOrTimeoutN(t, startedCh, 2) + + // Now that we've run one job, we shouldn't take longer than the cooldown to + // fetch another after insertion. LISTEN/NOTIFY should ensure we find out + // about the inserted job much faster than the poll interval. + // + // Note: we specifically use a different queue to ensure that the notify + // limiter is immediately to fire on this queue. + results, err = client.InsertMany(ctx, []InsertManyParams{ + {Args: callbackArgs{}, InsertOpts: &InsertOpts{Queue: "another_queue"}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + + select { + case <-startedCh: + // As long as this is meaningfully shorter than the poll interval, we can be + // sure the re-fetch came from listen/notify. 
+ case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for another_queue job to start") + } + + require.NoError(t, client.Stop(ctx)) + }) + + t.Run("DoesNotTriggerInsertNotificationForNonAvailableJob", func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + _, bundle := setup(t) + + config := newTestConfig(t, nil) + config.FetchCooldown = 5 * time.Second + config.FetchPollInterval = 5 * time.Second + client := newTestClient(t, bundle.dbPool, config) + + startClient(ctx, t, client) + riversharedtest.WaitOrTimeout(t, client.baseStartStop.Started()) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "a", ScheduledAt: time.Now().Add(1 * time.Hour)}}, + {Args: noOpArgs{}, InsertOpts: &InsertOpts{Queue: "b"}}, + }) + require.NoError(t, err) + require.Len(t, results, 2) + + // Queue `a` should be "due" to be triggered because it wasn't triggered above. + require.True(t, client.insertNotifyLimiter.ShouldTrigger("a")) + // Queue `b` should *not* be "due" to be triggered because it was triggered above. 
+ require.False(t, client.insertNotifyLimiter.ShouldTrigger("b")) + + require.NoError(t, client.Stop(ctx)) + }) + + t.Run("WithInsertOptsScheduledAtZeroTime", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: &noOpArgs{}, InsertOpts: &InsertOpts{ScheduledAt: time.Time{}}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + + jobs, err := client.driver.GetExecutor().JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) + require.NoError(t, err) + require.Len(t, jobs, 1, "Expected to find exactly one job of kind: "+(noOpArgs{}).Kind()) + jobRow := jobs[0] + require.WithinDuration(t, time.Now(), jobRow.ScheduledAt, 2*time.Second) + }) + + t.Run("ErrorsOnInvalidQueueName", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: &noOpArgs{}, InsertOpts: &InsertOpts{Queue: "invalid*queue"}}, + }) + require.ErrorContains(t, err, "queue name is invalid") + require.Nil(t, results) + }) + + t.Run("ErrorsOnDriverWithoutPool", func(t *testing.T) { + t.Parallel() + + _, _ = setup(t) + + client, err := NewClient(riverpgxv5.New(nil), &Config{ + Logger: riversharedtest.Logger(t), + }) + require.NoError(t, err) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: noOpArgs{}}, + }) + require.ErrorIs(t, err, errNoDriverDBPool) + require.Nil(t, results) + }) + + t.Run("ErrorsWithZeroJobs", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{}) + require.EqualError(t, err, "no jobs to insert") + require.Nil(t, results) + }) + + t.Run("ErrorsOnUnknownJobKindWithWorkers", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: unregisteredJobArgs{}}, + }) + var unknownJobKindErr *UnknownJobKindError + require.ErrorAs(t, err, &unknownJobKindErr) + 
require.Equal(t, (&unregisteredJobArgs{}).Kind(), unknownJobKindErr.Kind) + require.Nil(t, results) + }) + + t.Run("AllowsUnknownJobKindWithoutWorkers", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + client.config.Workers = nil + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: unregisteredJobArgs{}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + }) + + t.Run("ErrorsOnInsertOptsUniqueOpts", func(t *testing.T) { + t.Parallel() + + client, _ := setup(t) + + results, err := client.InsertMany(ctx, []InsertManyParams{ + {Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}}, + }) + require.EqualError(t, err, "UniqueOpts are not supported for batch inserts") + require.Empty(t, results) + }) +} + +func Test_Client_InsertManyTx(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + tx pgx.Tx + } + + setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) { + t.Helper() + + dbPool := riverinternaltest.TestDB(ctx, t) + config := newTestConfig(t, nil) + client := newTestClient(t, dbPool, config) + + tx, err := dbPool.Begin(ctx) + require.NoError(t, err) + t.Cleanup(func() { tx.Rollback(ctx) }) + + return client, &testBundle{ + tx: tx, + } + } + + t.Run("SucceedsWithMultipleJobs", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + now := time.Now().UTC() + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: noOpArgs{Name: "Foo"}, InsertOpts: &InsertOpts{Metadata: []byte(`{"a": "b"}`), Queue: "foo", Priority: 2}}, + {Args: noOpArgs{}, InsertOpts: &InsertOpts{ScheduledAt: now.Add(time.Minute)}}, + }) + require.NoError(t, err) + require.Len(t, results, 2) + + require.False(t, results[0].UniqueSkippedAsDuplicate) + require.Equal(t, 0, results[0].Job.Attempt) + require.Nil(t, results[0].Job.AttemptedAt) + require.WithinDuration(t, now, results[0].Job.CreatedAt, 2*time.Second) + require.Empty(t, 
results[0].Job.AttemptedBy) + require.Positive(t, results[0].Job.ID) + require.JSONEq(t, `{"name": "Foo"}`, string(results[0].Job.EncodedArgs)) + require.Empty(t, results[0].Job.Errors) + require.Nil(t, results[0].Job.FinalizedAt) + require.Equal(t, "noOp", results[0].Job.Kind) + require.Equal(t, 25, results[0].Job.MaxAttempts) + require.JSONEq(t, `{"a": "b"}`, string(results[0].Job.Metadata)) + require.Equal(t, 2, results[0].Job.Priority) + require.Equal(t, "foo", results[0].Job.Queue) + require.WithinDuration(t, now, results[0].Job.ScheduledAt, 2*time.Second) + require.Equal(t, rivertype.JobStateAvailable, results[0].Job.State) + require.Empty(t, results[0].Job.Tags) + require.Empty(t, results[0].Job.UniqueKey) + + require.False(t, results[1].UniqueSkippedAsDuplicate) + require.Equal(t, 0, results[1].Job.Attempt) + require.Nil(t, results[1].Job.AttemptedAt) + require.WithinDuration(t, now, results[1].Job.CreatedAt, 2*time.Second) + require.Empty(t, results[1].Job.AttemptedBy) + require.Positive(t, results[1].Job.ID) + require.JSONEq(t, `{"name": ""}`, string(results[1].Job.EncodedArgs)) + require.Empty(t, results[1].Job.Errors) + require.Nil(t, results[1].Job.FinalizedAt) + require.Equal(t, "noOp", results[1].Job.Kind) + require.Equal(t, 25, results[1].Job.MaxAttempts) + require.JSONEq(t, `{}`, string(results[1].Job.Metadata)) + require.Equal(t, 1, results[1].Job.Priority) + require.Equal(t, "default", results[1].Job.Queue) + require.WithinDuration(t, now.Add(time.Minute), results[1].Job.ScheduledAt, time.Millisecond) + require.Equal(t, rivertype.JobStateScheduled, results[1].Job.State) + require.Empty(t, results[1].Job.Tags) + require.Empty(t, results[1].Job.UniqueKey) + + require.NotEqual(t, results[0].Job.ID, results[1].Job.ID) + + jobs, err := client.driver.UnwrapExecutor(bundle.tx).JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) + require.NoError(t, err) + require.Len(t, jobs, 2, "Expected to find exactly two jobs of kind: "+(noOpArgs{}).Kind()) + }) + 
+ t.Run("SetsScheduledAtToNowByDefault", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, nil}}) + require.NoError(t, err) + require.Len(t, results, 1) + + require.Equal(t, rivertype.JobStateAvailable, results[0].Job.State) + require.WithinDuration(t, time.Now(), results[0].Job.ScheduledAt, 2*time.Second) + + insertedJobs, err := client.driver.UnwrapExecutor(bundle.tx).JobGetByKindMany(ctx, []string{(noOpArgs{}).Kind()}) + require.NoError(t, err) + require.Len(t, insertedJobs, 1) + require.Equal(t, rivertype.JobStateAvailable, insertedJobs[0].State) + require.WithinDuration(t, time.Now(), insertedJobs[0].ScheduledAt, 2*time.Second) + }) + + t.Run("SupportsScheduledJobs", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + startClient(ctx, t, client) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{{noOpArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(time.Minute)}}}) + require.NoError(t, err) + require.Len(t, results, 1) + + require.Equal(t, rivertype.JobStateScheduled, results[0].Job.State) + require.WithinDuration(t, time.Now().Add(time.Minute), results[0].Job.ScheduledAt, 2*time.Second) + }) + + // A client's allowed to send nil to their driver so they can, for example, + // easily use test transactions in their test suite. 
+ t.Run("WithDriverWithoutPool", func(t *testing.T) { + t.Parallel() + + _, bundle := setup(t) + + client, err := NewClient(riverpgxv5.New(nil), &Config{ + Logger: riversharedtest.Logger(t), + }) + require.NoError(t, err) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: noOpArgs{}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + }) + + t.Run("ErrorsWithZeroJobs", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{}) + require.EqualError(t, err, "no jobs to insert") + require.Nil(t, results) + }) + + t.Run("ErrorsOnUnknownJobKindWithWorkers", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: unregisteredJobArgs{}}, + }) + var unknownJobKindErr *UnknownJobKindError + require.ErrorAs(t, err, &unknownJobKindErr) + require.Equal(t, (&unregisteredJobArgs{}).Kind(), unknownJobKindErr.Kind) + require.Nil(t, results) + }) + + t.Run("AllowsUnknownJobKindWithoutWorkers", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + client.config.Workers = nil + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: unregisteredJobArgs{}}, + }) + require.NoError(t, err) + require.Len(t, results, 1) + }) + + t.Run("ErrorsOnInsertOptsUniqueOpts", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t) + + results, err := client.InsertManyTx(ctx, bundle.tx, []InsertManyParams{ + {Args: noOpArgs{}, InsertOpts: &InsertOpts{UniqueOpts: UniqueOpts{ByArgs: true}}}, + }) + require.EqualError(t, err, "UniqueOpts are not supported for batch inserts") + require.Empty(t, results) + }) +} + func Test_Client_JobGet(t *testing.T) { t.Parallel() diff --git a/example_batch_insert_test.go b/example_batch_insert_test.go index a3b8274d..392d7786 100644 --- a/example_batch_insert_test.go +++ 
b/example_batch_insert_test.go @@ -67,7 +67,7 @@ func Example_batchInsert() { panic(err) } - count, err := riverClient.InsertManyFast(ctx, []river.InsertManyParams{ + results, err := riverClient.InsertMany(ctx, []river.InsertManyParams{ {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}}, {Args: BatchInsertArgs{}}, @@ -77,7 +77,7 @@ func Example_batchInsert() { if err != nil { panic(err) } - fmt.Printf("Inserted %d jobs\n", count) + fmt.Printf("Inserted %d jobs\n", len(results)) waitForNJobs(subscribeChan, 5) diff --git a/rivertest/rivertest_test.go b/rivertest/rivertest_test.go index 7201ed0a..c51a59db 100644 --- a/rivertest/rivertest_test.go +++ b/rivertest/rivertest_test.go @@ -168,7 +168,7 @@ func TestRequireInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -440,7 +440,7 @@ func TestRequireNotInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -738,7 +738,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -758,7 +758,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, 
{Args: Job2Args{Int: 123}}, @@ -847,7 +847,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, }) @@ -867,7 +867,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job2Args{Int: 123}}, {Args: Job1Args{String: "foo"}}, }) @@ -888,7 +888,7 @@ func TestRequireManyInsertedTx(t *testing.T) { riverClient, bundle := setup(t) - _, err := riverClient.InsertManyFastTx(ctx, bundle.tx, []river.InsertManyParams{ + _, err := riverClient.InsertManyTx(ctx, bundle.tx, []river.InsertManyParams{ {Args: Job1Args{String: "foo"}}, {Args: Job1Args{String: "bar"}}, {Args: Job2Args{Int: 123}}, From c78055bc926c120f8a56844cd1c5b7869cd64d5f Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 12 Sep 2024 21:55:31 -0500 Subject: [PATCH 4/4] renames --- client.go | 4 +- .../riverdrivertest/riverdrivertest.go | 158 +++++++------- riverdriver/river_driver_interface.go | 4 +- .../internal/dbsqlc/river_job.sql.go | 192 +++++++++--------- .../river_database_sql_driver.go | 80 ++++---- .../riverpgxv5/internal/dbsqlc/river_job.sql | 58 +++--- .../internal/dbsqlc/river_job.sql.go | 186 ++++++++--------- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 98 ++++----- 8 files changed, 391 insertions(+), 389 deletions(-) diff --git a/client.go b/client.go index 47fe1113..89509c07 100644 --- a/client.go +++ b/client.go @@ -1403,7 +1403,7 @@ func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, return nil, err } - jobRows, err := tx.JobInsertManyReturning(ctx, insertParams) + jobRows, err := tx.JobInsertFastMany(ctx, 
insertParams) if err != nil { return nil, err } @@ -1537,7 +1537,7 @@ func (c *Client[TTx]) InsertManyFastTx(ctx context.Context, tx TTx, params []Ins } func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*riverdriver.JobInsertFastParams) (int, error) { - inserted, err := tx.JobInsertFastMany(ctx, insertParams) + inserted, err := tx.JobInsertFastManyNoReturning(ctx, insertParams) if err != nil { return inserted, err } diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 06f30c52..e8e3e76c 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -919,6 +919,83 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("JobInsertFastMany", func(t *testing.T) { t.Parallel() + t.Run("AllArgs", func(t *testing.T) { + exec, _ := setup(ctx, t) + + now := time.Now().UTC() + + insertParams := make([]*riverdriver.JobInsertFastParams, 10) + for i := 0; i < len(insertParams); i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Metadata: []byte(`{"meta": "data"}`), + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + ScheduledAt: ptrutil.Ptr(now.Add(time.Duration(i) * time.Minute)), + State: rivertype.JobStateAvailable, + Tags: []string{"tag"}, + } + } + + jobRows, err := exec.JobInsertFastMany(ctx, insertParams) + require.NoError(t, err) + require.Len(t, jobRows, len(insertParams)) + + for i, job := range jobRows { + require.Equal(t, 0, job.Attempt) + require.Nil(t, job.AttemptedAt) + require.Empty(t, job.AttemptedBy) + require.WithinDuration(t, now, job.CreatedAt, 2*time.Second) + require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs) + require.Empty(t, job.Errors) + require.Nil(t, 
job.FinalizedAt) + require.Equal(t, "test_kind", job.Kind) + require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts) + require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata) + require.Equal(t, rivercommon.PriorityDefault, job.Priority) + require.Equal(t, rivercommon.QueueDefault, job.Queue) + requireEqualTime(t, now.Add(time.Duration(i)*time.Minute), job.ScheduledAt) + require.Equal(t, rivertype.JobStateAvailable, job.State) + require.Equal(t, []string{"tag"}, job.Tags) + } + }) + + t.Run("MissingScheduledAtDefaultsToNow", func(t *testing.T) { + exec, _ := setup(ctx, t) + + insertParams := make([]*riverdriver.JobInsertFastParams, 10) + for i := 0; i < len(insertParams); i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Metadata: []byte(`{"meta": "data"}`), + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + ScheduledAt: nil, // explicit nil + State: rivertype.JobStateAvailable, + Tags: []string{"tag"}, + } + } + + results, err := exec.JobInsertFastMany(ctx, insertParams) + require.NoError(t, err) + require.Len(t, results, len(insertParams)) + + jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"}) + require.NoError(t, err) + require.Len(t, jobsAfter, len(insertParams)) + for _, job := range jobsAfter { + require.WithinDuration(t, time.Now().UTC(), job.ScheduledAt, 2*time.Second) + } + }) + }) + + t.Run("JobInsertFastManyNoReturning", func(t *testing.T) { + t.Parallel() + t.Run("AllArgs", func(t *testing.T) { exec, _ := setup(ctx, t) @@ -944,7 +1021,7 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, insertParams[i].ScheduledAt = &now } - count, err := exec.JobInsertFastMany(ctx, insertParams) + count, err := exec.JobInsertFastManyNoReturning(ctx, insertParams) require.NoError(t, err) require.Len(t, insertParams, count) @@ -987,7 +1064,7 @@ func Exercise[TTx any](ctx 
context.Context, t *testing.T, } } - count, err := exec.JobInsertFastMany(ctx, insertParams) + count, err := exec.JobInsertFastManyNoReturning(ctx, insertParams) require.NoError(t, err) require.Len(t, insertParams, count) @@ -1152,83 +1229,6 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, }) }) - t.Run("JobInsertManyReturning", func(t *testing.T) { - t.Parallel() - - t.Run("AllArgs", func(t *testing.T) { - exec, _ := setup(ctx, t) - - now := time.Now().UTC() - - insertParams := make([]*riverdriver.JobInsertFastParams, 10) - for i := 0; i < len(insertParams); i++ { - insertParams[i] = &riverdriver.JobInsertFastParams{ - EncodedArgs: []byte(`{"encoded": "args"}`), - Kind: "test_kind", - MaxAttempts: rivercommon.MaxAttemptsDefault, - Metadata: []byte(`{"meta": "data"}`), - Priority: rivercommon.PriorityDefault, - Queue: rivercommon.QueueDefault, - ScheduledAt: ptrutil.Ptr(now.Add(time.Duration(i) * time.Minute)), - State: rivertype.JobStateAvailable, - Tags: []string{"tag"}, - } - } - - jobRows, err := exec.JobInsertManyReturning(ctx, insertParams) - require.NoError(t, err) - require.Len(t, jobRows, len(insertParams)) - - for i, job := range jobRows { - require.Equal(t, 0, job.Attempt) - require.Nil(t, job.AttemptedAt) - require.Empty(t, job.AttemptedBy) - require.WithinDuration(t, now, job.CreatedAt, 2*time.Second) - require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs) - require.Empty(t, job.Errors) - require.Nil(t, job.FinalizedAt) - require.Equal(t, "test_kind", job.Kind) - require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts) - require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata) - require.Equal(t, rivercommon.PriorityDefault, job.Priority) - require.Equal(t, rivercommon.QueueDefault, job.Queue) - requireEqualTime(t, now.Add(time.Duration(i)*time.Minute), job.ScheduledAt) - require.Equal(t, rivertype.JobStateAvailable, job.State) - require.Equal(t, []string{"tag"}, job.Tags) - } - }) - - 
t.Run("MissingScheduledAtDefaultsToNow", func(t *testing.T) { - exec, _ := setup(ctx, t) - - insertParams := make([]*riverdriver.JobInsertFastParams, 10) - for i := 0; i < len(insertParams); i++ { - insertParams[i] = &riverdriver.JobInsertFastParams{ - EncodedArgs: []byte(`{"encoded": "args"}`), - Kind: "test_kind", - MaxAttempts: rivercommon.MaxAttemptsDefault, - Metadata: []byte(`{"meta": "data"}`), - Priority: rivercommon.PriorityDefault, - Queue: rivercommon.QueueDefault, - ScheduledAt: nil, // explicit nil - State: rivertype.JobStateAvailable, - Tags: []string{"tag"}, - } - } - - results, err := exec.JobInsertManyReturning(ctx, insertParams) - require.NoError(t, err) - require.Len(t, results, len(insertParams)) - - jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"}) - require.NoError(t, err) - require.Len(t, jobsAfter, len(insertParams)) - for _, job := range jobsAfter { - require.WithinDuration(t, time.Now().UTC(), job.ScheduledAt, 2*time.Second) - } - }) - }) - t.Run("JobInsertUnique", func(t *testing.T) { t.Parallel() diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index 8609b450..e94c64a1 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -116,9 +116,9 @@ type Executor interface { JobGetByKindMany(ctx context.Context, kind []string) ([]*rivertype.JobRow, error) JobGetStuck(ctx context.Context, params *JobGetStuckParams) ([]*rivertype.JobRow, error) JobInsertFast(ctx context.Context, params *JobInsertFastParams) (*rivertype.JobRow, error) - JobInsertFastMany(ctx context.Context, params []*JobInsertFastParams) (int, error) + JobInsertFastMany(ctx context.Context, params []*JobInsertFastParams) ([]*rivertype.JobRow, error) + JobInsertFastManyNoReturning(ctx context.Context, params []*JobInsertFastParams) (int, error) JobInsertFull(ctx context.Context, params *JobInsertFullParams) (*rivertype.JobRow, error) - JobInsertManyReturning(ctx context.Context, 
params []*JobInsertFastParams) ([]*rivertype.JobRow, error) JobInsertUnique(ctx context.Context, params *JobInsertUniqueParams) (*JobInsertUniqueResult, error) JobList(ctx context.Context, query string, namedArgs map[string]any) ([]*rivertype.JobRow, error) JobListFields() string diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index 81de9241..dafdc2d9 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -592,7 +592,99 @@ func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFast return &i, err } -const jobInsertFastMany = `-- name: JobInsertFastMany :execrows +const jobInsertFastMany = `-- name: JobInsertFastMany :many +INSERT INTO river_job( + args, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags +) SELECT + unnest($1::jsonb[]), + unnest($2::text[]), + unnest($3::smallint[]), + unnest($4::jsonb[]), + unnest($5::smallint[]), + unnest($6::text[]), + unnest($7::timestamptz[]), + -- To avoid requiring pgx users to register the OID of the river_job_state[] + -- type, we cast the array to text[] and then to river_job_state. + unnest($8::text[])::river_job_state, + -- Unnest on a multi-dimensional array will fully flatten the array, so we + -- encode the tag list as a comma-separated string and split it in the + -- query. 
+ string_to_array(unnest($9::text[]), ',') +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key +` + +type JobInsertFastManyParams struct { + Args []string + Kind []string + MaxAttempts []int16 + Metadata []string + Priority []int16 + Queue []string + ScheduledAt []time.Time + State []string + Tags []string +} + +func (q *Queries) JobInsertFastMany(ctx context.Context, db DBTX, arg *JobInsertFastManyParams) ([]*RiverJob, error) { + rows, err := db.QueryContext(ctx, jobInsertFastMany, + pq.Array(arg.Args), + pq.Array(arg.Kind), + pq.Array(arg.MaxAttempts), + pq.Array(arg.Metadata), + pq.Array(arg.Priority), + pq.Array(arg.Queue), + pq.Array(arg.ScheduledAt), + pq.Array(arg.State), + pq.Array(arg.Tags), + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*RiverJob + for rows.Next() { + var i RiverJob + if err := rows.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + pq.Array(&i.AttemptedBy), + &i.CreatedAt, + pq.Array(&i.Errors), + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + pq.Array(&i.Tags), + &i.UniqueKey, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const jobInsertFastManyNoReturning = `-- name: JobInsertFastManyNoReturning :execrows INSERT INTO river_job( args, kind, @@ -620,7 +712,7 @@ INSERT INTO river_job( string_to_array(unnest($9::text[]), ',') ` -type JobInsertFastManyParams struct { +type JobInsertFastManyNoReturningParams struct { Args []string Kind []string MaxAttempts []int16 @@ -632,8 +724,8 @@ type JobInsertFastManyParams struct { Tags []string } -func (q *Queries) JobInsertFastMany(ctx context.Context, db DBTX, arg *JobInsertFastManyParams) (int64, 
error) { - result, err := db.ExecContext(ctx, jobInsertFastMany, +func (q *Queries) JobInsertFastManyNoReturning(ctx context.Context, db DBTX, arg *JobInsertFastManyNoReturningParams) (int64, error) { + result, err := db.ExecContext(ctx, jobInsertFastManyNoReturning, pq.Array(arg.Args), pq.Array(arg.Kind), pq.Array(arg.MaxAttempts), @@ -745,98 +837,6 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull return &i, err } -const jobInsertManyReturning = `-- name: JobInsertManyReturning :many -INSERT INTO river_job( - args, - kind, - max_attempts, - metadata, - priority, - queue, - scheduled_at, - state, - tags -) SELECT - unnest($1::jsonb[]), - unnest($2::text[]), - unnest($3::smallint[]), - unnest($4::jsonb[]), - unnest($5::smallint[]), - unnest($6::text[]), - unnest($7::timestamptz[]), - -- To avoid requiring pgx users to register the OID of the river_job_state[] - -- type, we cast the array to text[] and then to river_job_state. - unnest($8::text[])::river_job_state, - -- Unnest on a multi-dimensional array will fully flatten the array, so we - -- encode the tag list as a comma-separated string and split it in the - -- query. 
- string_to_array(unnest($9::text[]), ',') -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key -` - -type JobInsertManyReturningParams struct { - Args []string - Kind []string - MaxAttempts []int16 - Metadata []string - Priority []int16 - Queue []string - ScheduledAt []time.Time - State []string - Tags []string -} - -func (q *Queries) JobInsertManyReturning(ctx context.Context, db DBTX, arg *JobInsertManyReturningParams) ([]*RiverJob, error) { - rows, err := db.QueryContext(ctx, jobInsertManyReturning, - pq.Array(arg.Args), - pq.Array(arg.Kind), - pq.Array(arg.MaxAttempts), - pq.Array(arg.Metadata), - pq.Array(arg.Priority), - pq.Array(arg.Queue), - pq.Array(arg.ScheduledAt), - pq.Array(arg.State), - pq.Array(arg.Tags), - ) - if err != nil { - return nil, err - } - defer rows.Close() - var items []*RiverJob - for rows.Next() { - var i RiverJob - if err := rows.Scan( - &i.ID, - &i.Args, - &i.Attempt, - &i.AttemptedAt, - pq.Array(&i.AttemptedBy), - &i.CreatedAt, - pq.Array(&i.Errors), - &i.FinalizedAt, - &i.Kind, - &i.MaxAttempts, - &i.Metadata, - &i.Priority, - &i.Queue, - &i.State, - &i.ScheduledAt, - pq.Array(&i.Tags), - &i.UniqueKey, - ); err != nil { - return nil, err - } - items = append(items, &i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const jobInsertUnique = `-- name: JobInsertUnique :one INSERT INTO river_job( args, diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 7f5d0fa1..b43076cd 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -216,7 +216,7 @@ func (e *Executor) JobInsertFast(ctx context.Context, params *riverdriver.JobIns return 
jobRowFromInternal(job) } -func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.JobInsertFastParams) (int, error) { +func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*rivertype.JobRow, error) { insertJobsParams := &dbsqlc.JobInsertFastManyParams{ Args: make([]string, len(params)), Kind: make([]string, len(params)), @@ -225,7 +225,7 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver. Priority: make([]int16, len(params)), Queue: make([]string, len(params)), ScheduledAt: make([]time.Time, len(params)), - State: make([]dbsqlc.RiverJobState, len(params)), + State: make([]string, len(params)), Tags: make([]string, len(params)), } now := time.Now() @@ -243,51 +243,29 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver. tags = []string{} } - insertJobsParams.Args[i] = valutil.ValOrDefault(string(params.EncodedArgs), "{}") + defaultObject := "{}" + + insertJobsParams.Args[i] = valutil.ValOrDefault(string(params.EncodedArgs), defaultObject) insertJobsParams.Kind[i] = params.Kind insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec - insertJobsParams.Metadata[i] = valutil.ValOrDefault(string(params.Metadata), "{}") + insertJobsParams.Metadata[i] = valutil.ValOrDefault(string(params.Metadata), defaultObject) insertJobsParams.Priority[i] = int16(min(params.Priority, math.MaxInt16)) //nolint:gosec insertJobsParams.Queue[i] = params.Queue insertJobsParams.ScheduledAt[i] = scheduledAt - insertJobsParams.State[i] = dbsqlc.RiverJobState(params.State) + insertJobsParams.State[i] = string(params.State) insertJobsParams.Tags[i] = strings.Join(tags, ",") } - numInserted, err := dbsqlc.New().JobInsertFastMany(ctx, e.dbtx, insertJobsParams) - if err != nil { - return 0, interpretError(err) - } - - return int(numInserted), nil -} - -func (e *Executor) JobInsertFull(ctx context.Context, params 
*riverdriver.JobInsertFullParams) (*rivertype.JobRow, error) { - job, err := dbsqlc.New().JobInsertFull(ctx, e.dbtx, &dbsqlc.JobInsertFullParams{ - Attempt: int16(min(params.Attempt, math.MaxInt16)), //nolint:gosec - AttemptedAt: params.AttemptedAt, - Args: string(params.EncodedArgs), - CreatedAt: params.CreatedAt, - Errors: sliceutil.Map(params.Errors, func(e []byte) string { return string(e) }), - FinalizedAt: params.FinalizedAt, - Kind: params.Kind, - MaxAttempts: int16(min(params.MaxAttempts, math.MaxInt16)), //nolint:gosec - Metadata: valutil.ValOrDefault(string(params.Metadata), "{}"), - Priority: int16(min(params.Priority, math.MaxInt16)), //nolint:gosec - Queue: params.Queue, - ScheduledAt: params.ScheduledAt, - State: dbsqlc.RiverJobState(params.State), - Tags: params.Tags, - UniqueKey: params.UniqueKey, - }) + items, err := dbsqlc.New().JobInsertFastMany(ctx, e.dbtx, insertJobsParams) if err != nil { return nil, interpretError(err) } - return jobRowFromInternal(job) + + return mapSliceError(items, jobRowFromInternal) } -func (e *Executor) JobInsertManyReturning(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*rivertype.JobRow, error) { - insertJobsParams := &dbsqlc.JobInsertManyReturningParams{ +func (e *Executor) JobInsertFastManyNoReturning(ctx context.Context, params []*riverdriver.JobInsertFastParams) (int, error) { + insertJobsParams := &dbsqlc.JobInsertFastManyNoReturningParams{ Args: make([]string, len(params)), Kind: make([]string, len(params)), MaxAttempts: make([]int16, len(params)), @@ -295,7 +273,7 @@ func (e *Executor) JobInsertManyReturning(ctx context.Context, params []*riverdr Priority: make([]int16, len(params)), Queue: make([]string, len(params)), ScheduledAt: make([]time.Time, len(params)), - State: make([]string, len(params)), + State: make([]dbsqlc.RiverJobState, len(params)), Tags: make([]string, len(params)), } now := time.Now() @@ -322,16 +300,40 @@ func (e *Executor) JobInsertManyReturning(ctx context.Context, 
params []*riverdr insertJobsParams.Priority[i] = int16(min(params.Priority, math.MaxInt16)) //nolint:gosec insertJobsParams.Queue[i] = params.Queue insertJobsParams.ScheduledAt[i] = scheduledAt - insertJobsParams.State[i] = string(params.State) + insertJobsParams.State[i] = dbsqlc.RiverJobState(params.State) insertJobsParams.Tags[i] = strings.Join(tags, ",") } - items, err := dbsqlc.New().JobInsertManyReturning(ctx, e.dbtx, insertJobsParams) + numInserted, err := dbsqlc.New().JobInsertFastManyNoReturning(ctx, e.dbtx, insertJobsParams) if err != nil { - return nil, interpretError(err) + return 0, interpretError(err) } - return mapSliceError(items, jobRowFromInternal) + return int(numInserted), nil +} + +func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobInsertFullParams) (*rivertype.JobRow, error) { + job, err := dbsqlc.New().JobInsertFull(ctx, e.dbtx, &dbsqlc.JobInsertFullParams{ + Attempt: int16(min(params.Attempt, math.MaxInt16)), //nolint:gosec + AttemptedAt: params.AttemptedAt, + Args: string(params.EncodedArgs), + CreatedAt: params.CreatedAt, + Errors: sliceutil.Map(params.Errors, func(e []byte) string { return string(e) }), + FinalizedAt: params.FinalizedAt, + Kind: params.Kind, + MaxAttempts: int16(min(params.MaxAttempts, math.MaxInt16)), //nolint:gosec + Metadata: valutil.ValOrDefault(string(params.Metadata), "{}"), + Priority: int16(min(params.Priority, math.MaxInt16)), //nolint:gosec + Queue: params.Queue, + ScheduledAt: params.ScheduledAt, + State: dbsqlc.RiverJobState(params.State), + Tags: params.Tags, + UniqueKey: params.UniqueKey, + }) + if err != nil { + return nil, interpretError(err) + } + return jobRowFromInternal(job) } func (e *Executor) JobInsertUnique(ctx context.Context, params *riverdriver.JobInsertUniqueParams) (*riverdriver.JobInsertUniqueResult, error) { diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index df20035d..e62ee160 100644 --- 
a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -223,7 +223,35 @@ INSERT INTO river_job( coalesce(@tags::varchar(255)[], '{}') ) RETURNING *; --- name: JobInsertFastMany :execrows +-- name: JobInsertFastMany :many +INSERT INTO river_job( + args, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags +) SELECT + unnest(@args::jsonb[]), + unnest(@kind::text[]), + unnest(@max_attempts::smallint[]), + unnest(@metadata::jsonb[]), + unnest(@priority::smallint[]), + unnest(@queue::text[]), + unnest(@scheduled_at::timestamptz[]), + -- To avoid requiring pgx users to register the OID of the river_job_state[] + -- type, we cast the array to text[] and then to river_job_state. + unnest(@state::text[])::river_job_state, + -- Unnest on a multi-dimensional array will fully flatten the array, so we + -- encode the tag list as a comma-separated string and split it in the + -- query. + string_to_array(unnest(@tags::text[]), ',') +RETURNING *; + +-- name: JobInsertFastManyNoReturning :execrows INSERT INTO river_job( args, kind, @@ -285,34 +313,6 @@ INSERT INTO river_job( @unique_key ) RETURNING *; --- name: JobInsertManyReturning :many -INSERT INTO river_job( - args, - kind, - max_attempts, - metadata, - priority, - queue, - scheduled_at, - state, - tags -) SELECT - unnest(@args::jsonb[]), - unnest(@kind::text[]), - unnest(@max_attempts::smallint[]), - unnest(@metadata::jsonb[]), - unnest(@priority::smallint[]), - unnest(@queue::text[]), - unnest(@scheduled_at::timestamptz[]), - -- To avoid requiring pgx users to register the OID of the river_job_state[] - -- type, we cast the array to text[] and then to river_job_state. - unnest(@state::text[])::river_job_state, - -- Unnest on a multi-dimensional array will fully flatten the array, so we - -- encode the tag list as a comma-separated string and split it in the - -- query. 
- string_to_array(unnest(@tags::text[]), ',') -RETURNING *; - -- name: JobInsertUnique :one INSERT INTO river_job( args, diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index 55138fe0..a2713bef 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -578,7 +578,96 @@ func (q *Queries) JobInsertFast(ctx context.Context, db DBTX, arg *JobInsertFast return &i, err } -const jobInsertFastMany = `-- name: JobInsertFastMany :execrows +const jobInsertFastMany = `-- name: JobInsertFastMany :many +INSERT INTO river_job( + args, + kind, + max_attempts, + metadata, + priority, + queue, + scheduled_at, + state, + tags +) SELECT + unnest($1::jsonb[]), + unnest($2::text[]), + unnest($3::smallint[]), + unnest($4::jsonb[]), + unnest($5::smallint[]), + unnest($6::text[]), + unnest($7::timestamptz[]), + -- To avoid requiring pgx users to register the OID of the river_job_state[] + -- type, we cast the array to text[] and then to river_job_state. + unnest($8::text[])::river_job_state, + -- Unnest on a multi-dimensional array will fully flatten the array, so we + -- encode the tag list as a comma-separated string and split it in the + -- query. 
+ string_to_array(unnest($9::text[]), ',') +RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key +` + +type JobInsertFastManyParams struct { + Args [][]byte + Kind []string + MaxAttempts []int16 + Metadata [][]byte + Priority []int16 + Queue []string + ScheduledAt []time.Time + State []string + Tags []string +} + +func (q *Queries) JobInsertFastMany(ctx context.Context, db DBTX, arg *JobInsertFastManyParams) ([]*RiverJob, error) { + rows, err := db.Query(ctx, jobInsertFastMany, + arg.Args, + arg.Kind, + arg.MaxAttempts, + arg.Metadata, + arg.Priority, + arg.Queue, + arg.ScheduledAt, + arg.State, + arg.Tags, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*RiverJob + for rows.Next() { + var i RiverJob + if err := rows.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + &i.AttemptedBy, + &i.CreatedAt, + &i.Errors, + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + &i.Tags, + &i.UniqueKey, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const jobInsertFastManyNoReturning = `-- name: JobInsertFastManyNoReturning :execrows INSERT INTO river_job( args, kind, @@ -606,7 +695,7 @@ INSERT INTO river_job( string_to_array(unnest($9::text[]), ',') ` -type JobInsertFastManyParams struct { +type JobInsertFastManyNoReturningParams struct { Args [][]byte Kind []string MaxAttempts []int16 @@ -618,8 +707,8 @@ type JobInsertFastManyParams struct { Tags []string } -func (q *Queries) JobInsertFastMany(ctx context.Context, db DBTX, arg *JobInsertFastManyParams) (int64, error) { - result, err := db.Exec(ctx, jobInsertFastMany, +func (q *Queries) JobInsertFastManyNoReturning(ctx context.Context, db DBTX, arg *JobInsertFastManyNoReturningParams) (int64, 
error) { + result, err := db.Exec(ctx, jobInsertFastManyNoReturning, arg.Args, arg.Kind, arg.MaxAttempts, @@ -731,95 +820,6 @@ func (q *Queries) JobInsertFull(ctx context.Context, db DBTX, arg *JobInsertFull return &i, err } -const jobInsertManyReturning = `-- name: JobInsertManyReturning :many -INSERT INTO river_job( - args, - kind, - max_attempts, - metadata, - priority, - queue, - scheduled_at, - state, - tags -) SELECT - unnest($1::jsonb[]), - unnest($2::text[]), - unnest($3::smallint[]), - unnest($4::jsonb[]), - unnest($5::smallint[]), - unnest($6::text[]), - unnest($7::timestamptz[]), - -- To avoid requiring pgx users to register the OID of the river_job_state[] - -- type, we cast the array to text[] and then to river_job_state. - unnest($8::text[])::river_job_state, - -- Unnest on a multi-dimensional array will fully flatten the array, so we - -- encode the tag list as a comma-separated string and split it in the - -- query. - string_to_array(unnest($9::text[]), ',') -RETURNING id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key -` - -type JobInsertManyReturningParams struct { - Args [][]byte - Kind []string - MaxAttempts []int16 - Metadata [][]byte - Priority []int16 - Queue []string - ScheduledAt []time.Time - State []string - Tags []string -} - -func (q *Queries) JobInsertManyReturning(ctx context.Context, db DBTX, arg *JobInsertManyReturningParams) ([]*RiverJob, error) { - rows, err := db.Query(ctx, jobInsertManyReturning, - arg.Args, - arg.Kind, - arg.MaxAttempts, - arg.Metadata, - arg.Priority, - arg.Queue, - arg.ScheduledAt, - arg.State, - arg.Tags, - ) - if err != nil { - return nil, err - } - defer rows.Close() - var items []*RiverJob - for rows.Next() { - var i RiverJob - if err := rows.Scan( - &i.ID, - &i.Args, - &i.Attempt, - &i.AttemptedAt, - &i.AttemptedBy, - &i.CreatedAt, - &i.Errors, - &i.FinalizedAt, - &i.Kind, - &i.MaxAttempts, 
- &i.Metadata, - &i.Priority, - &i.Queue, - &i.State, - &i.ScheduledAt, - &i.Tags, - &i.UniqueKey, - ); err != nil { - return nil, err - } - items = append(items, &i) - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const jobInsertUnique = `-- name: JobInsertUnique :one INSERT INTO river_job( args, diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index b60e9671..659f60d1 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -211,7 +211,55 @@ func (e *Executor) JobInsertFast(ctx context.Context, params *riverdriver.JobIns return jobRowFromInternal(job) } -func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.JobInsertFastParams) (int, error) { +func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*rivertype.JobRow, error) { + insertJobsParams := &dbsqlc.JobInsertFastManyParams{ + Args: make([][]byte, len(params)), + Kind: make([]string, len(params)), + MaxAttempts: make([]int16, len(params)), + Metadata: make([][]byte, len(params)), + Priority: make([]int16, len(params)), + Queue: make([]string, len(params)), + ScheduledAt: make([]time.Time, len(params)), + State: make([]string, len(params)), + Tags: make([]string, len(params)), + } + now := time.Now() + + for i := 0; i < len(params); i++ { + params := params[i] + + scheduledAt := now + if params.ScheduledAt != nil { + scheduledAt = *params.ScheduledAt + } + + tags := params.Tags + if tags == nil { + tags = []string{} + } + + defaultObject := []byte("{}") + + insertJobsParams.Args[i] = sliceutil.DefaultIfEmpty(params.EncodedArgs, defaultObject) + insertJobsParams.Kind[i] = params.Kind + insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec + insertJobsParams.Metadata[i] = sliceutil.DefaultIfEmpty(params.Metadata, defaultObject) + 
insertJobsParams.Priority[i] = int16(min(params.Priority, math.MaxInt16)) //nolint:gosec + insertJobsParams.Queue[i] = params.Queue + insertJobsParams.ScheduledAt[i] = scheduledAt + insertJobsParams.State[i] = string(params.State) + insertJobsParams.Tags[i] = strings.Join(tags, ",") + } + + items, err := dbsqlc.New().JobInsertFastMany(ctx, e.dbtx, insertJobsParams) + if err != nil { + return nil, interpretError(err) + } + + return mapSliceError(items, jobRowFromInternal) +} + +func (e *Executor) JobInsertFastManyNoReturning(ctx context.Context, params []*riverdriver.JobInsertFastParams) (int, error) { insertJobsParams := make([]*dbsqlc.JobInsertFastManyCopyFromParams, len(params)) now := time.Now() @@ -278,54 +326,6 @@ func (e *Executor) JobInsertFull(ctx context.Context, params *riverdriver.JobIns return jobRowFromInternal(job) } -func (e *Executor) JobInsertManyReturning(ctx context.Context, params []*riverdriver.JobInsertFastParams) ([]*rivertype.JobRow, error) { - insertJobsParams := &dbsqlc.JobInsertManyReturningParams{ - Args: make([][]byte, len(params)), - Kind: make([]string, len(params)), - MaxAttempts: make([]int16, len(params)), - Metadata: make([][]byte, len(params)), - Priority: make([]int16, len(params)), - Queue: make([]string, len(params)), - ScheduledAt: make([]time.Time, len(params)), - State: make([]string, len(params)), - Tags: make([]string, len(params)), - } - now := time.Now() - - for i := 0; i < len(params); i++ { - params := params[i] - - scheduledAt := now - if params.ScheduledAt != nil { - scheduledAt = *params.ScheduledAt - } - - tags := params.Tags - if tags == nil { - tags = []string{} - } - - defaultObject := []byte("{}") - - insertJobsParams.Args[i] = sliceutil.DefaultIfEmpty(params.EncodedArgs, defaultObject) - insertJobsParams.Kind[i] = params.Kind - insertJobsParams.MaxAttempts[i] = int16(min(params.MaxAttempts, math.MaxInt16)) //nolint:gosec - insertJobsParams.Metadata[i] = sliceutil.DefaultIfEmpty(params.Metadata, defaultObject) 
- insertJobsParams.Priority[i] = int16(min(params.Priority, math.MaxInt16)) //nolint:gosec - insertJobsParams.Queue[i] = params.Queue - insertJobsParams.ScheduledAt[i] = scheduledAt - insertJobsParams.State[i] = string(params.State) - insertJobsParams.Tags[i] = strings.Join(tags, ",") - } - - items, err := dbsqlc.New().JobInsertManyReturning(ctx, e.dbtx, insertJobsParams) - if err != nil { - return nil, interpretError(err) - } - - return mapSliceError(items, jobRowFromInternal) -} - func (e *Executor) JobInsertUnique(ctx context.Context, params *riverdriver.JobInsertUniqueParams) (*riverdriver.JobInsertUniqueResult, error) { insertRes, err := dbsqlc.New().JobInsertUnique(ctx, e.dbtx, &dbsqlc.JobInsertUniqueParams{ Args: params.EncodedArgs,