-
I've ended up with the following interface for the time being to enable instrumenting enqueues and propagating tracing information:

```go
package riverutil

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivertype"
)

// Ensure *river.Client satisfies this interface at compile time.
var _ EnqueueClient[pgx.Tx] = (*river.Client[pgx.Tx])(nil)

// EnqueueClient defines the subset of the *river.Client interface which is
// available when initialised without a `*pgxpool.Pool`.
type EnqueueClient[TTx any] interface {
	InsertTx(context.Context, TTx, river.JobArgs, *river.InsertOpts) (*rivertype.JobRow, error)
	InsertManyTx(context.Context, TTx, []river.InsertManyParams) (int64, error)
}
```

And here is the current implementation for instrumented enqueues (it doesn't create any new spans, but it propagates the SpanContext through, so the worker can see where the job came from and correlate back to the originating trace):

```go
package olly
import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"

	"github.com/CGA1123/riverplayground/riverutil"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivertype"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
)

type enqueueClient struct {
	river *river.Client[pgx.Tx]
}

func (ec *enqueueClient) InsertTx(ctx context.Context, tx pgx.Tx, j river.JobArgs, opts *river.InsertOpts) (*rivertype.JobRow, error) {
	opts = propagateRiverTrace(ctx, j, opts)

	return ec.river.InsertTx(ctx, tx, j, opts)
}
func (ec *enqueueClient) InsertManyTx(ctx context.Context, tx pgx.Tx, jobs []river.InsertManyParams) (int64, error) {
	// Propagate the trace context into each job's options *before* inserting,
	// writing back through the slice index (mutating the range copy would be
	// silently dropped).
	for i := range jobs {
		jobs[i].InsertOpts = propagateRiverTrace(ctx, jobs[i].Args, jobs[i].InsertOpts)
	}

	return ec.river.InsertManyTx(ctx, tx, jobs)
}
// riverTagPrefix namespaces the propagated carrier entries within the job's
// tags. (The concrete value here is assumed for illustration; it just needs to
// match whatever the worker side strips off again.)
const riverTagPrefix = "olly-"

func propagateRiverTrace(ctx context.Context, j river.JobArgs, opts *river.InsertOpts) *river.InsertOpts {
	if opts == nil {
		opts = &river.InsertOpts{}
	}

	var tags []string
	if argsWithOpts, ok := j.(river.JobArgsWithInsertOpts); ok {
		tags = argsWithOpts.InsertOpts().Tags
	}

	// Inject the current trace context into a map carrier, then encode each
	// entry as a prefixed "key:value" tag.
	c := propagation.MapCarrier(map[string]string{})
	otel.GetTextMapPropagator().Inject(ctx, c)

	ollyTags := make([]string, 0, len(c))
	for k, v := range c {
		ollyTags = append(ollyTags, fmt.Sprintf("%s%s:%s", riverTagPrefix, k, v))
	}

	// This replicates the behaviour of `river.insertParamsFromArgsAndOptions`:
	// tags from the args-level InsertOpts only apply when the insert-level
	// opts don't set any of their own.
	if opts.Tags == nil {
		opts.Tags = append(tags, ollyTags...)
	} else {
		opts.Tags = append(opts.Tags, ollyTags...)
	}

	return opts
}
// EnqueueClient returns a wrapped `*river.Client` with a reduced set of
// methods exposed.
//
// It includes additional observability and propagates relevant metadata at
// enqueue-time.
//
// This function will panic if `river.NewClient` returns an error.
func EnqueueClient(ctx context.Context) riverutil.EnqueueClient[pgx.Tx] {
	c, err := river.NewClient(riverpgxv5.New(nil), &river.Config{})
	if err != nil {
		panic(err)
	}

	return &enqueueClient{river: c}
}
```
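On the worker side, the propagated context has to be rebuilt from the job's tags before any spans are started or logs are correlated. Here's a minimal sketch of what that could look like, assuming the same `riverTagPrefix` constant as above; the `extractRiverTrace` helper and `SomeArgs`/`SomeWorker` types are purely illustrative:

```go
package olly

import (
	"context"
	"strings"

	"github.com/riverqueue/river"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
)

// extractRiverTrace rebuilds a context carrying the remote SpanContext from
// the prefixed "key:value" tags written by propagateRiverTrace at enqueue-time.
func extractRiverTrace(ctx context.Context, tags []string) context.Context {
	carrier := propagation.MapCarrier{}
	for _, tag := range tags {
		if !strings.HasPrefix(tag, riverTagPrefix) {
			continue
		}

		kv := strings.SplitN(strings.TrimPrefix(tag, riverTagPrefix), ":", 2)
		if len(kv) != 2 {
			continue
		}

		carrier[kv[0]] = kv[1]
	}

	return otel.GetTextMapPropagator().Extract(ctx, carrier)
}

// Example usage inside a worker (SomeArgs/SomeWorker are placeholders).
type SomeArgs struct{}

func (SomeArgs) Kind() string { return "some_job" }

type SomeWorker struct {
	river.WorkerDefaults[SomeArgs]
}

func (w *SomeWorker) Work(ctx context.Context, job *river.Job[SomeArgs]) error {
	// Restore the enqueue-time trace context, then start the worker span as a
	// child of it.
	ctx = extractRiverTrace(ctx, job.Tags)

	ctx, span := otel.Tracer("worker").Start(ctx, "SomeWorker.Work")
	defer span.End()

	// ... do the actual work with ctx ...
	return nil
}
```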
-
Hi @CGA1123, thanks for the thoughtful context. We're definitely going to need an answer for this kind of thing (I wanted this myself), even if it's just documentation around a recommendation to implement your own wrapper around the River client like you've done above.

Regarding middleware: it's subjective, but I've always felt like this was a little bit of a leaky abstraction when applied to non-HTTP stacks, like its inclusion in projects such as Excon. Even when applied to HTTP it's got some non-ideal aspects too. For example, every middleware adds a new frame to the call stack, which can make call stacks painful to read. At places I've worked in the past, if an API request raised an exception, the stack was hundreds of frames deep (because of hundreds of middleware) and you had to scroll back up many, many pages even to see the original exception message.

I'd be tempted to try something like a "hooks" interface that allows functionality to be plugged in at various points, but I'd have to think about it more to make sure that it's plausible.
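Purely to illustrate the shape such a "hooks" interface could take (nothing like this exists in River today; every name below is hypothetical):

```go
package riverhooks

import (
	"context"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/rivertype"
)

// InsertHooks is a hypothetical sketch of enqueue-side hooks: called before a
// job row is persisted, with a chance to adjust its options (tags, metadata).
type InsertHooks interface {
	BeforeInsert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*river.InsertOpts, error)
}

// WorkHooks is a hypothetical sketch of worker-side hooks: bracket the
// execution of a job so cross-cutting concerns (tracing, error reporting)
// can be plugged in without wrapping every Worker implementation.
type WorkHooks interface {
	BeforeWork(ctx context.Context, job *rivertype.JobRow) context.Context
	AfterWork(ctx context.Context, job *rivertype.JobRow, err error)
}
```

Compared to middleware, hooks like these wouldn't add a stack frame per plugin to every job execution, which is the call-stack readability concern described above.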
Beta Was this translation helpful? Give feedback.
-
I would love to see some kind of middleware solution within river. For our use case, we're trying to use Sentry to add more observability into river job execution and to capture exceptions/errors when they occur. The existing

We're ultimately moving in the direction of building a wrapper library/interface for job execution that can instantiate a Sentry hub/scope tied to the context at the start of the job and also capture errors/telemetry at the end of the job. This is what our solution looks like:
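A rough sketch of this kind of wrapper pattern, assuming the github.com/getsentry/sentry-go SDK (names such as `sentryWorker` and `WrapWorker` are illustrative and not the poster's actual code):

```go
package sentryriver

import (
	"context"
	"strconv"

	"github.com/getsentry/sentry-go"
	"github.com/riverqueue/river"
)

// sentryWorker wraps an inner river.Worker, attaching a fresh Sentry hub to
// the context for the duration of the job and reporting any returned error.
type sentryWorker[T river.JobArgs] struct {
	river.WorkerDefaults[T]

	inner river.Worker[T]
}

// WrapWorker is an illustrative constructor for the wrapper.
func WrapWorker[T river.JobArgs](inner river.Worker[T]) river.Worker[T] {
	return &sentryWorker[T]{inner: inner}
}

func (w *sentryWorker[T]) Work(ctx context.Context, job *river.Job[T]) error {
	// Clone the hub so per-job tags don't leak between concurrently running jobs.
	hub := sentry.CurrentHub().Clone()
	hub.Scope().SetTag("river.job_id", strconv.FormatInt(job.ID, 10))
	hub.Scope().SetTag("river.kind", job.Kind)
	ctx = sentry.SetHubOnContext(ctx, hub)

	err := w.inner.Work(ctx, job)
	if err != nil {
		hub.CaptureException(err)
	}

	return err
}
```

A fuller version would probably also forward NextRetry/Timeout to the inner worker rather than relying on the embedded defaults.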
-
Middleware has been added for job insertion and workers as part of #632 and was just released in v0.13.0-rc.1. For now I'm marking this discussion as closed, but if anybody tries it and it doesn't do what they need, please start a new discussion or issue 🙏 🚀
-
I'm integrating `river` into a project and would love to be able to have some kind of a middleware pattern whereby I can inject metadata such as trace IDs, correlation IDs, etc... I'm looking for something akin to the `ClientMiddleware` available in Sidekiq, which allows for access to the job object before persistence. The equivalent for `ServerMiddleware` is more straightforward, as wrapping the `Worker` interface is easily done!

I think for the time being I'll create a smaller `Client` interface for `river` that I propagate through my application, which I can then use to wrap the `Insert` operations with custom logic that has access to the `context` in a uniform manner. It might be nice for that to be something that can be configured directly on a `*river.Client` as a middleware stack?

Would be great to hear thoughts about how others are approaching this problem, and views around this from the maintainers as well! Thank you 😄