Skip to content

Commit

Permalink
add JobCancel API to cancel a running job
Browse files Browse the repository at this point in the history
This adds a new `Client` method `Cancel` which can cancel a running job.
It has these behaviors:

* Jobs which are still scheduled, retryable, or available are
  immediately cancelled and will not be run again.
* Jobs that are already finalized (completed, cancelled, or discarded)
  are not touched.
* Jobs that are actively running are marked for cancellation, and
  cancellation is attempted using a `LISTEN`/`NOTIFY` pubsub message
  which is picked up by the client and producer running that job.

Because Go offers no way to interrupt a running goroutine, actively
running jobs cannot be immediately halted and cancelled, so we can only
cancel the job's context. Once the cancellation signal is received by
the client running the job, any error returned by that job will result
in it being cancelled permanently and not retried. However if the job
returns no error, it will be completed as usual.

In the event the running job finishes executing _before_ the cancellation
signal is received but _after_ this update was made, the behavior depends
on which state the job is being transitioned into:

  - If the job completed successfully, was cancelled from within, or was
    discarded due to exceeding its max attempts, the job will be updated
    as usual.
  - If the job was snoozed to run again later or encountered a retryable
    error, the job will be marked as cancelled and will not be attempted
    again.

Also expose `JobCancelTx` variant.
  • Loading branch information
bgentry committed Jan 9, 2024
1 parent 6bb58c4 commit f162181
Show file tree
Hide file tree
Showing 14 changed files with 715 additions and 23 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added `Cancel` and `CancelTx` to the `Client` to enable cancellation of jobs. [PR #141](https://github.com/riverqueue/river/pull/141).

## [0.0.16] - 2024-01-06

### Changed
Expand Down
94 changes: 94 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,100 @@ func (c *Client[TTx]) runProducers(fetchNewWorkCtx, workCtx context.Context) {
}
}

// Cancel cancels the job with the given ID. If possible, the job is cancelled
// immediately and will not be retried. The provided context is used for the
// underlying Postgres update and can be used to cancel the operation or apply a
// timeout.
//
// If the job is still in the queue (available, scheduled, or retryable), it is
// immediately marked as cancelled and will not be retried.
//
// If the job is already finalized (cancelled, completed, or discarded), no
// changes are made.
//
// If the job is currently running, it is not immediately cancelled, but is
// instead marked for cancellation. The client running the job will also be
// notified (via LISTEN/NOTIFY) to cancel the running job's context. Although
// the job's context will be cancelled, since Go does not provide a mechanism to
// interrupt a running goroutine the job will continue running until it returns.
// As always, it is important for workers to respect context cancellation and
// return promptly when the job context is done.
//
// Once the cancellation signal is received by the client running the job, any
// error returned by that job will result in it being cancelled permanently and
// not retried. However if the job returns no error, it will be completed as
// usual.
//
// In the event the running job finishes executing _before_ the cancellation
// signal is received but _after_ this update was made, the behavior depends
// on which state the job is being transitioned into:
//
// - If the job completed successfully, was cancelled from within, or was
// discarded due to exceeding its max attempts, the job will be updated as
// usual.
// - If the job was snoozed to run again later or encountered a retryable error,
// the job will be marked as cancelled and will not be attempted again.
//
// Returns the up-to-date JobRow for the specified jobID if it exists. Returns
// ErrNoRows if the job doesn't exist.
func (c *Client[TTx]) Cancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error) {
job, err := c.adapter.JobCancel(ctx, jobID)
if err != nil {
return nil, err
}

return dbsqlc.JobRowFromInternal(job), nil
}

