From 53d9c48a91b11a11ecc647e641c48c2ff3619320 Mon Sep 17 00:00:00 2001 From: Lukasz Gut Date: Wed, 12 Oct 2022 10:53:36 +0200 Subject: [PATCH 01/12] Refactor queueManager into generic queue.Queue --- modules/distributor/distributor.go | 4 +- modules/distributor/forwarder.go | 299 +++++++++--------------- modules/distributor/forwarder_test.go | 206 +++------------- modules/distributor/queue/config.go | 8 + modules/distributor/queue/queue.go | 224 ++++++++++++++++++ modules/distributor/queue/queue_test.go | 283 ++++++++++++++++++++++ 6 files changed, 652 insertions(+), 372 deletions(-) create mode 100644 modules/distributor/queue/config.go create mode 100644 modules/distributor/queue/queue.go create mode 100644 modules/distributor/queue/queue_test.go diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index e04653e3bb9..83f8c98b7c3 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -131,7 +131,7 @@ type Distributor struct { generatorClientCfg generator_client.Config generatorsRing ring.ReadRing generatorsPool *ring_client.Pool - generatorForwarder *forwarder + generatorForwarder *generatorForwarder // Per-user rate limiter. ingestionRateLimiter *limiter.RateLimiter @@ -223,7 +223,7 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi subservices = append(subservices, d.generatorsPool) - d.generatorForwarder = newForwarder(d.sendToGenerators, o) + d.generatorForwarder = newGeneratorForwarder(logger, reg, d.sendToGenerators, o) subservices = append(subservices, d.generatorForwarder) } diff --git a/modules/distributor/forwarder.go b/modules/distributor/forwarder.go index c4822345d2c..82d7573ffa1 100644 --- a/modules/distributor/forwarder.go +++ b/modules/distributor/forwarder.go @@ -2,18 +2,18 @@ package distributor import ( "context" - "fmt" "sync" "time" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/services" - "github.com/grafana/tempo/modules/overrides" - "github.com/grafana/tempo/pkg/util/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "go.uber.org/atomic" "go.uber.org/multierr" + + "github.com/grafana/tempo/modules/distributor/queue" + "github.com/grafana/tempo/modules/overrides" ) const ( @@ -25,34 +25,33 @@ var ( metricForwarderPushes = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", Name: "distributor_forwarder_pushes_total", - Help: "Total number of successful requests queued up for a tenant to the forwarder", + Help: "Total number of successful requests queued up for a tenant to the generatorForwarder", }, []string{"tenant"}) metricForwarderPushesFailures = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", Name: "distributor_forwarder_pushes_failures_total", - Help: "Total number of failed pushes to the queue for a tenant to the forwarder", - }, []string{"tenant"}) - metricForwarderQueueLength = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "tempo", - Name: "distributor_forwarder_queue_length", - Help: "Number of queued requests for a tenant", + Help: "Total number of failed pushes to the queue for a tenant to the generatorForwarder", }, []string{"tenant"}) ) type forwardFunc func(ctx context.Context, tenantID string, keys []uint32, traces []*rebatchedTrace) error type request struct { - keys []uint32 - traces []*rebatchedTrace + tenantID string + keys []uint32 + traces []*rebatchedTrace } -// forwarder queues up traces to be sent to the metrics-generators -type forwarder struct { +// generatorForwarder queues up traces to be sent to the metrics-generators +type generatorForwarder struct { services.Service - // per-tenant queue managers - queueManagers map[string]*queueManager - mutex sync.RWMutex + logger log.Logger + reg prometheus.Registerer + + // per-tenant queues + queues map[string]*queue.Queue[*request] + mutex sync.RWMutex forwardFunc forwardFunc @@ -61,9 +60,11 @@ type forwarder struct { shutdown chan interface{} } -func newForwarder(fn forwardFunc, o *overrides.Overrides) *forwarder { - rf := &forwarder{ - queueManagers: make(map[string]*queueManager), +func newGeneratorForwarder(logger log.Logger, reg prometheus.Registerer, fn forwardFunc, o *overrides.Overrides) *generatorForwarder { + rf := &generatorForwarder{ + logger: logger, + reg: reg, + queues: make(map[string]*queue.Queue[*request]), mutex: sync.RWMutex{}, forwardFunc: fn, o: o, @@ -77,25 +78,25 @@ func newForwarder(fn forwardFunc, o *overrides.Overrides) *forwarder { } // SendTraces queues up traces to be sent to the metrics-generators -func (f *forwarder) SendTraces(ctx context.Context, tenantID string, keys []uint32, traces []*rebatchedTrace) { +func (f *generatorForwarder) SendTraces(ctx context.Context, tenantID string, keys []uint32, traces []*rebatchedTrace) { select { case <-f.shutdown: return default: } - qm := f.getOrCreateQueueManager(tenantID) - err := qm.pushToQueue(ctx, &request{keys: keys, traces: traces}) + q := f.getOrCreateQueue(tenantID) + err := q.Push(ctx, &request{tenantID: tenantID, keys: keys, traces: traces}) if err != nil { - level.Error(log.Logger).Log("msg", "failed to push traces to queue", "tenant", tenantID, "err", err) + _ = level.Error(f.logger).Log("msg", "failed to push traces to queue", "tenant", tenantID, "err", err) metricForwarderPushesFailures.WithLabelValues(tenantID).Inc() } metricForwarderPushes.WithLabelValues(tenantID).Inc() } -// getQueueManagerConfig returns queueSize and workerCount for the given tenant -func (f *forwarder) getQueueManagerConfig(tenantID string) (queueSize, workerCount int) { +// getQueueConfig returns queueSize and workerCount for the given tenant +func (f *generatorForwarder) getQueueConfig(tenantID string) (queueSize, workerCount int) { queueSize = f.o.MetricsGeneratorForwarderQueueSize(tenantID) if queueSize == 0 { queueSize = defaultQueueSize @@ -108,32 +109,50 @@ func (f *forwarder) getQueueManagerConfig(tenantID string) (queueSize, workerCou return queueSize, workerCount } -func (f *forwarder) getOrCreateQueueManager(tenantID string) *queueManager { - qm, ok := f.getQueueManager(tenantID) +func (f *generatorForwarder) getOrCreateQueue(tenantID string) *queue.Queue[*request] { + q, ok := f.getQueue(tenantID) if ok { - return qm + return q } f.mutex.Lock() defer f.mutex.Unlock() - queueSize, workerCount := f.getQueueManagerConfig(tenantID) - f.queueManagers[tenantID] = newQueueManager(tenantID, queueSize, workerCount, f.forwardFunc) + queueSize, workerCount := f.getQueueConfig(tenantID) + + processFunc := func(ctx context.Context, data *request) error { + return f.forwardFunc(ctx, data.tenantID, data.keys, data.traces) + } + + newQueue := queue.New( + queue.Config{ + Name: "metrics-generator", + TenantID: tenantID, + Size: queueSize, + WorkerCount: workerCount, + }, + f.logger, + f.reg, + processFunc, + ) + newQueue.StartWorkers() + + f.queues[tenantID] = newQueue - return f.queueManagers[tenantID] + return f.queues[tenantID] } -func (f *forwarder) getQueueManager(tenantID string) (*queueManager, bool) { +func (f *generatorForwarder) getQueue(tenantID string) (*queue.Queue[*request], bool) { f.mutex.RLock() defer f.mutex.RUnlock() - qm, ok := f.queueManagers[tenantID] - return qm, ok + q, ok := f.queues[tenantID] + return q, ok } // watchOverrides watches the overrides for changes -// and updates the queueManagers accordingly -func (f *forwarder) watchOverrides() { +// and updates the queues accordingly +func (f *generatorForwarder) watchOverrides() { ticker := time.NewTicker(f.overridesInterval) for { @@ -142,21 +161,27 @@ func (f *forwarder) watchOverrides() { f.mutex.Lock() var ( - queueManagersToDelete []*queueManager - queueManagersToAdd []struct { + queuesToDelete []*queue.Queue[*request] + queuesToAdd []struct { tenantID string queueSize, workerCount int } ) - for tenantID, tm := range f.queueManagers { - queueSize, workerCount := f.getQueueManagerConfig(tenantID) + for tenantID, q := range f.queues { + queueSize, workerCount := f.getQueueConfig(tenantID) // if the queue size or worker count has changed, shutdown the queue manager and create a new one - if tm.shouldUpdate(queueSize, workerCount) { - level.Info(log.Logger).Log("msg", "Marking queue manager for update", "tenant", tenantID, - "old_queue_size", tm.queueSize, "new_queue_size", queueSize, "old_worker_count", tm.workerCount, "new_worker_count", workerCount) - queueManagersToDelete = append(queueManagersToDelete, tm) - queueManagersToAdd = append(queueManagersToAdd, struct { + if q.ShouldUpdate(queueSize, workerCount) { + _ = level.Info(f.logger).Log( + "msg", "Marking queue manager for update", + "tenant", tenantID, + "old_queue_size", q.Size(), + "new_queue_size", queueSize, + "old_worker_count", q.WorkerCount(), + "new_worker_count", workerCount, + ) + queuesToDelete = append(queuesToDelete, q) + queuesToAdd = append(queuesToAdd, struct { tenantID string queueSize, workerCount int }{tenantID: tenantID, queueSize: queueSize, workerCount: workerCount}) @@ -165,20 +190,38 @@ func (f *forwarder) watchOverrides() { // Spawn a goroutine to asynchronously shut down queue managers go func() { - for _, qm := range queueManagersToDelete { + for _, q := range queuesToDelete { // shutdown the queue manager // this will block until all workers have finished and the queue is drained - level.Info(log.Logger).Log("msg", "Shutting down queue manager", "tenant", qm.tenantID) - if err := qm.shutdown(); err != nil { - level.Error(log.Logger).Log("msg", "error shutting down queue manager", "tenant", qm.tenantID, "err", err) + _ = level.Info(f.logger).Log("msg", "Shutting down queue manager", "tenant", q.TenantID()) + if err := q.Shutdown(context.Background()); err != nil { + _ = level.Error(f.logger).Log("msg", "error shutting down queue manager", "tenant", q.TenantID(), "err", err) } } }() // Synchronously update queue managers - for _, qm := range queueManagersToAdd { - level.Info(log.Logger).Log("msg", "Updating queue manager", "tenant", qm.tenantID) - f.queueManagers[qm.tenantID] = newQueueManager(qm.tenantID, qm.queueSize, qm.workerCount, f.forwardFunc) + for _, q := range queuesToAdd { + _ = level.Info(f.logger).Log("msg", "Updating queue manager", "tenant", q.tenantID) + + processFunc := func(ctx context.Context, data *request) error { + return f.forwardFunc(ctx, data.tenantID, data.keys, data.traces) + } + + newQueue := queue.New( + queue.Config{ + Name: "metrics-generator", + TenantID: q.tenantID, + Size: q.queueSize, + WorkerCount: q.workerCount, + }, + f.logger, + f.reg, + processFunc, + ) + newQueue.StartWorkers() + + f.queues[q.tenantID] = newQueue } f.mutex.Unlock() @@ -189,156 +232,24 @@ func (f *forwarder) watchOverrides() { } } -func (f *forwarder) start(_ context.Context) error { +func (f *generatorForwarder) start(_ context.Context) error { go f.watchOverrides() return nil } -func (f *forwarder) stop(_ error) error { +func (f *generatorForwarder) stop(_ error) error { close(f.shutdown) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + var errs []error - for _, tm := range f.queueManagers { - if err := tm.shutdown(); err != nil { + for _, q := range f.queues { + if err := q.Shutdown(ctx); err != nil { errs = append(errs, err) } } return multierr.Combine(errs...) } - -// queueManager manages a single tenant's queue -type queueManager struct { - // wg is used to wait for the workers to drain the queue while stopping - wg sync.WaitGroup - - tenantID string - workerCount int - workerAliveCount *atomic.Int32 - queueSize int - reqChan chan *request - fn forwardFunc - workersCloseCh chan struct{} - - readOnly *atomic.Bool -} - -func newQueueManager(tenantID string, queueSize, workerCount int, fn forwardFunc) *queueManager { - m := &queueManager{ - tenantID: tenantID, - workerCount: workerCount, - queueSize: queueSize, - workerAliveCount: atomic.NewInt32(0), - reqChan: make(chan *request, queueSize), - fn: fn, - workersCloseCh: make(chan struct{}), - readOnly: atomic.NewBool(false), - } - - m.startWorkers() - - return m -} - -// pushToQueue a trace to the queue -// if the queue is full, the trace is dropped -func (m *queueManager) pushToQueue(ctx context.Context, req *request) error { - if m.readOnly.Load() { - return fmt.Errorf("queue is read-only") - } - - select { - case m.reqChan <- req: - metricForwarderQueueLength.WithLabelValues(m.tenantID).Inc() - case <-ctx.Done(): - return fmt.Errorf("failed to pushToQueue traces to tenant %s queue: %w", m.tenantID, ctx.Err()) - default: - // Fail fast if the queue is full - return fmt.Errorf("failed to pushToQueue traces to tenant %s queue: queue is full", m.tenantID) - - } - - return nil -} - -func (m *queueManager) startWorkers() { - for i := 0; i < m.workerCount; i++ { - m.wg.Add(1) - m.workerAliveCount.Inc() - - go m.worker() - } -} - -func (m *queueManager) worker() { - defer func() { - m.wg.Done() - m.workerAliveCount.Dec() - }() - - for { - select { - case req := <-m.reqChan: - metricForwarderQueueLength.WithLabelValues(m.tenantID).Dec() - m.forwardRequest(context.Background(), req) - default: - // Forces to always trying to pull from the queue before exiting - // This is important during shutdown to ensure that the queue is drained - select { - case req := <-m.reqChan: - metricForwarderQueueLength.WithLabelValues(m.tenantID).Dec() - m.forwardRequest(context.Background(), req) - case <-m.workersCloseCh: - // If the queue isn't empty, force to start the loop from the beginning - if len(m.reqChan) > 0 { - continue - } - return - } - } - } -} - -func (m *queueManager) forwardRequest(ctx context.Context, req *request) { - if err := m.fn(ctx, m.tenantID, req.keys, req.traces); err != nil { - level.Error(log.Logger).Log("msg", "pushing to metrics-generators failed", "err", err) - } -} - -func (m *queueManager) shutdown() error { - // Call to stopWorkers only once - if m.readOnly.CAS(false, true) { - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) - defer cancel() - - return m.stopWorkers(ctx) - } - - return nil -} - -func (m *queueManager) stopWorkers(ctx context.Context) error { - // Close workersCloseCh and wait for all workers to return - close(m.workersCloseCh) - - doneCh := make(chan struct{}) - go func() { - m.wg.Wait() - close(doneCh) - }() - - select { - case <-ctx.Done(): - return fmt.Errorf("failed to stop tenant %s queueManager: %w", m.tenantID, ctx.Err()) - case <-doneCh: - return nil - } -} - -// shouldUpdate returns true if the queue size or worker count (alive or total) has changed -func (m *queueManager) shouldUpdate(queueSize, numWorkers int) bool { - // TODO: worker alive count could be 0 and shutting down the queue manager would be impossible - // it'd be better if we were able to spawn new workers instead of just closing the queueManager - // and creating a new one - return m.queueSize != queueSize || m.workerCount != numWorkers || m.workerAliveCount.Load() != int32(numWorkers) -} diff --git a/modules/distributor/forwarder_test.go b/modules/distributor/forwarder_test.go index 016aba0492e..9fd324a88f1 100644 --- a/modules/distributor/forwarder_test.go +++ b/modules/distributor/forwarder_test.go @@ -3,21 +3,19 @@ package distributor import ( "context" "flag" - "os" - "path/filepath" "sync" "testing" "time" - "github.com/grafana/dskit/services" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/grafana/tempo/modules/overrides" v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/test" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "gopkg.in/yaml.v2" ) const tenantID = "tenant-id" @@ -37,13 +35,18 @@ func TestForwarder(t *testing.T) { require.NoError(t, err) wg := sync.WaitGroup{} - f := newForwarder(func(ctx context.Context, userID string, k []uint32, traces []*rebatchedTrace) error { - assert.Equal(t, tenantID, userID) - assert.Equal(t, keys, k) - assert.Equal(t, rebatchedTraces, traces) - wg.Done() - return nil - }, o) + f := newGeneratorForwarder( + log.NewNopLogger(), + prometheus.NewPedanticRegistry(), + func(ctx context.Context, userID string, k []uint32, traces []*rebatchedTrace) error { + assert.Equal(t, tenantID, userID) + assert.Equal(t, keys, k) + assert.Equal(t, rebatchedTraces, traces) + wg.Done() + return nil + }, + o, + ) require.NoError(t, f.start(context.Background())) defer func() { require.NoError(t, f.stop(nil)) @@ -56,52 +59,6 @@ func TestForwarder(t *testing.T) { wg.Add(1) f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) wg.Wait() - - assert.Equal(t, 0, len(f.queueManagers[tenantID].reqChan)) -} - -func TestForwarder_QueueSizeConfig(t *testing.T) { - // this test sends more traces than the configured queue size - // and asserts that the queue size is equal or less than the configured size - - queueSize := 10 - oCfg := overrides.Limits{ - MetricsGeneratorForwarderQueueSize: queueSize, - MetricsGeneratorForwarderWorkers: 1, - } - oCfg.RegisterFlags(&flag.FlagSet{}) - - id, err := util.HexStringToTraceID("1234567890abcdef") - require.NoError(t, err) - - b := test.MakeBatch(10, id) - keys, rebatchedTraces, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10) - require.NoError(t, err) - - o, err := overrides.NewOverrides(oCfg) - require.NoError(t, err) - - shutdownCh := make(chan struct{}) - - f := newForwarder(func(ctx context.Context, userID string, k []uint32, traces []*rebatchedTrace) error { - <-shutdownCh - return nil - }, o) - - require.NoError(t, f.start(context.Background())) - defer func() { - close(shutdownCh) - require.NoError(t, f.stop(nil)) - assert.Equal(t, 0, len(f.queueManagers[tenantID].reqChan)) - }() - - // sending more traces than queue size - for i := 0; i < queueSize+2; i++ { - f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) - } - - // queue length is less or equal to the configured queue size - assert.LessOrEqual(t, len(f.queueManagers[tenantID].reqChan), queueSize) } func TestForwarder_shutdown(t *testing.T) { @@ -120,14 +77,19 @@ func TestForwarder_shutdown(t *testing.T) { require.NoError(t, err) signalCh := make(chan struct{}) - f := newForwarder(func(ctx context.Context, userID string, k []uint32, traces []*rebatchedTrace) error { - <-signalCh - - assert.Equal(t, tenantID, userID) - assert.Equal(t, keys, k) - assert.Equal(t, rebatchedTraces, traces) - return nil - }, o) + f := newGeneratorForwarder( + log.NewNopLogger(), + prometheus.NewPedanticRegistry(), + func(ctx context.Context, userID string, k []uint32, traces []*rebatchedTrace) error { + <-signalCh + + assert.Equal(t, tenantID, userID) + assert.Equal(t, keys, k) + assert.Equal(t, rebatchedTraces, traces) + return nil + }, + o, + ) require.NoError(t, f.start(context.Background())) defer func() { @@ -137,117 +99,9 @@ func TestForwarder_shutdown(t *testing.T) { close(signalCh) }() require.NoError(t, f.stop(nil)) - f.mutex.Lock() - assert.Equal(t, 0, len(f.queueManagers[tenantID].reqChan)) - f.mutex.Unlock() }() for i := 0; i < 100; i++ { f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) } } - -func TestForwarder_overrides(t *testing.T) { - overridesReloadInterval := 100 * time.Millisecond - limits := overrides.Limits{} - overridesFile := filepath.Join(t.TempDir(), "overrides.yaml") - - buff, err := yaml.Marshal(map[string]map[string]*overrides.Limits{ - "overrides": { - tenantID: { - MetricsGeneratorForwarderQueueSize: 10, - MetricsGeneratorForwarderWorkers: 1, - }, - }, - }) - require.NoError(t, err) - - err = os.WriteFile(overridesFile, buff, os.ModePerm) - require.NoError(t, err) - - limits.PerTenantOverrideConfig = overridesFile - limits.PerTenantOverridePeriod = model.Duration(overridesReloadInterval) - - id, err := util.HexStringToTraceID("1234567890abcdef") - require.NoError(t, err) - - b := test.MakeBatch(10, id) - keys, rebatchedTraces, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10) - require.NoError(t, err) - - o, err := overrides.NewOverrides(limits) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.TODO(), o)) - - signalCh := make(chan struct{}) - wg := sync.WaitGroup{} - f := newForwarder(func(ctx context.Context, userID string, k []uint32, traces []*rebatchedTrace) error { - wg.Done() - <-signalCh - return nil - }, o) - f.overridesInterval = overridesReloadInterval - - require.NoError(t, f.start(context.Background())) - defer func() { - close(signalCh) - require.NoError(t, f.stop(nil)) - f.mutex.Lock() - assert.Equal(t, 0, len(f.queueManagers[tenantID].reqChan)) - f.mutex.Unlock() - }() - - wg.Add(1) - f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) - wg.Wait() - - // 10 pushes are buffered, 10 are discarded - for i := 0; i < 20; i++ { - f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) - } - - // queue is full with 10 items - f.mutex.Lock() - assert.Equal(t, 10, len(f.queueManagers[tenantID].reqChan)) - f.mutex.Unlock() - wg.Add(10) - - buff, err = yaml.Marshal(map[string]map[string]*overrides.Limits{ - "overrides": { - tenantID: { - MetricsGeneratorForwarderQueueSize: 20, - MetricsGeneratorForwarderWorkers: 2, - }, - }, - }) - require.NoError(t, err) - - err = os.WriteFile(overridesFile, buff, os.ModePerm) - require.NoError(t, err) - - // Wait triple the reload interval to ensure overrides are updated (overrides interval + forwarder interval) - time.Sleep(3 * overridesReloadInterval) - - // Allow for pending requests to be processed so queueManager can be closed and a new one created - for i := 0; i < 11; i++ { - signalCh <- struct{}{} - } - wg.Wait() - - wg.Add(2) - for i := 0; i < 2; i++ { - f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) - } - wg.Wait() - - // 20 pushes are buffered, 10 are discarded - for i := 0; i < 30; i++ { - f.SendTraces(context.Background(), tenantID, keys, rebatchedTraces) - } - - // queue is full with 20 items - f.mutex.Lock() - assert.Equal(t, 20, len(f.queueManagers[tenantID].reqChan)) - f.mutex.Unlock() - wg.Add(20) -} diff --git a/modules/distributor/queue/config.go b/modules/distributor/queue/config.go new file mode 100644 index 00000000000..a21a5384617 --- /dev/null +++ b/modules/distributor/queue/config.go @@ -0,0 +1,8 @@ +package queue + +type Config struct { + Name string + TenantID string + Size int + WorkerCount int +} diff --git a/modules/distributor/queue/queue.go b/modules/distributor/queue/queue.go new file mode 100644 index 00000000000..d681dded616 --- /dev/null +++ b/modules/distributor/queue/queue.go @@ -0,0 +1,224 @@ +package queue + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/user" + "go.uber.org/atomic" +) + +type request[T any] struct { + data T +} + +type ProcessFunc[T any] func(ctx context.Context, data T) error + +// Queue represents a single tenant's queue. +type Queue[T any] struct { + // wg is used to wait for the workers to drain the queue while stopping + wg sync.WaitGroup + + logger log.Logger + + name string + tenantID string + workerCount int + size int + reqChan chan *request[T] + fn ProcessFunc[T] + workersCloseCh chan struct{} + + pushesTotalMetrics *prometheus.CounterVec + pushesFailuresTotalMetrics *prometheus.CounterVec + lengthMetric *prometheus.GaugeVec + + readOnly *atomic.Bool +} + +func New[T any](cfg Config, logger log.Logger, reg prometheus.Registerer, fn ProcessFunc[T]) *Queue[T] { + pushesTotalMetrics := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Subsystem: "distributor", + Name: "queue_pushes_total", + Help: "Total number of successful requests queued up for a tenant to the generatorForwarder", + }, []string{"name", "tenant"}) + pushesFailuresTotalMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Subsystem: "distributor", + Name: "queue_pushes_failures_total", + Help: "Total number of failed pushes to the queue for a tenant to the generatorForwarder", + }, []string{"name", "tenant"}) + lengthMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "tempo", + Subsystem: "distributor", + Name: "queue_length", + Help: "Number of queued requests for a tenant", + }, []string{"name", "tenant"}) + + var alreadyRegisteredErr prometheus.AlreadyRegisteredError + err := reg.Register(pushesTotalMetrics) + if err != nil && !errors.As(err, &alreadyRegisteredErr) { + _ = level.Warn(logger).Log("msg", "failed to register queue_pushes_total metric", "err", err) + } + err = reg.Register(pushesFailuresTotalMetric) + if err != nil && !errors.As(err, &alreadyRegisteredErr) { + _ = level.Warn(logger).Log("msg", "failed to register queue_pushes_failures_total metric", "err", err) + } + + err = reg.Register(lengthMetric) + if err != nil && !errors.As(err, &alreadyRegisteredErr) { + _ = level.Warn(logger).Log("msg", "failed to register queue_length metric", "err", err) + } + + return &Queue[T]{ + logger: logger, + name: cfg.Name, + tenantID: cfg.TenantID, + workerCount: cfg.WorkerCount, + size: cfg.Size, + reqChan: make(chan *request[T], cfg.Size), + fn: fn, + workersCloseCh: make(chan struct{}), + pushesTotalMetrics: pushesTotalMetrics, + pushesFailuresTotalMetrics: pushesFailuresTotalMetric, + lengthMetric: lengthMetric, + readOnly: atomic.NewBool(false), + } +} + +// Push pushes data onto a queue. +// If the queue is full, the data is dropped +func (m *Queue[T]) Push(ctx context.Context, data T) error { + if m.readOnly.Load() { + return fmt.Errorf("queue is read-only") + } + + m.pushesTotalMetrics.WithLabelValues(m.name, m.tenantID).Inc() + + req := &request[T]{ + data: data, + } + + select { + case <-ctx.Done(): + m.pushesFailuresTotalMetrics.WithLabelValues(m.name, m.tenantID).Inc() + return fmt.Errorf("failed to push data to queue for tenant=%s and queue_name=%s: %w", m.tenantID, m.name, ctx.Err()) + default: + } + + select { + case m.reqChan <- req: + m.lengthMetric.WithLabelValues(m.name, m.tenantID).Inc() + return nil + default: + } + + // Fail fast if the queue is full + m.pushesFailuresTotalMetrics.WithLabelValues(m.name, m.tenantID).Inc() + + return fmt.Errorf("failed to push data to queue for tenant=%s and queue_name=%s: queue is full", m.tenantID, m.name) +} + +func (m *Queue[T]) StartWorkers() { + for i := 0; i < m.workerCount; i++ { + m.wg.Add(1) + + go m.worker() + } +} + +// Name returns queue name. +func (m *Queue[T]) Name() string { + return m.name +} + +// TenantID returns the tenant id. +func (m *Queue[T]) TenantID() string { + return m.tenantID +} + +// Size returns the size of the queue. +func (m *Queue[T]) Size() int { + return m.size +} + +// WorkerCount returns the number of expected workers. +func (m *Queue[T]) WorkerCount() int { + return m.workerCount +} + +// ShouldUpdate returns true if the queue size or worker count (alive or total) has changed +func (m *Queue[T]) ShouldUpdate(size, workerCount int) bool { + return m.size != size || m.workerCount != workerCount +} + +func (m *Queue[T]) Shutdown(ctx context.Context) error { + // Call to stopWorkers only once + if m.readOnly.CAS(false, true) { + return m.stopWorkers(ctx) + } + + return nil +} + +func (m *Queue[T]) worker() { + defer m.wg.Done() + + for { + select { + case req := <-m.reqChan: + m.lengthMetric.WithLabelValues(m.name, m.tenantID).Dec() + m.forwardRequest(req) + case <-m.workersCloseCh: + // Forces to always trying to pull from the queue before exiting + // This is important during shutdown to ensure that the queue is drained + select { + case req, ok := <-m.reqChan: + if !ok { //closed + return + } + + m.lengthMetric.WithLabelValues(m.name, m.tenantID).Dec() + m.forwardRequest(req) + default: + return + } + } + } +} + +func (m *Queue[T]) forwardRequest(req *request[T]) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + ctx = user.InjectOrgID(ctx, m.tenantID) + + if err := m.fn(ctx, req.data); err != nil { + _ = level.Error(m.logger).Log("msg", "pushing with forwarder failed", "err", err) + } +} + +func (m *Queue[T]) stopWorkers(ctx context.Context) error { + // Close workersCloseCh and wait for all workers to return + close(m.workersCloseCh) + + doneCh := make(chan struct{}) + go func() { + m.wg.Wait() + close(doneCh) + }() + + select { + case <-ctx.Done(): + return fmt.Errorf("failed to stop tenant %s Queue: %w", m.tenantID, ctx.Err()) + case <-doneCh: + return nil + } +} diff --git a/modules/distributor/queue/queue_test.go b/modules/distributor/queue/queue_test.go new file mode 100644 index 00000000000..f43998a1c0a --- /dev/null +++ b/modules/distributor/queue/queue_test.go @@ -0,0 +1,283 @@ +package queue + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +func newQueue[T any](t *testing.T, size, workerCount int, processFunc ProcessFunc[T]) *Queue[T] { + cfg := Config{Name: "testName", TenantID: "testTenantID", Size: size, WorkerCount: workerCount} + + logger := log.NewNopLogger() + reg := prometheus.NewPedanticRegistry() + q := New(cfg, logger, reg, processFunc) + + t.Cleanup(func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + require.NoError(t, q.Shutdown(ctx)) + }) + + return q +} + +func newStartedQueue[T any](t *testing.T, size, workerCount int, processFunc ProcessFunc[T]) *Queue[T] { + q := newQueue(t, size, workerCount, processFunc) + q.StartWorkers() + + return q +} + +func getCounterValue(metric *prometheus.CounterVec) float64 { + var m = &dto.Metric{} + if err := metric.WithLabelValues("testName", "testTenantID").Write(m); err != nil { + return 0 + } + + return m.Counter.GetValue() +} + +func TestNew_ReturnsNotNilAndSetsCorrectFieldsFromConfig(t *testing.T) { + // Given + cfg := Config{Name: "testName", TenantID: "testTenantID", Size: 123, WorkerCount: 321} + processFunc := func(context.Context, int) error { return nil } + logger := log.NewNopLogger() + reg := prometheus.NewPedanticRegistry() + + // When + got := New(cfg, logger, reg, processFunc) + + // Then + require.NotNil(t, got) + require.Equal(t, got.name, cfg.Name) + require.Equal(t, got.tenantID, cfg.TenantID) + require.Equal(t, got.size, cfg.Size) + require.Equal(t, got.workerCount, cfg.WorkerCount) +} + +func TestQueue_Push_ReturnsNoErrorAndWorkersInvokeProcessFuncCorrectNumberOfTimesWithRunningWorkers(t *testing.T) { + // Given + count := atomic.NewUint32(0) + wg := sync.WaitGroup{} + size := 10 + workerCount := 3 + processFunc := func(context.Context, any) error { + defer wg.Done() + count.Inc() + return nil + } + q := newStartedQueue(t, size, workerCount, processFunc) + + // When + for i := 0; i < size-3; i++ { + wg.Add(1) + require.NoError(t, q.Push(context.Background(), nil)) + } + + // Then + wg.Wait() + require.Equal(t, uint32(size-3), count.Load()) + require.Equal(t, float64(size-3), getCounterValue(q.pushesTotalMetrics)) + require.Zero(t, getCounterValue(q.pushesFailuresTotalMetrics)) +} + +func TestQueue_Push_ReturnsNoErrorWhenPushingLessItemsThanSizeWithStoppedWorkers(t *testing.T) { + // Given + size := 10 + workerCount := 3 + processFunc := func(context.Context, any) error { return nil } + q := newQueue(t, size, workerCount, processFunc) + + // When + for i := 0; i < size-3; i++ { + require.NoError(t, q.Push(context.Background(), nil)) + } + + // Then + require.Equal(t, size-3, len(q.reqChan)) + require.Equal(t, float64(size-3), getCounterValue(q.pushesTotalMetrics)) + require.Zero(t, getCounterValue(q.pushesFailuresTotalMetrics)) +} + +func TestQueue_Push_ReturnsErrorWhenPushingItemsToShutdownQueue(t *testing.T) { + // Given + size := 10 + workerCount := 3 + processFunc := func(context.Context, any) error { return nil } + q := newStartedQueue(t, size, workerCount, processFunc) + require.NoError(t, q.Shutdown(context.Background())) + + // When + err := q.Push(context.Background(), nil) + + // Then + require.Error(t, err) + require.Zero(t, len(q.reqChan)) + require.Zero(t, getCounterValue(q.pushesTotalMetrics)) + require.Zero(t, getCounterValue(q.pushesFailuresTotalMetrics)) +} + +func TestQueue_Push_QueueGetsProperlyDrainedOnShutdown(t *testing.T) { + // Given + count := atomic.NewUint32(0) + wg := sync.WaitGroup{} + size := 10 + workerCount := 3 + processFunc := func(context.Context, any) error { + defer wg.Done() + count.Inc() + return nil + } + q := newQueue(t, size, workerCount, processFunc) + + // When + for i := 0; i < size-3; i++ { + wg.Add(1) + require.NoError(t, q.Push(context.Background(), nil)) + } + + require.NoError(t, q.Shutdown(context.Background())) + q.StartWorkers() + + // Then + wg.Wait() + require.Zero(t, len(q.reqChan)) + require.Equal(t, float64(size-3), getCounterValue(q.pushesTotalMetrics)) + require.Zero(t, getCounterValue(q.pushesFailuresTotalMetrics)) +} + +func TestQueue_Push_ReturnsErrorWhenPushingItemsWithCancelledContext(t *testing.T) { + // Given + size := 10 + workerCount := 3 + processFunc := func(context.Context, any) error { return nil } + q := newStartedQueue(t, size, workerCount, processFunc) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + // When + err := q.Push(ctx, nil) + + // Then + require.Error(t, err) + require.Zero(t, len(q.reqChan)) + require.Equal(t, float64(1), getCounterValue(q.pushesTotalMetrics)) + require.Equal(t, float64(1), getCounterValue(q.pushesFailuresTotalMetrics)) +} + +func TestQueue_Push_ReturnsErrorWhenPushingItemsToFullQueueWithStoppedWorkers(t *testing.T) { + // Given + size := 10 + workerCount := 3 + processFunc := func(context.Context, any) error { return nil } + q := newQueue(t, size, workerCount, processFunc) + + // When + for i := 0; i < size; i++ { + require.NoError(t, q.Push(context.Background(), nil)) + } + + require.Error(t, q.Push(context.Background(), nil)) + + // Then + require.Equal(t, size, len(q.reqChan)) + require.Equal(t, float64(size+1), getCounterValue(q.pushesTotalMetrics)) + require.Equal(t, float64(1), getCounterValue(q.pushesFailuresTotalMetrics)) +} + +func TestQueue_Name_ReturnsCorrectValue(t *testing.T) { + // Given + q := newQueue[int](t, 1, 1, nil) + + // When + got := q.Name() + + // Then + require.Equal(t, "testName", got) +} + +func TestQueue_TenantID_ReturnsCorrectValue(t *testing.T) { + // Given + q := newQueue[int](t, 1, 1, nil) + + // When + got := q.TenantID() + + // Then + require.Equal(t, "testTenantID", got) +} + +func TestQueue_Size_ReturnsCorrectValue(t *testing.T) { + // Given + q := newQueue[int](t, 1337, 1, nil) + + // When + got := q.Size() + + // Then + require.Equal(t, 1337, got) +} + +func TestQueue_WorkerCount_ReturnsCorrectValue(t *testing.T) { + // Given + q := newQueue[int](t, 1, 1337, nil) + + // When + got := q.WorkerCount() + + // Then + require.Equal(t, 1337, got) +} + +func TestQueue_ShouldUpdate_ReturnsTrueWhenWorkerCountDiffersFromOriginalValue(t *testing.T) { + // Given + q := newQueue[int](t, 2, 3, nil) + + // When + got := q.ShouldUpdate(2, 7) + + // Then + require.True(t, got) +} + +func TestQueue_ShouldUpdate_ReturnsTrueWhenSizeDiffersFromOriginalValue(t *testing.T) { + // Given + q := newQueue[int](t, 2, 3, nil) + + // When + got := q.ShouldUpdate(7, 3) + + // Then + require.True(t, got) +} + +func TestQueue_ShouldUpdate_ReturnsTrueWhenBothParametersDifferFromOriginalValue(t *testing.T) { + // Given + q := newQueue[int](t, 2, 3, nil) + + // When + got := q.ShouldUpdate(13, 17) + + // Then + require.True(t, got) +} + +func TestQueue_ShouldUpdate_ReturnsFalseWhenBothParametersEqualOriginalValues(t *testing.T) { + // Given + q := newQueue[int](t, 2, 3, nil) + + // When + got := q.ShouldUpdate(2, 3) + + // Then + require.False(t, got) +} From 730dcd8a89e59926943e1697e8242cf53df087e1 Mon Sep 17 00:00:00 2001 From: Lukasz Gut Date: Wed, 12 Oct 2022 11:08:02 +0200 Subject: [PATCH 02/12] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6859fe15276..01002903547 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## main / unreleased +* [ENHANCEMENT] Refactor queueManager into generic queue.Queue [#1796](https://github.com/grafana/tempo/pull/1796) (@Blinkuu) * [ENHANCEMENT] Filter namespace by cluster in tempo dashboards variables [#1771](https://github.com/grafana/tempo/pull/1771) (@electron0zero) * [ENHANCEMENT] Exit early from sharded search requests [#1742](https://github.com/grafana/tempo/pull/1742) (@electron0zero) * [CHANGE] Identify bloom that could not be retrieved from backend block [#1737](https://github.com/grafana/tempo/pull/1737) (@AlexDHoffer) From 95b9175de9223d89723da70f9570ca40a0f706bd Mon Sep 17 00:00:00 2001 From: Lukasz Gut Date: Thu, 13 Oct 2022 11:20:23 +0200 Subject: [PATCH 03/12] Create shared processFun method and a helper function for queue creation --- modules/distributor/forwarder.go | 61 ++++++++++++-------------------- 1 file changed, 23 insertions(+), 38 deletions(-) diff --git a/modules/distributor/forwarder.go b/modules/distributor/forwarder.go index 82d7573ffa1..6a056e89d5e 100644 --- a/modules/distributor/forwarder.go +++ b/modules/distributor/forwarder.go @@ -119,25 +119,7 @@ func (f *generatorForwarder) getOrCreateQueue(tenantID string) *queue.Queue[*req defer f.mutex.Unlock() queueSize, workerCount := f.getQueueConfig(tenantID) - - processFunc := func(ctx context.Context, data *request) error { - return f.forwardFunc(ctx, data.tenantID, data.keys, data.traces) - } - - newQueue := queue.New( - queue.Config{ - Name: "metrics-generator", - TenantID: tenantID, - Size: queueSize, - WorkerCount: workerCount, - }, - f.logger, - f.reg, - processFunc, - ) - newQueue.StartWorkers() - - f.queues[tenantID] = newQueue + f.queues[tenantID] = f.createQueueAndStartWorkers(tenantID, queueSize, workerCount) return f.queues[tenantID] } @@ -203,25 +185,7 @@ func (f *generatorForwarder) watchOverrides() { // Synchronously update queue managers for _, q := range queuesToAdd { _ = level.Info(f.logger).Log("msg", "Updating queue manager", "tenant", q.tenantID) - - processFunc := func(ctx context.Context, data *request) error { - return f.forwardFunc(ctx, data.tenantID, data.keys, data.traces) - } - - newQueue := queue.New( - queue.Config{ - Name: "metrics-generator", - TenantID: q.tenantID, - Size: q.queueSize, - WorkerCount: q.workerCount, - }, - f.logger, - f.reg, - processFunc, - ) - newQueue.StartWorkers() - - f.queues[q.tenantID] = newQueue + f.queues[q.tenantID] = f.createQueueAndStartWorkers(q.tenantID, q.queueSize, q.workerCount) } f.mutex.Unlock() @@ -253,3 +217,24 @@ func (f *generatorForwarder) stop(_ error) error { } return multierr.Combine(errs...) } + +func (f *generatorForwarder) processFunc(ctx context.Context, data *request) error { + return f.forwardFunc(ctx, data.tenantID, data.keys, data.traces) +} + +func (f *generatorForwarder) createQueueAndStartWorkers(tenantID string, size, workerCount int) *queue.Queue[*request] { + q := queue.New( + queue.Config{ + Name: "metrics-generator", + TenantID: tenantID, + Size: size, + WorkerCount: workerCount, + }, + f.logger, + f.reg, + f.processFunc, + ) + q.StartWorkers() + + return q +} From eb2ae5856ac701f1075fc2e0ef19837607e63a49 Mon Sep 17 00:00:00 2001 From: Lukasz Gut Date: Thu, 13 Oct 2022 11:30:53 +0200 Subject: [PATCH 04/12] Remove returning error from ProcessFunc and update chan *request[T] to chan T. --- modules/distributor/queue/queue.go | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/modules/distributor/queue/queue.go b/modules/distributor/queue/queue.go index d681dded616..cbfe96aa798 100644 --- a/modules/distributor/queue/queue.go +++ b/modules/distributor/queue/queue.go @@ -14,11 +14,7 @@ import ( "go.uber.org/atomic" ) -type request[T any] struct { - data T -} - -type ProcessFunc[T any] func(ctx context.Context, data T) error +type ProcessFunc[T any] func(ctx context.Context, data T) // Queue represents a single tenant's queue. type Queue[T any] struct { @@ -31,7 +27,7 @@ type Queue[T any] struct { tenantID string workerCount int size int - reqChan chan *request[T] + reqChan chan T fn ProcessFunc[T] workersCloseCh chan struct{} @@ -83,7 +79,7 @@ func New[T any](cfg Config, logger log.Logger, reg prometheus.Registerer, fn Pro tenantID: cfg.TenantID, workerCount: cfg.WorkerCount, size: cfg.Size, - reqChan: make(chan *request[T], cfg.Size), + reqChan: make(chan T, cfg.Size), fn: fn, workersCloseCh: make(chan struct{}), pushesTotalMetrics: pushesTotalMetrics, @@ -102,10 +98,6 @@ func (m *Queue[T]) Push(ctx context.Context, data T) error { m.pushesTotalMetrics.WithLabelValues(m.name, m.tenantID).Inc() - req := &request[T]{ - data: data, - } - select { case <-ctx.Done(): m.pushesFailuresTotalMetrics.WithLabelValues(m.name, m.tenantID).Inc() @@ -114,7 +106,7 @@ func (m *Queue[T]) Push(ctx context.Context, data T) error { } select { - case m.reqChan <- req: + case m.reqChan <- data: m.lengthMetric.WithLabelValues(m.name, m.tenantID).Inc() return nil default: @@ -194,15 +186,13 @@ func (m *Queue[T]) worker() { } } -func (m *Queue[T]) forwardRequest(req *request[T]) { +func (m *Queue[T]) forwardRequest(req T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() ctx = user.InjectOrgID(ctx, m.tenantID) - if err := m.fn(ctx, req.data); err != nil { - _ = level.Error(m.logger).Log("msg", "pushing with forwarder failed", "err", err) - } + m.fn(ctx, req) } func (m *Queue[T]) stopWorkers(ctx context.Context) error { From 2513ff7f78ffcedad36a63e2d083d9af9563f76c Mon Sep 17 00:00:00 2001 From: Lukasz Gut Date: Thu, 13 Oct 2022 11:42:45 +0200 Subject: [PATCH 05/12] Fix unit tests --- modules/distributor/queue/queue_test.go | 60 +++---------------------- 1 file changed, 7 insertions(+), 53 deletions(-) diff --git a/modules/distributor/queue/queue_test.go b/modules/distributor/queue/queue_test.go index f43998a1c0a..5b6863693c3 100644 --- a/modules/distributor/queue/queue_test.go +++ b/modules/distributor/queue/queue_test.go @@ -49,7 +49,7 @@ func getCounterValue(metric *prometheus.CounterVec) float64 { func TestNew_ReturnsNotNilAndSetsCorrectFieldsFromConfig(t *testing.T) { // Given cfg := Config{Name: "testName", TenantID: "testTenantID", Size: 123, WorkerCount: 321} - processFunc := func(context.Context, int) error { return nil } + processFunc := func(context.Context, int) {} logger := log.NewNopLogger() reg := prometheus.NewPedanticRegistry() @@ -70,10 +70,9 @@ func TestQueue_Push_ReturnsNoErrorAndWorkersInvokeProcessFuncCorrectNumberOfTime wg := sync.WaitGroup{} size := 10 workerCount := 3 - processFunc := func(context.Context, any) error { + processFunc := func(context.Context, any) { defer wg.Done() count.Inc() - return nil } q := newStartedQueue(t, size, workerCount, processFunc) @@ -94,7 +93,7 @@ func TestQueue_Push_ReturnsNoErrorWhenPushingLessItemsThanSizeWithStoppedWorkers // Given size := 10 workerCount := 3 - processFunc := func(context.Context, any) error { return nil } + processFunc := func(context.Context, any) {} q := newQueue(t, size, workerCount, processFunc) // When @@ -112,7 +111,7 @@ func TestQueue_Push_ReturnsErrorWhenPushingItemsToShutdownQueue(t *testing.T) { // Given size := 10 workerCount := 3 - processFunc := func(context.Context, any) error { return nil } + processFunc := func(context.Context, any) {} q := newStartedQueue(t, size, workerCount, processFunc) require.NoError(t, q.Shutdown(context.Background())) @@ -132,10 +131,9 @@ func TestQueue_Push_QueueGetsProperlyDrainedOnShutdown(t *testing.T) { wg := sync.WaitGroup{} size := 10 workerCount := 3 - processFunc := func(context.Context, any) error { + processFunc := func(context.Context, any) { defer wg.Done() count.Inc() - return nil } q := newQueue(t, size, workerCount, processFunc) @@ -159,7 +157,7 @@ func TestQueue_Push_ReturnsErrorWhenPushingItemsWithCancelledContext(t *testing. // Given size := 10 workerCount := 3 - processFunc := func(context.Context, any) error { return nil } + processFunc := func(context.Context, any) {} q := newStartedQueue(t, size, workerCount, processFunc) ctx, cancel := context.WithCancel(context.Background()) cancel() @@ -178,7 +176,7 @@ func TestQueue_Push_ReturnsErrorWhenPushingItemsToFullQueueWithStoppedWorkers(t // Given size := 10 workerCount := 3 - processFunc := func(context.Context, any) error { return nil } + processFunc := func(context.Context, any) {} q := newQueue(t, size, workerCount, processFunc) // When @@ -194,50 +192,6 @@ func TestQueue_Push_ReturnsErrorWhenPushingItemsToFullQueueWithStoppedWorkers(t require.Equal(t, float64(1), getCounterValue(q.pushesFailuresTotalMetrics)) } -func TestQueue_Name_ReturnsCorrectValue(t *testing.T) { - // Given - q := newQueue[int](t, 1, 1, nil) - - // When - got := q.Name() - - // Then - require.Equal(t, "testName", got) -} - -func TestQueue_TenantID_ReturnsCorrectValue(t *testing.T) { - // Given - q := newQueue[int](t, 1, 1, nil) - - // When - got := q.TenantID() - - // Then - require.Equal(t, "testTenantID", got) -} - -func TestQueue_Size_ReturnsCorrectValue(t *testing.T) { - // Given - q := newQueue[int](t, 1337, 1, nil) - - // When - got := q.Size() - - // Then - require.Equal(t, 1337, got) -} - -func TestQueue_WorkerCount_ReturnsCorrectValue(t *testing.T) { - // Given - q := newQueue[int](t, 1, 1337, nil) - - // When - got := q.WorkerCount() - - // Then - require.Equal(t, 1337, got) -} - func TestQueue_ShouldUpdate_ReturnsTrueWhenWorkerCountDiffersFromOriginalValue(t *testing.T) { // Given q := newQueue[int](t, 2, 3, nil) From a02915609df6a389a24507f2ac70b9cc1f4f978f Mon Sep 17 00:00:00 2001 From: Lukasz Gut Date: Thu, 13 Oct 2022 11:44:47 +0200 Subject: [PATCH 06/12] Replace go.uber.org/atomic with sync/atomic --- modules/distributor/queue/queue.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/distributor/queue/queue.go b/modules/distributor/queue/queue.go index cbfe96aa798..2a63fa680b8 100644 --- a/modules/distributor/queue/queue.go +++ b/modules/distributor/queue/queue.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/go-kit/log" @@ -11,7 +12,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/user" - "go.uber.org/atomic" ) type ProcessFunc[T any] func(ctx context.Context, data T) @@ -85,7 +85,7 @@ func New[T any](cfg Config, logger log.Logger, reg prometheus.Registerer, fn Pro pushesTotalMetrics: pushesTotalMetrics, pushesFailuresTotalMetrics: pushesFailuresTotalMetric, lengthMetric: lengthMetric, - readOnly: atomic.NewBool(false), + readOnly: &atomic.Bool{}, } } @@ -153,7 +153,7 @@ func (m *Queue[T]) ShouldUpdate(size, workerCount int) bool { func (m *Queue[T]) Shutdown(ctx context.Context) error { // Call to stopWorkers only once - if m.readOnly.CAS(false, true) { + if m.readOnly.CompareAndSwap(false, true) { return m.stopWorkers(ctx) } From ba22c7ec7bdd92025c2f60e00b14d6074b3d653f Mon Sep 17 00:00:00 2001 From: Lukasz Gut Date: Thu, 13 Oct 2022 11:51:14 +0200 Subject: [PATCH 07/12] Remove redundant accessor --- modules/distributor/queue/queue.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/modules/distributor/queue/queue.go b/modules/distributor/queue/queue.go index 2a63fa680b8..5c64717469c 100644 --- a/modules/distributor/queue/queue.go +++ b/modules/distributor/queue/queue.go @@ -126,11 +126,6 @@ func (m *Queue[T]) StartWorkers() { } } -// Name returns queue name. -func (m *Queue[T]) Name() string { - return m.name -} - // TenantID returns the tenant id. func (m *Queue[T]) TenantID() string { return m.tenantID From f3d2b36e0b6934f3535811e8740c11cd8faa665f Mon Sep 17 00:00:00 2001 From: Lukasz Gut Date: Thu, 13 Oct 2022 11:51:28 +0200 Subject: [PATCH 08/12] Fix processFunc definition --- modules/distributor/forwarder.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/modules/distributor/forwarder.go b/modules/distributor/forwarder.go index 6a056e89d5e..7788eb0a54b 100644 --- a/modules/distributor/forwarder.go +++ b/modules/distributor/forwarder.go @@ -218,8 +218,10 @@ func (f *generatorForwarder) stop(_ error) error { return multierr.Combine(errs...) } -func (f *generatorForwarder) processFunc(ctx context.Context, data *request) error { - return f.forwardFunc(ctx, data.tenantID, data.keys, data.traces) +func (f *generatorForwarder) processFunc(ctx context.Context, data *request) { + if err := f.forwardFunc(ctx, data.tenantID, data.keys, data.traces); err != nil { + _ = level.Warn(f.logger).Log("msg", "failed to forward request to metrics generator", "err", err) + } } func (f *generatorForwarder) createQueueAndStartWorkers(tenantID string, size, workerCount int) *queue.Queue[*request] { From 8c9ad24ae957f5c83b85b3046bbe2364cdc82556 Mon Sep 17 00:00:00 2001 From: Lukasz Gut Date: Thu, 13 Oct 2022 11:56:52 +0200 Subject: [PATCH 09/12] Add breaking change to CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 01002903547..2e0db9ce1c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## main / unreleased * [ENHANCEMENT] Refactor queueManager into generic queue.Queue [#1796](https://github.com/grafana/tempo/pull/1796) (@Blinkuu) + - **BREAKING CHANGE** Rename `tempo_distributor_forwarder_queue_length` metric to `tempo_distributor_queue_length`. New metric has two custom labels: `name` and `tenant`. * [ENHANCEMENT] Filter namespace by cluster in tempo dashboards variables [#1771](https://github.com/grafana/tempo/pull/1771) (@electron0zero) * [ENHANCEMENT] Exit early from sharded search requests [#1742](https://github.com/grafana/tempo/pull/1742) (@electron0zero) * [CHANGE] Identify bloom that could not be retrieved from backend block [#1737](https://github.com/grafana/tempo/pull/1737) (@AlexDHoffer) From 291de28df8d3db36b8c7733d047df63fd92b08c6 Mon Sep 17 00:00:00 2001 From: Lukasz Gut Date: Thu, 13 Oct 2022 12:00:34 +0200 Subject: [PATCH 10/12] Define metrics on the package level to overcome Prometheus DefaultRegistry limitations --- modules/distributor/queue/queue.go | 60 ++++++++++++------------------ 1 file changed, 23 insertions(+), 37 deletions(-) diff --git a/modules/distributor/queue/queue.go b/modules/distributor/queue/queue.go index 5c64717469c..ff5b48cced9 100644 --- a/modules/distributor/queue/queue.go +++ b/modules/distributor/queue/queue.go @@ -8,12 +8,32 @@ import ( "time" "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/user" ) +var ( + pushesTotalMetrics = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Subsystem: "distributor", + Name: "queue_pushes_total", + Help: "Total number of successful requests queued up for a tenant to the generatorForwarder", + }, []string{"name", "tenant"}) + pushesFailuresTotalMetric = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Subsystem: "distributor", + Name: "queue_pushes_failures_total", + Help: "Total number of failed pushes to the queue for a tenant to the generatorForwarder", + }, []string{"name", "tenant"}) + lengthMetric = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "tempo", + Subsystem: "distributor", + Name: "queue_length", + Help: "Number of queued requests for a tenant", + }, []string{"name", "tenant"}) +) + type ProcessFunc[T any] func(ctx context.Context, data T) // Queue represents a single tenant's queue. @@ -38,41 +58,7 @@ type Queue[T any] struct { readOnly *atomic.Bool } -func New[T any](cfg Config, logger log.Logger, reg prometheus.Registerer, fn ProcessFunc[T]) *Queue[T] { - pushesTotalMetrics := prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "tempo", - Subsystem: "distributor", - Name: "queue_pushes_total", - Help: "Total number of successful requests queued up for a tenant to the generatorForwarder", - }, []string{"name", "tenant"}) - pushesFailuresTotalMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "tempo", - Subsystem: "distributor", - Name: "queue_pushes_failures_total", - Help: "Total number of failed pushes to the queue for a tenant to the generatorForwarder", - }, []string{"name", "tenant"}) - lengthMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "tempo", - Subsystem: "distributor", - Name: "queue_length", - Help: "Number of queued requests for a tenant", - }, []string{"name", "tenant"}) - - var alreadyRegisteredErr prometheus.AlreadyRegisteredError - err := reg.Register(pushesTotalMetrics) - if err != nil && !errors.As(err, &alreadyRegisteredErr) { - _ = level.Warn(logger).Log("msg", "failed to register queue_pushes_total metric", "err", err) - } - err = reg.Register(pushesFailuresTotalMetric) - if err != nil && !errors.As(err, &alreadyRegisteredErr) { - _ = level.Warn(logger).Log("msg", "failed to register queue_pushes_failures_total metric", "err", err) - } - - err = reg.Register(lengthMetric) - if err != nil && !errors.As(err, &alreadyRegisteredErr) { - _ = level.Warn(logger).Log("msg", "failed to register queue_length metric", "err", err) - } - +func New[T any](cfg Config, logger log.Logger, _ prometheus.Registerer, fn ProcessFunc[T]) *Queue[T] { return &Queue[T]{ logger: logger, name: cfg.Name, From be45e887fad2d6f483baee5b2bac970f28b39481 Mon Sep 17 00:00:00 2001 From: Lukasz Gut Date: Thu, 13 Oct 2022 13:05:44 +0200 Subject: [PATCH 11/12] Fix tests in queue_test.go --- modules/distributor/queue/queue_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/modules/distributor/queue/queue_test.go b/modules/distributor/queue/queue_test.go index 5b6863693c3..75558a2e06a 100644 --- a/modules/distributor/queue/queue_test.go +++ b/modules/distributor/queue/queue_test.go @@ -25,6 +25,11 @@ func newQueue[T any](t *testing.T, size, workerCount int, processFunc ProcessFun defer cancel() require.NoError(t, q.Shutdown(ctx)) + + // Metrics are defined on package-level, we need to reset them each time. + pushesTotalMetrics.Reset() + pushesFailuresTotalMetric.Reset() + lengthMetric.Reset() }) return q From f995cace7f4a12e4b34f81f25e17dec60d456508 Mon Sep 17 00:00:00 2001 From: Lukasz Gut Date: Thu, 13 Oct 2022 16:38:04 +0200 Subject: [PATCH 12/12] Deprecate generatorForwarder metrics in favor of queue.Queue metrics --- CHANGELOG.md | 2 ++ modules/distributor/forwarder.go | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e0db9ce1c5..fd3f2508508 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ * [ENHANCEMENT] Refactor queueManager into generic queue.Queue [#1796](https://github.com/grafana/tempo/pull/1796) (@Blinkuu) - **BREAKING CHANGE** Rename `tempo_distributor_forwarder_queue_length` metric to `tempo_distributor_queue_length`. New metric has two custom labels: `name` and `tenant`. + - Deprecated `tempo_distributor_forwarder_pushes_total` metric in favor of `tempo_distributor_queue_pushes_total`. + - Deprecated `tempo_distributor_forwarder_pushes_failures_total` metric in favor of `tempo_distributor_queue_pushes_failures_total`. * [ENHANCEMENT] Filter namespace by cluster in tempo dashboards variables [#1771](https://github.com/grafana/tempo/pull/1771) (@electron0zero) * [ENHANCEMENT] Exit early from sharded search requests [#1742](https://github.com/grafana/tempo/pull/1742) (@electron0zero) * [CHANGE] Identify bloom that could not be retrieved from backend block [#1737](https://github.com/grafana/tempo/pull/1737) (@AlexDHoffer) diff --git a/modules/distributor/forwarder.go b/modules/distributor/forwarder.go index 7788eb0a54b..8c178691940 100644 --- a/modules/distributor/forwarder.go +++ b/modules/distributor/forwarder.go @@ -25,12 +25,12 @@ var ( metricForwarderPushes = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", Name: "distributor_forwarder_pushes_total", - Help: "Total number of successful requests queued up for a tenant to the generatorForwarder", + Help: "Total number of successful requests queued up for a tenant to the generatorForwarder. This metric is now deprecated in favor of tempo_distributor_queue_pushes_total.", }, []string{"tenant"}) metricForwarderPushesFailures = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", Name: "distributor_forwarder_pushes_failures_total", - Help: "Total number of failed pushes to the queue for a tenant to the generatorForwarder", + Help: "Total number of failed pushes to the queue for a tenant to the generatorForwarder. This metric is now deprecated in favor of tempo_distributor_queue_pushes_failures_total.", }, []string{"tenant"}) )