Skip to content

Commit

Permalink
new InsertMany/InsertManyTx with return values
Browse files Browse the repository at this point in the history
This adds new implementations of `InsertMany` / `InsertManyTx` that use
the multirow `VALUES` syntax to allow the new rows to be returned upon
insert. The alternative `COPY FROM ` implementation has been renamed to
`InsertManyFast` / `InsertManyFastTx`. The expectation is that these
forms will only be needed in cases where an extremely large number of
records is being inserted simultaneously, whereas the new form is more
user-friendly for the vast majority of other cases.
  • Loading branch information
bgentry committed Sep 12, 2024
1 parent f93246e commit 73264a9
Show file tree
Hide file tree
Showing 5 changed files with 595 additions and 10 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

⚠️ Version 0.12.0 has a small breaking change in `rivermigrate`. As before, we try never to make breaking changes, but this one was deemed worth it because it's quite small and may help avoid panics.
⚠️ Version 0.12.0 has two small breaking changes, one for `InsertMany` and one in `rivermigrate`. As before, we try never to make breaking changes, but these ones were deemed worth it because of minimal impact and to help avoid panics.

- **Breaking change:** `Client.InsertMany` / `InsertManyTx` now return the inserted rows rather than merely returning a count of the inserted rows. The new implementations no longer use Postgres' `COPY FROM` protocol in order to facilitate return values.

Users who relied on the return count can merely wrap the returned rows in a `len()` to return to that behavior, or you can continue using the old APIs using their new names `InsertManyFast` and `InsertManyFastTx`. [PR #589](https://github.com/riverqueue/river/pull/589).

- **Breaking change:** `rivermigrate.New` now returns a possible error along with a migrator. An error may be returned, for example, when a migration line is configured that doesn't exist. [PR #558](https://github.com/riverqueue/river/pull/558).

Expand Down
131 changes: 131 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1326,6 +1326,137 @@ type InsertManyParams struct {
InsertOpts *InsertOpts
}

// InsertMany inserts many jobs at once. Each job is inserted as an
// InsertManyParams tuple, which takes job args along with an optional set of
// insert options, which override insert options provided by an
// JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
// The provided context is used for the underlying Postgres inserts and can be
// used to cancel the operation or apply a timeout.
//
// count, err := client.InsertMany(ctx, []river.InsertManyParams{
// {Args: BatchInsertArgs{}},
// {Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
// })
// if err != nil {
// // handle error
// }
//
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
//
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
if !c.driver.HasPool() {
return nil, errNoDriverDBPool
}

tx, err := c.driver.GetExecutor().Begin(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)

inserted, err := c.insertMany(ctx, tx, params)
if err != nil {
return nil, err
}

if err := tx.Commit(ctx); err != nil {
return nil, err
}
return inserted, nil
}

// InsertManyTx inserts many jobs at once. Each job is inserted as an
// InsertManyParams tuple, which takes job args along with an optional set of
// insert options, which override insert options provided by an
// JobArgsWithInsertOpts.InsertOpts implementation or any client-level defaults.
// The provided context is used for the underlying Postgres inserts and can be
// used to cancel the operation or apply a timeout.
//
// count, err := client.InsertManyTx(ctx, tx, []river.InsertManyParams{
// {Args: BatchInsertArgs{}},
// {Args: BatchInsertArgs{}, InsertOpts: &river.InsertOpts{Priority: 3}},
// })
// if err != nil {
// // handle error
// }
//
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
//
// This variant lets a caller insert jobs atomically alongside other database
// changes. An inserted job isn't visible to be worked until the transaction
// commits, and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
exec := c.driver.UnwrapExecutor(tx)
return c.insertMany(ctx, exec, params)
}

func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
insertParams, err := c.insertManyParams(params)
if err != nil {
return nil, err
}

jobRows, err := tx.JobInsertManyReturning(ctx, insertParams)
if err != nil {
return nil, err
}

queues := make([]string, 0, 10)
for _, params := range insertParams {
if params.State == rivertype.JobStateAvailable {
queues = append(queues, params.Queue)
}
}
if err := c.maybeNotifyInsertForQueues(ctx, tx, queues); err != nil {
return nil, err
}

return sliceutil.Map(jobRows,
func(jobRow *rivertype.JobRow) *rivertype.JobInsertResult {
return &rivertype.JobInsertResult{Job: jobRow}
},
), nil
}

// Validates input parameters for an a batch insert operation and generates a
// set of batch insert parameters.
func (c *Client[TTx]) insertManyParams(params []InsertManyParams) ([]*riverdriver.JobInsertFastParams, error) {
if len(params) < 1 {
return nil, errors.New("no jobs to insert")
}

insertParams := make([]*riverdriver.JobInsertFastParams, len(params))
for i, param := range params {
if err := c.validateJobArgs(param.Args); err != nil {
return nil, err
}

if param.InsertOpts != nil {
// UniqueOpts aren't support for batch inserts because they use PG
// advisory locks to work, and taking many locks simultaneously
// could easily lead to contention and deadlocks.
if !param.InsertOpts.UniqueOpts.isEmpty() {
return nil, errors.New("UniqueOpts are not supported for batch inserts")
}
}

var err error
insertParams[i], _, err = insertParamsFromConfigArgsAndOptions(&c.baseService.Archetype, c.config, param.Args, param.InsertOpts)
if err != nil {
return nil, err
}
}

return insertParams, nil
}

// InsertManyFast inserts many jobs at once using Postgres' `COPY FROM` mechanism,
// making the operation quite fast and memory efficient. Each job is inserted as
// an InsertManyParams tuple, which takes job args along with an optional set of
Expand Down
Loading

0 comments on commit 73264a9

Please sign in to comment.