// CancelTx cancels the job with the given ID within the specified transaction.
// This variant lets a caller cancel a job atomically alongside other database
// changes. An cancelled job doesn't take effect until the transaction commits,
// and if the transaction rolls back, so too is the cancelled job.
//
// If possible, the job is cancelled immediately and will not be retried. The
// provided context is used for the underlying Postgres update and can be used
// to cancel the operation or apply a timeout.
//
// If the job is still in the queue (available, scheduled, or retryable), it is
// immediately marked as cancelled and will not be retried.
//
// If the job is already finalized (cancelled, completed, or discarded), no
// changes are made.
//
// If the job is currently running, it is not immediately cancelled, but is
// instead marked for cancellation. The client running the job will also be
// notified (via LISTEN/NOTIFY) to cancel the running job's context. Although
// the job's context will be cancelled, since Go does not provide a mechanism to
// interrupt a running goroutine the job will continue running until it returns.
// As always, it is important for workers to respect context cancellation and
// return promptly when the job context is done.
//
// Once the cancellation signal is received by the client running the job, any
// error returned by that job will result in it being cancelled permanently and
// not retried. However if the job returns no error, it will be completed as
// usual.
//
// In the event the running job finishes executing _before_ the cancellation
// signal is received but _after_ this update was made, the behavior depends
// on which state the job is being transitioned into:
//
// - If the job completed successfully, was cancelled from within, or was
// discarded due to exceeding its max attempts, the job will be updated as
// usual.
// - If the job was snoozed to run again later or encountered a retryable error,
// the job will be marked as cancelled and will not be attempted again.
//
// Returns the up-to-date JobRow for the specified jobID if it exists. Returns
// ErrNoRows if the job doesn't exist.
func (c *Client[TTx]) CancelTx(ctx context.Context, tx TTx, jobID int64) (*rivertype.JobRow, error) {
job, err := c.adapter.JobCancelTx(ctx, c.driver.UnwrapTx(tx), jobID)
if err != nil {
return nil, err
}

return dbsqlc.JobRowFromInternal(job), nil
}

func insertParamsFromArgsAndOptions(args JobArgs, insertOpts *InsertOpts) (*dbadapter.JobInsertParams, error) {
encodedArgs, err := json.Marshal(args)
if err != nil {
Expand Down
120 changes: 118 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/riverqueue/river/internal/util/ptrutil"
"github.com/riverqueue/river/internal/util/sliceutil"
"github.com/riverqueue/river/internal/util/valutil"
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -216,7 +217,7 @@ func Test_Client(t *testing.T) {
riverinternaltest.WaitOrTimeout(t, workedChan)
})

t.Run("JobCancel", func(t *testing.T) {
t.Run("JobCancelErrorReturned", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
Expand Down Expand Up @@ -245,7 +246,7 @@ func Test_Client(t *testing.T) {
require.WithinDuration(t, time.Now(), *updatedJob.FinalizedAt, 2*time.Second)
})

t.Run("JobSnooze", func(t *testing.T) {
t.Run("JobSnoozeErrorReturned", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
Expand Down Expand Up @@ -274,6 +275,121 @@ func Test_Client(t *testing.T) {
require.WithinDuration(t, time.Now().Add(15*time.Minute), updatedJob.ScheduledAt, 2*time.Second)
})

// This helper is used to test cancelling a job both _in_ a transaction and
// _outside of_ a transaction. The exact same test logic applies to each case,
// the only difference is a different cancelFunc provided by the specific
// subtest.
cancelRunningJobTestHelper := func(t *testing.T, cancelFunc func(ctx context.Context, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error)) { //nolint:thelper
client, bundle := setup(t)

jobStartedChan := make(chan int64)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
jobStartedChan <- job.ID
<-ctx.Done()
return ctx.Err()
}))

startClient(ctx, t, client)

insertedJob, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)

startedJobID := riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertedJob.ID, startedJobID)

// Cancel the job:
updatedJob, err := cancelFunc(ctx, client, insertedJob.ID)
require.NoError(t, err)
require.NotNil(t, updatedJob)
// Job is still actively running at this point because the query wouldn't
// modify that column for a running job:
require.Equal(t, rivertype.JobStateRunning, updatedJob.State)

event := riverinternaltest.WaitOrTimeout(t, bundle.subscribeChan)
require.Equal(t, EventKindJobCancelled, event.Kind)
require.Equal(t, JobStateCancelled, event.Job.State)
require.WithinDuration(t, time.Now(), *event.Job.FinalizedAt, 2*time.Second)

jobAfterCancel, err := bundle.queries.JobGetByID(ctx, client.driver.GetDBPool(), insertedJob.ID)
require.NoError(t, err)
require.Equal(t, dbsqlc.JobStateCancelled, jobAfterCancel.State)
require.WithinDuration(t, time.Now(), *jobAfterCancel.FinalizedAt, 2*time.Second)
}

t.Run("CancelRunningJob", func(t *testing.T) {
t.Parallel()

cancelRunningJobTestHelper(t, func(ctx context.Context, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
return client.Cancel(ctx, jobID)
})
})

