Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Derive all internal contexts from Client context #514

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Derive all internal contexts from user-provided `Client` context. This includes the job fetch context, notifier unlisten, and completer. [PR #514](https://github.com/riverqueue/river/pull/514).

## [0.11.1] - 2024-08-05

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion internal/jobcompleter/job_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) {
const numRetries = 3

func withRetries[T any](logCtx context.Context, baseService *baseservice.BaseService, disableSleep bool, retryFunc func(ctx context.Context) (T, error)) (T, error) {
uncancelledCtx := context.Background()
uncancelledCtx := context.WithoutCancel(logCtx)

var (
defaultVal T
Expand Down
4 changes: 2 additions & 2 deletions internal/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type Subscription struct {

func (s *Subscription) Unlisten(ctx context.Context) {
s.unlistenOnce.Do(func() {
// Unlisten uses background context in case of cancellation.
if err := s.notifier.unlisten(context.Background(), s); err != nil { //nolint:contextcheck
// Unlisten strips cancellation from the parent context to ensure it runs:
if err := s.notifier.unlisten(context.WithoutCancel(ctx), s); err != nil { //nolint:contextcheck
s.notifier.Logger.ErrorContext(ctx, s.notifier.Name+": Error unlistening on topic", "err", err, "topic", s.topic)
}
})
Expand Down
19 changes: 10 additions & 9 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit

func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) {
limit := p.maxJobsToFetch()
go p.dispatchWork(limit, fetchResultCh) //nolint:contextcheck
go p.dispatchWork(workCtx, limit, fetchResultCh)

for {
select {
Expand Down Expand Up @@ -509,14 +509,15 @@ func (p *producer) maybeCancelJob(id int64) {
executor.Cancel()
}

func (p *producer) dispatchWork(count int, fetchResultCh chan<- producerFetchResult) {
// This intentionally uses a background context because we don't want it to
// get cancelled if the producer is asked to shut down. In that situation, we
// want to finish fetching any jobs we are in the midst of fetching, work
// them, and then stop. Otherwise we'd have a risk of shutting down when we
// had already fetched jobs in the database, leaving those jobs stranded. We'd
// then potentially have to release them back to the queue.
jobs, err := p.exec.JobGetAvailable(context.Background(), &riverdriver.JobGetAvailableParams{
func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) {
// This intentionally removes any deadlines or cancelleation from the parent
// context because we don't want it to get cancelled if the producer is asked
// to shut down. In that situation, we want to finish fetching any jobs we are
// in the midst of fetching, work them, and then stop. Otherwise we'd have a
// risk of shutting down when we had already fetched jobs in the database,
// leaving those jobs stranded. We'd then potentially have to release them
// back to the queue.
jobs, err := p.exec.JobGetAvailable(context.WithoutCancel(workCtx), &riverdriver.JobGetAvailableParams{
AttemptedBy: p.config.ClientID,
Max: count,
Queue: p.config.Queue,
Expand Down
Loading