Skip to content

Commit

Permalink
Batch completer + additional completer test suite and benchmarks
Browse files Browse the repository at this point in the history
Here, add a new completer using a completion strategy designed to be
much faster than what we're doing right now. Rather than blindly
throwing completion work into goroutine slots, it accumulates "batches"
of completions to be carried out, and using a debounced channel to fire
periodically (currently, up to every 100 milliseconds) and submit entire
batches for completion at once up to 2,000 jobs.

For the purposes of not grossly expanding the `riverdriver` interface,
the completer only batches jobs being set to `completed`, which under
most normal workloads we expect to be the vast common case. Jobs going
to other states are fed into a member `AsyncCompleter`, thereby allowing
the `BatchCompleter` to keeps implementation quite simple.

According to in-package benchmarking, the new completer is in the range
of 3-5x faster than `AsyncCompleter` (the one currently in use by River
client), and 10-15x faster than `InlineCompleter`.

    $ go test -bench=. ./internal/jobcompleter
    goos: darwin
    goarch: arm64
    pkg: github.com/riverqueue/river/internal/jobcompleter
    BenchmarkAsyncCompleter_Concurrency10/Completion-8                 10851            112318 ns/op
    BenchmarkAsyncCompleter_Concurrency10/RotatingStates-8             11386            120706 ns/op
    BenchmarkAsyncCompleter_Concurrency100/Completion-8                 9763            116773 ns/op
    BenchmarkAsyncCompleter_Concurrency100/RotatingStates-8            10884            115718 ns/op
    BenchmarkBatchCompleter/Completion-8                               54916             27314 ns/op
    BenchmarkBatchCompleter/RotatingStates-8                           11518            100997 ns/op
    BenchmarkInlineCompleter/Completion-8                               4656            369281 ns/op
    BenchmarkInlineCompleter/RotatingStates-8                           1561            794136 ns/op
    PASS
    ok      github.com/riverqueue/river/internal/jobcompleter       21.123s

Along with the new completer, we also add a vastly more thorough test
suite to help tease out race conditions and test edges that were
previously being ignored completely. For most cases we drop the heavy
mocking that was happening before, which was having the effect of
minimizing the surface area under test, and producing misleading timing
that wasn't realistic.

Similarly, we bring in a new benchmark framework to allow us to easily
vet and compare completer implementations relative to each other. The
expectation is that this will act as a more synthetic proxy, with the
new benchmarking tool in #254 providing a more realistic end-to-end
measurement.
  • Loading branch information
brandur committed Mar 10, 2024
1 parent d9a7fc3 commit 3974581
Show file tree
Hide file tree
Showing 16 changed files with 1,217 additions and 201 deletions.
9 changes: 7 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}

completer := jobcompleter.NewAsyncCompleter(archetype, driver.GetExecutor(), 100)
completer := jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor())

client := &Client[TTx]{
completer: completer,
Expand Down Expand Up @@ -600,6 +600,11 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// to shut down prior to closing the monitor.
go c.monitor.Run()

// TODO: Stop completer (and any other services) if Start leaves with an error.
if err := c.completer.Start(ctx); err != nil {
return err
}

// Receives job complete notifications from the completer and distributes
// them to any subscriptions.
c.completer.Subscribe(c.distributeJobCompleterCallback)
Expand Down Expand Up @@ -662,7 +667,7 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) {
//
// TODO: there's a risk here that the completer is stuck on a job that won't
// complete. We probably need a timeout or way to move on in those cases.
c.completer.Wait()
c.completer.Stop()

c.notifier.Stop()
c.queueMaintainer.Stop()
Expand Down
2 changes: 1 addition & 1 deletion event.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func jobStatisticsFromInternal(stats *jobstats.JobStatistics) *JobStatistics {

// The maximum size of the subscribe channel. Events that would overflow it will
// be dropped.
const subscribeChanSize = 100
const subscribeChanSize = 50_000

// eventSubscription is an active subscription for events being produced by a
// client, created with Client.Subscribe.
Expand Down
Loading

0 comments on commit 3974581

Please sign in to comment.