diff --git a/CHANGELOG.md b/CHANGELOG.md index 6859fe15276..fd3f2508508 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## main / unreleased +* [ENHANCEMENT] Refactor queueManager into generic queue.Queue [#1796](https://github.com/grafana/tempo/pull/1796) (@Blinkuu) + - **BREAKING CHANGE** Rename `tempo_distributor_forwarder_queue_length` metric to `tempo_distributor_queue_length`. New metric has two custom labels: `name` and `tenant`. + - Deprecated `tempo_distributor_forwarder_pushes_total` metric in favor of `tempo_distributor_queue_pushes_total`. + - Deprecated `tempo_distributor_forwarder_pushes_failures_total` metric in favor of `tempo_distributor_queue_pushes_failures_total`. * [ENHANCEMENT] Filter namespace by cluster in tempo dashboards variables [#1771](https://github.com/grafana/tempo/pull/1771) (@electron0zero) * [ENHANCEMENT] Exit early from sharded search requests [#1742](https://github.com/grafana/tempo/pull/1742) (@electron0zero) * [CHANGE] Identify bloom that could not be retrieved from backend block [#1737](https://github.com/grafana/tempo/pull/1737) (@AlexDHoffer) diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index e04653e3bb9..83f8c98b7c3 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -131,7 +131,7 @@ type Distributor struct { generatorClientCfg generator_client.Config generatorsRing ring.ReadRing generatorsPool *ring_client.Pool - generatorForwarder *forwarder + generatorForwarder *generatorForwarder // Per-user rate limiter. ingestionRateLimiter *limiter.RateLimiter @@ -223,7 +223,7 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi subservices = append(subservices, d.generatorsPool) - d.generatorForwarder = newForwarder(d.sendToGenerators, o) + d.generatorForwarder = newGeneratorForwarder(logger, reg, d.sendToGenerators, o) subservices = append(subservices, d.generatorForwarder) } diff --git a/modules/distributor/forwarder.go b/modules/distributor/forwarder.go index c4822345d2c..8c178691940 100644 --- a/modules/distributor/forwarder.go +++ b/modules/distributor/forwarder.go @@ -2,18 +2,18 @@ package distributor import ( "context" - "fmt" "sync" "time" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/services" - "github.com/grafana/tempo/modules/overrides" - "github.com/grafana/tempo/pkg/util/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/atomic" "go.uber.org/multierr" + + "github.com/grafana/tempo/modules/distributor/queue" + "github.com/grafana/tempo/modules/overrides" ) const ( @@ -25,34 +25,33 @@ var ( metricForwarderPushes = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", Name: "distributor_forwarder_pushes_total", - Help: "Total number of successful requests queued up for a tenant to the forwarder", + Help: "Total number of successful requests queued up for a tenant to the generatorForwarder. 
This metric is now deprecated in favor of tempo_distributor_queue_pushes_total.", }, []string{"tenant"}) metricForwarderPushesFailures = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", Name: "distributor_forwarder_pushes_failures_total", - Help: "Total number of failed pushes to the queue for a tenant to the forwarder", - }, []string{"tenant"}) - metricForwarderQueueLength = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "tempo", - Name: "distributor_forwarder_queue_length", - Help: "Number of queued requests for a tenant", + Help: "Total number of failed pushes to the queue for a tenant to the generatorForwarder. This metric is now deprecated in favor of tempo_distributor_queue_pushes_failures_total.", }, []string{"tenant"}) ) type forwardFunc func(ctx context.Context, tenantID string, keys []uint32, traces []*rebatchedTrace) error type request struct { - keys []uint32 - traces []*rebatchedTrace + tenantID string + keys []uint32 + traces []*rebatchedTrace } -// forwarder queues up traces to be sent to the metrics-generators -type forwarder struct { +// generatorForwarder queues up traces to be sent to the metrics-generators +type generatorForwarder struct { services.Service - // per-tenant queue managers - queueManagers map[string]*queueManager - mutex sync.RWMutex + logger log.Logger + reg prometheus.Registerer + + // per-tenant queues + queues map[string]*queue.Queue[*request] + mutex sync.RWMutex forwardFunc forwardFunc @@ -61,9 +60,11 @@ type forwarder struct { shutdown chan interface{} } -func newForwarder(fn forwardFunc, o *overrides.Overrides) *forwarder { - rf := &forwarder{ - queueManagers: make(map[string]*queueManager), +func newGeneratorForwarder(logger log.Logger, reg prometheus.Registerer, fn forwardFunc, o *overrides.Overrides) *generatorForwarder { + rf := &generatorForwarder{ + logger: logger, + reg: reg, + queues: make(map[string]*queue.Queue[*request]), mutex: sync.RWMutex{}, forwardFunc: fn, o: o, @@ -77,25 +78,25 @@ func newForwarder(fn forwardFunc, o *overrides.Overrides) *forwarder { } // SendTraces queues up traces to be sent to the metrics-generators -func (f *forwarder) SendTraces(ctx context.Context, tenantID string, keys []uint32, traces []*rebatchedTrace) { +func (f *generatorForwarder) SendTraces(ctx context.Context, tenantID string, keys []uint32, traces []*rebatchedTrace) { select { case <-f.shutdown: return default: } - qm := f.getOrCreateQueueManager(tenantID) - err := qm.pushToQueue(ctx, &request{keys: keys, traces: traces}) + q := f.getOrCreateQueue(tenantID) + err := q.Push(ctx, &request{tenantID: tenantID, keys: keys, traces: traces}) if err != nil { - level.Error(log.Logger).Log("msg", "failed to push traces to queue", "tenant", tenantID, "err", err) + _ = level.Error(f.logger).Log("msg", "failed to push traces to queue", "tenant", tenantID, "err", err) metricForwarderPushesFailures.WithLabelValues(tenantID).Inc() } metricForwarderPushes.WithLabelValues(tenantID).Inc() } -// getQueueManagerConfig returns queueSize and workerCount for the given tenant -func (f *forwarder) getQueueManagerConfig(tenantID string) (queueSize, workerCount int) { +// getQueueConfig returns queueSize and workerCount for the given tenant +func (f *generatorForwarder) getQueueConfig(tenantID string) (queueSize, workerCount int) { queueSize = f.o.MetricsGeneratorForwarderQueueSize(tenantID) if queueSize == 0 { queueSize = defaultQueueSize @@ -108,32 +109,32 @@ func (f *forwarder) getQueueManagerConfig(tenantID string) (queueSize, workerCou return 
queueSize, workerCount } -func (f *forwarder) getOrCreateQueueManager(tenantID string) *queueManager { - qm, ok := f.getQueueManager(tenantID) +func (f *generatorForwarder) getOrCreateQueue(tenantID string) *queue.Queue[*request] { + q, ok := f.getQueue(tenantID) if ok { - return qm + return q } f.mutex.Lock() defer f.mutex.Unlock() - queueSize, workerCount := f.getQueueManagerConfig(tenantID) - f.queueManagers[tenantID] = newQueueManager(tenantID, queueSize, workerCount, f.forwardFunc) + queueSize, workerCount := f.getQueueConfig(tenantID) + f.queues[tenantID] = f.createQueueAndStartWorkers(tenantID, queueSize, workerCount) - return f.queueManagers[tenantID] + return f.queues[tenantID] } -func (f *forwarder) getQueueManager(tenantID string) (*queueManager, bool) { +func (f *generatorForwarder) getQueue(tenantID string) (*queue.Queue[*request], bool) { f.mutex.RLock() defer f.mutex.RUnlock() - qm, ok := f.queueManagers[tenantID] - return qm, ok + q, ok := f.queues[tenantID] + return q, ok } // watchOverrides watches the overrides for changes -// and updates the queueManagers accordingly -func (f *forwarder) watchOverrides() { +// and updates the queues accordingly +func (f *generatorForwarder) watchOverrides() { ticker := time.NewTicker(f.overridesInterval) for { @@ -142,21 +143,27 @@ func (f *forwarder) watchOverrides() { f.mutex.Lock() var ( - queueManagersToDelete []*queueManager - queueManagersToAdd []struct { + queuesToDelete []*queue.Queue[*request] + queuesToAdd []struct { tenantID string queueSize, workerCount int } ) - for tenantID, tm := range f.queueManagers { - queueSize, workerCount := f.getQueueManagerConfig(tenantID) + for tenantID, q := range f.queues { + queueSize, workerCount := f.getQueueConfig(tenantID) // if the queue size or worker count has changed, shutdown the queue manager and create a new one - if tm.shouldUpdate(queueSize, workerCount) { - level.Info(log.Logger).Log("msg", "Marking queue manager for update", "tenant", tenantID, - "old_queue_size", tm.queueSize, "new_queue_size", queueSize, "old_worker_count", tm.workerCount, "new_worker_count", workerCount) - queueManagersToDelete = append(queueManagersToDelete, tm) - queueManagersToAdd = append(queueManagersToAdd, struct { + if q.ShouldUpdate(queueSize, workerCount) { + _ = level.Info(f.logger).Log( + "msg", "Marking queue manager for update", + "tenant", tenantID, + "old_queue_size", q.Size(), + "new_queue_size", queueSize, + "old_worker_count", q.WorkerCount(), + "new_worker_count", workerCount, + ) + queuesToDelete = append(queuesToDelete, q) + queuesToAdd = append(queuesToAdd, struct { tenantID string queueSize, workerCount int }{tenantID: tenantID, queueSize: queueSize, workerCount: workerCount}) @@ -165,20 +172,20 @@ func (f *forwarder) watchOverrides() { // Spawn a goroutine to asynchronously shut down queue managers go func() { - for _, qm := range queueManagersToDelete { + for _, q := range queuesToDelete { // shutdown the queue manager // this will block until all workers have finished and the queue is drained - level.Info(log.Logger).Log("msg", "Shutting down queue manager", "tenant", qm.tenantID) - if err := qm.shutdown(); err != nil { - level.Error(log.Logger).Log("msg", "error shutting down queue manager", "tenant", qm.tenantID, "err", err) + _ = level.Info(f.logger).Log("msg", "Shutting down queue manager", "tenant", q.TenantID()) + if err := q.Shutdown(context.Background()); err != nil { + _ = level.Error(f.logger).Log("msg", "error shutting down queue manager", "tenant", q.TenantID(), "err", 
err) } } }() // Synchronously update queue managers - for _, qm := range queueManagersToAdd { - level.Info(log.Logger).Log("msg", "Updating queue manager", "tenant", qm.tenantID) - f.queueManagers[qm.tenantID] = newQueueManager(qm.tenantID, qm.queueSize, qm.workerCount, f.forwardFunc) + for _, q := range queuesToAdd { + _ = level.Info(f.logger).Log("msg", "Updating queue manager", "tenant", q.tenantID) + f.queues[q.tenantID] = f.createQueueAndStartWorkers(q.tenantID, q.queueSize, q.workerCount) } f.mutex.Unlock() @@ -189,156 +196,47 @@ func (f *forwarder) watchOverrides() { } } -func (f *forwarder) start(_ context.Context) error { +func (f *generatorForwarder) start(_ context.Context) error { go f.watchOverrides() return nil } -func (f *forwarder) stop(_ error) error { +func (f *generatorForwarder) stop(_ error) error { close(f.shutdown) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + var errs []error - for _, tm := range f.queueManagers { - if err := tm.shutdown(); err != nil { + for _, q := range f.queues { + if err := q.Shutdown(ctx); err != nil { errs = append(errs, err) } } return multierr.Combine(errs...) } -// queueManager manages a single tenant's queue -type queueManager struct { - // wg is used to wait for the workers to drain the queue while stopping - wg sync.WaitGroup - - tenantID string - workerCount int - workerAliveCount *atomic.Int32 - queueSize int - reqChan chan *request - fn forwardFunc - workersCloseCh chan struct{} - - readOnly *atomic.Bool -} - -func newQueueManager(tenantID string, queueSize, workerCount int, fn forwardFunc) *queueManager { - m := &queueManager{ - tenantID: tenantID, - workerCount: workerCount, - queueSize: queueSize, - workerAliveCount: atomic.NewInt32(0), - reqChan: make(chan *request, queueSize), - fn: fn, - workersCloseCh: make(chan struct{}), - readOnly: atomic.NewBool(false), - } - - m.startWorkers() - - return m -} - -// pushToQueue a trace to the queue -// if the queue is full, the trace is dropped -func (m *queueManager) pushToQueue(ctx context.Context, req *request) error { - if m.readOnly.Load() { - return fmt.Errorf("queue is read-only") +func (f *generatorForwarder) processFunc(ctx context.Context, data *request) { + if err := f.forwardFunc(ctx, data.tenantID, data.keys, data.traces); err != nil { + _ = level.Warn(f.logger).Log("msg", "failed to forward request to metrics generator", "err", err) } - - select { - case m.reqChan <- req: - metricForwarderQueueLength.WithLabelValues(m.tenantID).Inc() - case <-ctx.Done(): - return fmt.Errorf("failed to pushToQueue traces to tenant %s queue: %w", m.tenantID, ctx.Err()) - default: - // Fail fast if the queue is full - return fmt.Errorf("failed to pushToQueue traces to tenant %s queue: queue is full", m.tenantID) - - } - - return nil } -func (m *queueManager) startWorkers() { - for i := 0; i < m.workerCount; i++ { - m.wg.Add(1) - m.workerAliveCount.Inc() - - go m.worker() - } -} - -func (m *queueManager) worker() { - defer func() { - m.wg.Done() - m.workerAliveCount.Dec() - }() - - for { - select { - case req := <-m.reqChan: - metricForwarderQueueLength.WithLabelValues(m.tenantID).Dec() - m.forwardRequest(context.Background(), req) - default: - // Forces to always trying to pull from the queue before exiting - // This is important during shutdown to ensure that the queue is drained - select { - case req := <-m.reqChan: - metricForwarderQueueLength.WithLabelValues(m.tenantID).Dec() - m.forwardRequest(context.Background(), req) - case 
<-m.workersCloseCh: - // If the queue isn't empty, force to start the loop from the beginning - if len(m.reqChan) > 0 { - continue - } - return - } - } - } -} - -func (m *queueManager) forwardRequest(ctx context.Context, req *request) { - if err := m.fn(ctx, m.tenantID, req.keys, req.traces); err != nil { - level.Error(log.Logger).Log("msg", "pushing to metrics-generators failed", "err", err) - } -} - -func (m *queueManager) shutdown() error { - // Call to stopWorkers only once - if m.readOnly.CAS(false, true) { - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - - return m.stopWorkers(ctx) - } - - return nil -} - -func (m *queueManager) stopWorkers(ctx context.Context) error { - // Close workersCloseCh and wait for all workers to return - close(m.workersCloseCh) - - doneCh := make(chan struct{}) - go func() { - m.wg.Wait() - close(doneCh) - }() - - select { - case <-ctx.Done(): - return fmt.Errorf("failed to stop tenant %s queueManager: %w", m.tenantID, ctx.Err()) - case <-doneCh: - return nil - } -} +func (f *generatorForwarder) createQueueAndStartWorkers(tenantID string, size, workerCount int) *queue.Queue[*request] { + q := queue.New( + queue.Config{ + Name: "metrics-generator", + TenantID: tenantID, + Size: size, + WorkerCount: workerCount, + }, + f.logger, + f.reg, + f.processFunc, + ) + q.StartWorkers() -// shouldUpdate returns true if the queue size or worker count (alive or total) has changed -func (m *queueManager) shouldUpdate(queueSize, numWorkers int) bool { - // TODO: worker alive count could be 0 and shutting down the queue manager would be impossible - // it'd be better if we were able to spawn new workers instead of just closing the queueManager - // and creating a new one - return m.queueSize != queueSize || m.workerCount != numWorkers || m.workerAliveCount.Load() != int32(numWorkers) + return q } diff --git a/modules/distributor/forwarder_test.go b/modules/distributor/forwarder_test.go index 016aba0492e..9fd324a88f1 100644 --- a/modules/distributor/forwarder_test.go +++ b/modules/distributor/forwarder_test.go @@ -3,21 +3,19 @@ package distributor import ( "context" "flag" - "os" - "path/filepath" "sync" "testing" "time" - "github.com/grafana/dskit/services" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/grafana/tempo/modules/overrides" v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/test" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gopkg.in/yaml.v2" ) const tenantID = "tenant-id" @@ -37,13 +35,18 @@ func TestForwarder(t *testing.T) { require.NoError(t, err) wg := sync.WaitGroup{} - f := newForwarder(func(ctx context.Context, userID string, k []uint32, traces []*rebatchedTrace) error { - assert.Equal(t, tenantID, userID) - assert.Equal(t, keys, k) - assert.Equal(t, rebatchedTraces, traces) - wg.Done() - return nil - }, o) + f := newGeneratorForwarder( + log.NewNopLogger(), + prometheus.NewPedanticRegistry(), + func(ctx context.Context, userID string, k []uint32, traces []*rebatchedTrace) error { + assert.Equal(t, tenantID, userID) + assert.Equal(t, keys, k) + assert.Equal(t, rebatchedTraces, traces) + wg.Done() + return nil + }, + o, + ) require.NoError(t, f.start(context.Background())) defer func() { require.NoError(t, f.stop(nil)) @@ -56,52 +59,6 @@ func 
TestForwarder(t *testing.T) { wg.Add(1) f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) wg.Wait() - - assert.Equal(t, 0, len(f.queueManagers[tenantID].reqChan)) -} - -func TestForwarder_QueueSizeConfig(t *testing.T) { - // this test sends more traces than the configured queue size - // and asserts that the queue size is equal or less than the configured size - - queueSize := 10 - oCfg := overrides.Limits{ - MetricsGeneratorForwarderQueueSize: queueSize, - MetricsGeneratorForwarderWorkers: 1, - } - oCfg.RegisterFlags(&flag.FlagSet{}) - - id, err := util.HexStringToTraceID("1234567890abcdef") - require.NoError(t, err) - - b := test.MakeBatch(10, id) - keys, rebatchedTraces, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10) - require.NoError(t, err) - - o, err := overrides.NewOverrides(oCfg) - require.NoError(t, err) - - shutdownCh := make(chan struct{}) - - f := newForwarder(func(ctx context.Context, userID string, k []uint32, traces []*rebatchedTrace) error { - <-shutdownCh - return nil - }, o) - - require.NoError(t, f.start(context.Background())) - defer func() { - close(shutdownCh) - require.NoError(t, f.stop(nil)) - assert.Equal(t, 0, len(f.queueManagers[tenantID].reqChan)) - }() - - // sending more traces than queue size - for i := 0; i < queueSize+2; i++ { - f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) - } - - // queue length is less or equal to the configured queue size - assert.LessOrEqual(t, len(f.queueManagers[tenantID].reqChan), queueSize) } func TestForwarder_shutdown(t *testing.T) { @@ -120,14 +77,19 @@ func TestForwarder_shutdown(t *testing.T) { require.NoError(t, err) signalCh := make(chan struct{}) - f := newForwarder(func(ctx context.Context, userID string, k []uint32, traces []*rebatchedTrace) error { - <-signalCh - - assert.Equal(t, tenantID, userID) - assert.Equal(t, keys, k) - assert.Equal(t, rebatchedTraces, traces) - return nil - }, o) + f := newGeneratorForwarder( + log.NewNopLogger(), + prometheus.NewPedanticRegistry(), + func(ctx context.Context, userID string, k []uint32, traces []*rebatchedTrace) error { + <-signalCh + + assert.Equal(t, tenantID, userID) + assert.Equal(t, keys, k) + assert.Equal(t, rebatchedTraces, traces) + return nil + }, + o, + ) require.NoError(t, f.start(context.Background())) defer func() { @@ -137,117 +99,9 @@ func TestForwarder_shutdown(t *testing.T) { close(signalCh) }() require.NoError(t, f.stop(nil)) - f.mutex.Lock() - assert.Equal(t, 0, len(f.queueManagers[tenantID].reqChan)) - f.mutex.Unlock() }() for i := 0; i < 100; i++ { f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) } } - -func TestForwarder_overrides(t *testing.T) { - overridesReloadInterval := 100 * time.Millisecond - limits := overrides.Limits{} - overridesFile := filepath.Join(t.TempDir(), "overrides.yaml") - - buff, err := yaml.Marshal(map[string]map[string]*overrides.Limits{ - "overrides": { - tenantID: { - MetricsGeneratorForwarderQueueSize: 10, - MetricsGeneratorForwarderWorkers: 1, - }, - }, - }) - require.NoError(t, err) - - err = os.WriteFile(overridesFile, buff, os.ModePerm) - require.NoError(t, err) - - limits.PerTenantOverrideConfig = overridesFile - limits.PerTenantOverridePeriod = model.Duration(overridesReloadInterval) - - id, err := util.HexStringToTraceID("1234567890abcdef") - require.NoError(t, err) - - b := test.MakeBatch(10, id) - keys, rebatchedTraces, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10) - require.NoError(t, err) - - o, err := 
overrides.NewOverrides(limits) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.TODO(), o)) - - signalCh := make(chan struct{}) - wg := sync.WaitGroup{} - f := newForwarder(func(ctx context.Context, userID string, k []uint32, traces []*rebatchedTrace) error { - wg.Done() - <-signalCh - return nil - }, o) - f.overridesInterval = overridesReloadInterval - - require.NoError(t, f.start(context.Background())) - defer func() { - close(signalCh) - require.NoError(t, f.stop(nil)) - f.mutex.Lock() - assert.Equal(t, 0, len(f.queueManagers[tenantID].reqChan)) - f.mutex.Unlock() - }() - - wg.Add(1) - f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) - wg.Wait() - - // 10 pushes are buffered, 10 are discarded - for i := 0; i < 20; i++ { - f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) - } - - // queue is full with 10 items - f.mutex.Lock() - assert.Equal(t, 10, len(f.queueManagers[tenantID].reqChan)) - f.mutex.Unlock() - wg.Add(10) - - buff, err = yaml.Marshal(map[string]map[string]*overrides.Limits{ - "overrides": { - tenantID: { - MetricsGeneratorForwarderQueueSize: 20, - MetricsGeneratorForwarderWorkers: 2, - }, - }, - }) - require.NoError(t, err) - - err = os.WriteFile(overridesFile, buff, os.ModePerm) - require.NoError(t, err) - - // Wait triple the reload interval to ensure overrides are updated (overrides interval + forwarder interval) - time.Sleep(3 * overridesReloadInterval) - - // Allow for pending requests to be processed so queueManager can be closed and a new one created - for i := 0; i < 11; i++ { - signalCh <- struct{}{} - } - wg.Wait() - - wg.Add(2) - for i := 0; i < 2; i++ { - f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) - } - wg.Wait() - - // 20 pushes are buffered, 10 are discarded - for i := 0; i < 30; i++ { - f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) - } - - // queue is full with 20 items - f.mutex.Lock() - assert.Equal(t, 20, len(f.queueManagers[tenantID].reqChan)) - f.mutex.Unlock() - wg.Add(20) -} diff --git a/modules/distributor/queue/config.go b/modules/distributor/queue/config.go new file mode 100644 index 00000000000..a21a5384617 --- /dev/null +++ b/modules/distributor/queue/config.go @@ -0,0 +1,8 @@ +package queue + +type Config struct { + Name string + TenantID string + Size int + WorkerCount int +} diff --git a/modules/distributor/queue/queue.go b/modules/distributor/queue/queue.go new file mode 100644 index 00000000000..ff5b48cced9 --- /dev/null +++ b/modules/distributor/queue/queue.go @@ -0,0 +1,195 @@ +package queue + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/user" +) + +var ( + pushesTotalMetrics = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Subsystem: "distributor", + Name: "queue_pushes_total", + Help: "Total number of successful requests queued up for a tenant to the generatorForwarder", + }, []string{"name", "tenant"}) + pushesFailuresTotalMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Subsystem: "distributor", + Name: "queue_pushes_failures_total", + Help: "Total number of failed pushes to the queue for a tenant to the generatorForwarder", + }, []string{"name", "tenant"}) + lengthMetric = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "tempo", + Subsystem: "distributor", + Name: 
"queue_length", + Help: "Number of queued requests for a tenant", + }, []string{"name", "tenant"}) +) + +type ProcessFunc[T any] func(ctx context.Context, data T) + +// Queue represents a single tenant's queue. +type Queue[T any] struct { + // wg is used to wait for the workers to drain the queue while stopping + wg sync.WaitGroup + + logger log.Logger + + name string + tenantID string + workerCount int + size int + reqChan chan T + fn ProcessFunc[T] + workersCloseCh chan struct{} + + pushesTotalMetrics *prometheus.CounterVec + pushesFailuresTotalMetrics *prometheus.CounterVec + lengthMetric *prometheus.GaugeVec + + readOnly *atomic.Bool +} + +func New[T any](cfg Config, logger log.Logger, _ prometheus.Registerer, fn ProcessFunc[T]) *Queue[T] { + return &Queue[T]{ + logger: logger, + name: cfg.Name, + tenantID: cfg.TenantID, + workerCount: cfg.WorkerCount, + size: cfg.Size, + reqChan: make(chan T, cfg.Size), + fn: fn, + workersCloseCh: make(chan struct{}), + pushesTotalMetrics: pushesTotalMetrics, + pushesFailuresTotalMetrics: pushesFailuresTotalMetric, + lengthMetric: lengthMetric, + readOnly: &atomic.Bool{}, + } +} + +// Push pushes data onto a queue. +// If the queue is full, the data is dropped +func (m *Queue[T]) Push(ctx context.Context, data T) error { + if m.readOnly.Load() { + return fmt.Errorf("queue is read-only") + } + + m.pushesTotalMetrics.WithLabelValues(m.name, m.tenantID).Inc() + + select { + case <-ctx.Done(): + m.pushesFailuresTotalMetrics.WithLabelValues(m.name, m.tenantID).Inc() + return fmt.Errorf("failed to push data to queue for tenant=%s and queue_name=%s: %w", m.tenantID, m.name, ctx.Err()) + default: + } + + select { + case m.reqChan <- data: + m.lengthMetric.WithLabelValues(m.name, m.tenantID).Inc() + return nil + default: + } + + // Fail fast if the queue is full + m.pushesFailuresTotalMetrics.WithLabelValues(m.name, m.tenantID).Inc() + + return fmt.Errorf("failed to push data to queue for tenant=%s and queue_name=%s: queue is full", m.tenantID, m.name) +} + +func (m *Queue[T]) StartWorkers() { + for i := 0; i < m.workerCount; i++ { + m.wg.Add(1) + + go m.worker() + } +} + +// TenantID returns the tenant id. +func (m *Queue[T]) TenantID() string { + return m.tenantID +} + +// Size returns the size of the queue. +func (m *Queue[T]) Size() int { + return m.size +} + +// WorkerCount returns the number of expected workers. 
+func (m *Queue[T]) WorkerCount() int { + return m.workerCount +} + +// ShouldUpdate returns true if the queue size or worker count (alive or total) has changed +func (m *Queue[T]) ShouldUpdate(size, workerCount int) bool { + return m.size != size || m.workerCount != workerCount +} + +func (m *Queue[T]) Shutdown(ctx context.Context) error { + // Call to stopWorkers only once + if m.readOnly.CompareAndSwap(false, true) { + return m.stopWorkers(ctx) + } + + return nil +} + +func (m *Queue[T]) worker() { + defer m.wg.Done() + + for { + select { + case req := <-m.reqChan: + m.lengthMetric.WithLabelValues(m.name, m.tenantID).Dec() + m.forwardRequest(req) + case <-m.workersCloseCh: + // Forces to always trying to pull from the queue before exiting + // This is important during shutdown to ensure that the queue is drained + select { + case req, ok := <-m.reqChan: + if !ok { //closed + return + } + + m.lengthMetric.WithLabelValues(m.name, m.tenantID).Dec() + m.forwardRequest(req) + default: + return + } + } + } +} + +func (m *Queue[T]) forwardRequest(req T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + ctx = user.InjectOrgID(ctx, m.tenantID) + + m.fn(ctx, req) +} + +func (m *Queue[T]) stopWorkers(ctx context.Context) error { + // Close workersCloseCh and wait for all workers to return + close(m.workersCloseCh) + + doneCh := make(chan struct{}) + go func() { + m.wg.Wait() + close(doneCh) + }() + + select { + case <-ctx.Done(): + return fmt.Errorf("failed to stop tenant %s Queue: %w", m.tenantID, ctx.Err()) + case <-doneCh: + return nil + } +} diff --git a/modules/distributor/queue/queue_test.go b/modules/distributor/queue/queue_test.go new file mode 100644 index 00000000000..75558a2e06a --- /dev/null +++ b/modules/distributor/queue/queue_test.go @@ -0,0 +1,242 @@ +package queue + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +func newQueue[T any](t *testing.T, size, workerCount int, processFunc ProcessFunc[T]) *Queue[T] { + cfg := Config{Name: "testName", TenantID: "testTenantID", Size: size, WorkerCount: workerCount} + + logger := log.NewNopLogger() + reg := prometheus.NewPedanticRegistry() + q := New(cfg, logger, reg, processFunc) + + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + require.NoError(t, q.Shutdown(ctx)) + + // Metrics are defined on package-level, we need to reset them each time. 
+ pushesTotalMetrics.Reset() + pushesFailuresTotalMetric.Reset() + lengthMetric.Reset() + }) + + return q +} + +func newStartedQueue[T any](t *testing.T, size, workerCount int, processFunc ProcessFunc[T]) *Queue[T] { + q := newQueue(t, size, workerCount, processFunc) + q.StartWorkers() + + return q +} + +func getCounterValue(metric *prometheus.CounterVec) float64 { + var m = &dto.Metric{} + if err := metric.WithLabelValues("testName", "testTenantID").Write(m); err != nil { + return 0 + } + + return m.Counter.GetValue() +} + +func TestNew_ReturnsNotNilAndSetsCorrectFieldsFromConfig(t *testing.T) { + // Given + cfg := Config{Name: "testName", TenantID: "testTenantID", Size: 123, WorkerCount: 321} + processFunc := func(context.Context, int) {} + logger := log.NewNopLogger() + reg := prometheus.NewPedanticRegistry() + + // When + got := New(cfg, logger, reg, processFunc) + + // Then + require.NotNil(t, got) + require.Equal(t, got.name, cfg.Name) + require.Equal(t, got.tenantID, cfg.TenantID) + require.Equal(t, got.size, cfg.Size) + require.Equal(t, got.workerCount, cfg.WorkerCount) +} + +func TestQueue_Push_ReturnsNoErrorAndWorkersInvokeProcessFuncCorrectNumberOfTimesWithRunningWorkers(t *testing.T) { + // Given + count := atomic.NewUint32(0) + wg := sync.WaitGroup{} + size := 10 + workerCount := 3 + processFunc := func(context.Context, any) { + defer wg.Done() + count.Inc() + } + q := newStartedQueue(t, size, workerCount, processFunc) + + // When + for i := 0; i < size-3; i++ { + wg.Add(1) + require.NoError(t, q.Push(context.Background(), nil)) + } + + // Then + wg.Wait() + require.Equal(t, uint32(size-3), count.Load()) + require.Equal(t, float64(size-3), getCounterValue(q.pushesTotalMetrics)) + require.Zero(t, getCounterValue(q.pushesFailuresTotalMetrics)) +} + +func TestQueue_Push_ReturnsNoErrorWhenPushingLessItemsThanSizeWithStoppedWorkers(t *testing.T) { + // Given + size := 10 + workerCount := 3 + processFunc := func(context.Context, any) {} + q := newQueue(t, size, workerCount, processFunc) + + // When + for i := 0; i < size-3; i++ { + require.NoError(t, q.Push(context.Background(), nil)) + } + + // Then + require.Equal(t, size-3, len(q.reqChan)) + require.Equal(t, float64(size-3), getCounterValue(q.pushesTotalMetrics)) + require.Zero(t, getCounterValue(q.pushesFailuresTotalMetrics)) +} + +func TestQueue_Push_ReturnsErrorWhenPushingItemsToShutdownQueue(t *testing.T) { + // Given + size := 10 + workerCount := 3 + processFunc := func(context.Context, any) {} + q := newStartedQueue(t, size, workerCount, processFunc) + require.NoError(t, q.Shutdown(context.Background())) + + // When + err := q.Push(context.Background(), nil) + + // Then + require.Error(t, err) + require.Zero(t, len(q.reqChan)) + require.Zero(t, getCounterValue(q.pushesTotalMetrics)) + require.Zero(t, getCounterValue(q.pushesFailuresTotalMetrics)) +} + +func TestQueue_Push_QueueGetsProperlyDrainedOnShutdown(t *testing.T) { + // Given + count := atomic.NewUint32(0) + wg := sync.WaitGroup{} + size := 10 + workerCount := 3 + processFunc := func(context.Context, any) { + defer wg.Done() + count.Inc() + } + q := newQueue(t, size, workerCount, processFunc) + + // When + for i := 0; i < size-3; i++ { + wg.Add(1) + require.NoError(t, q.Push(context.Background(), nil)) + } + + require.NoError(t, q.Shutdown(context.Background())) + q.StartWorkers() + + // Then + wg.Wait() + require.Zero(t, len(q.reqChan)) + require.Equal(t, float64(size-3), getCounterValue(q.pushesTotalMetrics)) + require.Zero(t, 
getCounterValue(q.pushesFailuresTotalMetrics)) +} + +func TestQueue_Push_ReturnsErrorWhenPushingItemsWithCancelledContext(t *testing.T) { + // Given + size := 10 + workerCount := 3 + processFunc := func(context.Context, any) {} + q := newStartedQueue(t, size, workerCount, processFunc) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // When + err := q.Push(ctx, nil) + + // Then + require.Error(t, err) + require.Zero(t, len(q.reqChan)) + require.Equal(t, float64(1), getCounterValue(q.pushesTotalMetrics)) + require.Equal(t, float64(1), getCounterValue(q.pushesFailuresTotalMetrics)) +} + +func TestQueue_Push_ReturnsErrorWhenPushingItemsToFullQueueWithStoppedWorkers(t *testing.T) { + // Given + size := 10 + workerCount := 3 + processFunc := func(context.Context, any) {} + q := newQueue(t, size, workerCount, processFunc) + + // When + for i := 0; i < size; i++ { + require.NoError(t, q.Push(context.Background(), nil)) + } + + require.Error(t, q.Push(context.Background(), nil)) + + // Then + require.Equal(t, size, len(q.reqChan)) + require.Equal(t, float64(size+1), getCounterValue(q.pushesTotalMetrics)) + require.Equal(t, float64(1), getCounterValue(q.pushesFailuresTotalMetrics)) +} + +func TestQueue_ShouldUpdate_ReturnsTrueWhenWorkerCountDiffersFromOriginalValue(t *testing.T) { + // Given + q := newQueue[int](t, 2, 3, nil) + + // When + got := q.ShouldUpdate(2, 7) + + // Then + require.True(t, got) +} + +func TestQueue_ShouldUpdate_ReturnsTrueWhenSizeDiffersFromOriginalValue(t *testing.T) { + // Given + q := newQueue[int](t, 2, 3, nil) + + // When + got := q.ShouldUpdate(7, 3) + + // Then + require.True(t, got) +} + +func TestQueue_ShouldUpdate_ReturnsTrueWhenBothParametersDifferFromOriginalValue(t *testing.T) { + // Given + q := newQueue[int](t, 2, 3, nil) + + // When + got := q.ShouldUpdate(13, 17) + + // Then + require.True(t, got) +} + +func TestQueue_ShouldUpdate_ReturnsFalseWhenBothParametersEqualOriginalValues(t *testing.T) { + // Given + q := newQueue[int](t, 2, 3, nil) + + // When + got := q.ShouldUpdate(2, 3) + + // Then + require.False(t, got) +}
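
---

For reviewers, a minimal usage sketch of the new generic `queue.Queue[T]` API introduced by this PR, outside the `generatorForwarder` context. The payload type `item`, the queue name, tenant ID, sizes, and the 5-second shutdown timeout are illustrative choices for this sketch, not part of the change; only the `queue.New` / `Push` / `StartWorkers` / `Shutdown` signatures come from the diff above.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/tempo/modules/distributor/queue"
)

// item is a placeholder payload type; any T works with queue.Queue[T].
type item struct {
	id int
}

func main() {
	// Per-tenant queue: Name and TenantID become the "name" and "tenant"
	// labels on the tempo_distributor_queue_* metrics.
	q := queue.New(
		queue.Config{
			Name:        "example",   // illustrative queue name
			TenantID:    "tenant-id", // illustrative tenant
			Size:        10,          // buffered channel capacity
			WorkerCount: 2,           // goroutines draining the queue
		},
		log.NewNopLogger(),
		prometheus.NewRegistry(),
		func(ctx context.Context, data *item) {
			// Workers call this ProcessFunc for every dequeued item;
			// ctx carries a timeout and the injected org ID.
			fmt.Println("processing", data.id)
		},
	)
	q.StartWorkers()

	// Push fails fast when the queue is full or already shut down.
	if err := q.Push(context.Background(), &item{id: 1}); err != nil {
		fmt.Println("push failed:", err)
	}

	// Shutdown marks the queue read-only and drains it, bounded by the
	// caller's context (the forwarder uses 20s in stop()).
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := q.Shutdown(ctx); err != nil {
		fmt.Println("shutdown failed:", err)
	}
}
```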