t.Run("CancelRunningJobInTx", func(t *testing.T) {
t.Parallel()

cancelRunningJobTestHelper(t, func(ctx context.Context, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
var (
job *rivertype.JobRow
err error
)
txErr := pgx.BeginFunc(ctx, client.driver.GetDBPool(), func(tx pgx.Tx) error {
job, err = client.CancelTx(ctx, tx, jobID)
return err
})
require.NoError(t, txErr)
return job, err
})
})

t.Run("CancelScheduledJob", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)

jobStartedChan := make(chan int64)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
jobStartedChan <- job.ID
<-ctx.Done()
return ctx.Err()
}))

startClient(ctx, t, client)

insertedJob, err := client.Insert(ctx, &JobArgs{}, &InsertOpts{ScheduledAt: time.Now().Add(5 * time.Minute)})
require.NoError(t, err)

// Cancel the job:
updatedJob, err := client.Cancel(ctx, insertedJob.ID)
require.NoError(t, err)
require.NotNil(t, updatedJob)
require.Equal(t, rivertype.JobStateCancelled, updatedJob.State)
require.WithinDuration(t, time.Now(), *updatedJob.FinalizedAt, 2*time.Second)
})

t.Run("CancelNonExistentJob", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)
startClient(ctx, t, client)

// Cancel an unknown job ID:
jobAfter, err := client.Cancel(ctx, 0)
// TODO(bgentry): do we want to expose a different error type for this
// externally since we don't yet want to expose riverdriver publicly?
require.ErrorIs(t, err, riverdriver.ErrNoRows)
require.Nil(t, jobAfter)
})

t.Run("AlternateSchema", func(t *testing.T) {
t.Parallel()

Expand Down
34 changes: 34 additions & 0 deletions internal/dbadapter/db_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/riverqueue/river/internal/util/ptrutil"
"github.com/riverqueue/river/internal/util/sliceutil"
"github.com/riverqueue/river/internal/util/valutil"
"github.com/riverqueue/river/riverdriver"
)

// When a job has specified unique options, but has not set the ByState
Expand Down Expand Up @@ -82,6 +83,9 @@ type JobInsertResult struct {
// expedience, but this should be converted to a more stable API if Adapter
// would be exported.
type Adapter interface {
JobCancel(ctx context.Context, id int64) (*dbsqlc.RiverJob, error)
JobCancelTx(ctx context.Context, tx pgx.Tx, id int64) (*dbsqlc.RiverJob, error)

JobInsert(ctx context.Context, params *JobInsertParams) (*JobInsertResult, error)
JobInsertTx(ctx context.Context, tx pgx.Tx, params *JobInsertParams) (*JobInsertResult, error)

Expand Down Expand Up @@ -154,6 +158,36 @@ func NewStandardAdapter(archetype *baseservice.Archetype, config *StandardAdapte
})
}

func (a *StandardAdapter) JobCancel(ctx context.Context, id int64) (*dbsqlc.RiverJob, error) {
return dbutil.WithTxV(ctx, a.executor, func(ctx context.Context, tx pgx.Tx) (*dbsqlc.RiverJob, error) {
return a.JobCancelTx(ctx, tx, id)
})
}

func (a *StandardAdapter) JobCancelTx(ctx context.Context, tx pgx.Tx, id int64) (*dbsqlc.RiverJob, error) {
ctx, cancel := context.WithTimeout(ctx, a.deadlineTimeout)
defer cancel()

cancelledAt, err := a.TimeNowUTC().MarshalJSON()
if err != nil {
return nil, err
}

job, err := a.queries.JobCancel(ctx, a.executor, dbsqlc.JobCancelParams{
CancelAttemptedAt: cancelledAt,
ID: id,
JobControlTopic: string(notifier.NotificationTopicJobControl),
})
if err == pgx.ErrNoRows {

Check failure on line 181 in internal/dbadapter/db_adapter.go

View workflow job for this annotation

GitHub Actions / lint

comparing with == will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
return nil, riverdriver.ErrNoRows
}
if err != nil {
return nil, err
}

return job, nil
}

func (a *StandardAdapter) JobInsert(ctx context.Context, params *JobInsertParams) (*JobInsertResult, error) {
return dbutil.WithTxV(ctx, a.executor, func(ctx context.Context, tx pgx.Tx) (*JobInsertResult, error) {
return a.JobInsertTx(ctx, tx, params)
Expand Down
Loading

0 comments on commit f162181

Please sign in to comment.