Skip to content

Commit

Permalink
fix shutdown panics by separating completer shutdown
Browse files Browse the repository at this point in the history
Back in #258 / 702d5b2, the batch completer was added to improve
throughput. As part of that refactor, it was turned into a startstop
service that took a context on start. We took the care to ensure that
the context provided to the completer was _not_ the `fetchCtx`
(cancelled on `Stop()`) but instead was the raw user-provided `ctx`,
specifically to make sure the completer could finish its work even after
fetches were stopped.

This worked well if the whole shutdown process was done with `Stop` /
`StopAndCancel`, but it did not work if the user-provided context was
itself cancelled outside of River. In that scenario, the completer would
immediately begin shutting down upon cancellation, even without waiting
for producers to finish sending it any final jobs that needed to be
recorded. This went unnoticed until #379 / 0e57338 turned this scenario
into a panic instead of a silent misbehavior, which is what was
encountered in #400.

To fix this situation, we need to use Go 1.21's new
`context.WithoutCancel` API to fork the user-provided context so that we
maintain whatever else is stored in there (i.e. so anything used by slog
is still available) but we do not cancel this completer's context
_ever_. The completer will manage its own shutdown when its `Stop()` is
called as part of all of the other client services being stopped in
parallel.
  • Loading branch information
bgentry committed Jun 26, 2024
1 parent 543f369 commit 93532e8
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
19 changes: 10 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ type Client[TTx any] struct {
baseStartStop startstop.BaseStartStop

completer jobcompleter.JobCompleter
completerSubscribeCh chan []jobcompleter.CompleterJobUpdated
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector
Expand Down Expand Up @@ -670,9 +669,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// Each time we start, we need a fresh completer subscribe channel to
// send job completion events on, because the completer will close it
// each time it shuts down.
c.completerSubscribeCh = make(chan []jobcompleter.CompleterJobUpdated, 10)
c.completer.ResetSubscribeChan(c.completerSubscribeCh)
c.subscriptionManager.ResetSubscribeChan(c.completerSubscribeCh)
completerSubscribeCh := make(chan []jobcompleter.CompleterJobUpdated, 10)
c.completer.ResetSubscribeChan(completerSubscribeCh)
c.subscriptionManager.ResetSubscribeChan(completerSubscribeCh)

// In case of error, stop any services that might have started. This
// is safe because even services that were never started will still
Expand All @@ -694,11 +693,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error {

// The completer is part of the services list below, but although it can
// stop gracefully along with all the other services, it needs to be
// started with a context that's _not_ fetchCtx. This ensures that even
// when fetch is cancelled on shutdown, the completer is still given a
// separate opportunity to start stopping only after the producers have
// finished up and returned.
if err := c.completer.Start(ctx); err != nil {
// started with a context that's _not_ cancelled if the user-provided
// context is cancelled. This ensures that even when fetch is cancelled on
// shutdown, the completer is still given a separate opportunity to start
// stopping only after the producers have finished up and returned.
if err := c.completer.Start(context.WithoutCancel(ctx)); err != nil {
stopServicesOnError()
return err
}
Expand Down Expand Up @@ -744,7 +743,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
<-fetchCtx.Done()

// On stop, have the producers stop fetching first of all.
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers")
stopProducers()
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": All producers stopped")

// Stop all mainline services where stop order isn't important.
startstop.StopAllParallel(append(
Expand Down
6 changes: 6 additions & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ func (p *producer) Start(ctx context.Context) error {
return p.StartWorkContext(ctx, ctx)
}

func (p *producer) Stop() {
p.Logger.Debug(p.Name + ": Stopping")
p.BaseStartStop.Stop()
p.Logger.Debug(p.Name + ": Stop returned")
}

// Start starts the producer. It backgrounds a goroutine which is stopped when
// context is cancelled or Stop is invoked.
//
Expand Down

0 comments on commit 93532e8

Please sign in to comment.