From 3439704618c0f8edc2ac06f5ef4c8894a192c6a1 Mon Sep 17 00:00:00 2001 From: javiermolinar Date: Sun, 7 Jul 2024 17:01:56 +0200 Subject: [PATCH 1/5] refactor vulture code, covering logic with unit tests --- cmd/tempo-vulture/main.go | 296 +++++++++++---------- cmd/tempo-vulture/main_test.go | 454 ++++++++++++++++++++++++++++++++- cmd/tempo-vulture/mocks.go | 121 +++++++++ pkg/httpclient/client.go | 22 ++ pkg/util/trace_info.go | 26 +- 5 files changed, 776 insertions(+), 143 deletions(-) create mode 100644 cmd/tempo-vulture/mocks.go diff --git a/cmd/tempo-vulture/main.go b/cmd/tempo-vulture/main.go index 9ec5fbdfa93..e83dfea3eeb 100644 --- a/cmd/tempo-vulture/main.go +++ b/cmd/tempo-vulture/main.go @@ -57,6 +57,17 @@ type traceMetrics struct { requestFailed int notFoundSearchAttribute int } +type vultureConfiguration struct { + tempoQueryURL string + tempoPushURL string + tempoOrgID string + tempoWriteBackoffDuration time.Duration + tempoLongWriteBackoffDuration time.Duration + tempoReadBackoffDuration time.Duration + tempoSearchBackoffDuration time.Duration + tempoRetentionDuration time.Duration + tempoPushTLS bool +} func init() { flag.StringVar(&prometheusPath, "prometheus-path", "/metrics", "The path to publish Prometheus metrics to.") @@ -85,91 +96,152 @@ func main() { logger.Info("Tempo Vulture starting") - actualStartTime := time.Now() - startTime := actualStartTime - tickerWrite := time.NewTicker(tempoWriteBackoffDuration) + vultureConfig := vultureConfiguration{ + tempoQueryURL: tempoQueryURL, + tempoPushURL: tempoPushURL, + tempoOrgID: tempoOrgID, + tempoWriteBackoffDuration: tempoWriteBackoffDuration, + tempoLongWriteBackoffDuration: tempoLongWriteBackoffDuration, + tempoReadBackoffDuration: tempoReadBackoffDuration, + tempoSearchBackoffDuration: tempoSearchBackoffDuration, + tempoRetentionDuration: tempoRetentionDuration, + tempoPushTLS: tempoPushTLS, + } + + jaegerClient, err := newJaegerGRPCClient(vultureConfig.tempoPushURL, vultureConfig, logger) + if err != nil { + panic(err) + } + httpClient := httpclient.New(vultureConfig.tempoQueryURL, vultureConfig.tempoOrgID) + + tickerWrite, tickerRead, tickerSearch, err := initTickers(vultureConfig.tempoWriteBackoffDuration, vultureConfig.tempoReadBackoffDuration, vultureConfig.tempoSearchBackoffDuration) + if err != nil { + panic(err) + } + startTime := time.Now() + r := rand.New(rand.NewSource(startTime.Unix())) + interval := vultureConfig.tempoWriteBackoffDuration + + doWrite(jaegerClient, tickerWrite, interval, vultureConfig, logger) + doRead(httpClient, tickerRead, startTime, interval, r, vultureConfig, logger) + doSearch(httpClient, tickerSearch, startTime, interval, r, vultureConfig, logger) - r := rand.New(rand.NewSource(actualStartTime.Unix())) + http.Handle(prometheusPath, promhttp.Handler()) + log.Fatal(http.ListenAndServe(prometheusListenAddress, nil)) +} - var tickerRead *time.Ticker +func initTickers(tempoWriteBackoffDuration time.Duration, tempoReadBackoffDuration time.Duration, tempoSearchBackoffDuration time.Duration) (tickerWrite *time.Ticker, tickerRead *time.Ticker, tickerSearch *time.Ticker, err error) { + if tempoWriteBackoffDuration <= 0 { + return nil, nil, nil, errors.New("tempo-write-backoff-duration must be greater than 0") + } + tickerWrite = time.NewTicker(tempoWriteBackoffDuration) if tempoReadBackoffDuration > 0 { tickerRead = time.NewTicker(tempoReadBackoffDuration) } - - var tickerSearch *time.Ticker if tempoSearchBackoffDuration > 0 { tickerSearch = time.NewTicker(tempoSearchBackoffDuration) } - if tickerRead == nil && tickerSearch == nil { - log.Fatalf("at least one of tempo-search-backoff-duration or tempo-read-backoff-duration must be set") + return nil, nil, nil, errors.New("at least one of tempo-search-backoff-duration or tempo-read-backoff-duration must be set") } + return tickerWrite, tickerRead, tickerSearch, nil +} - interval := tempoWriteBackoffDuration - - ready := func(info *util.TraceInfo, now time.Time) bool { - // Don't attempt to read on the first itteration if we can't reasonably - // expect the write loop to have fired yet. Double the duration here to - // avoid a race. - if info.Timestamp().Before(actualStartTime.Add(2 * tempoWriteBackoffDuration)) { - return false - } - - return info.Ready(now, tempoWriteBackoffDuration, tempoLongWriteBackoffDuration) +// Don't attempt to read on the first iteration if we can't reasonably +// expect the write loop to have fired yet. Double the duration here to +// avoid a race. +func traceIsReady(info *util.TraceInfo, now time.Time, startTime time.Time, writeBackoff time.Duration, longBackoff time.Duration) bool { + if info.Timestamp().Before(startTime.Add(2 * writeBackoff)) { + return false } - // Write - go func() { - client, err := newJaegerGRPCClient(tempoPushURL) - if err != nil { - panic(err) - } + return info.Ready(now, writeBackoff, longBackoff) +} +func doWrite(jaegerClient util.JaegerClient, tickerWrite *time.Ticker, interval time.Duration, config vultureConfiguration, l *zap.Logger) { + go func() { for now := range tickerWrite.C { timestamp := now.Round(interval) - info := util.NewTraceInfo(timestamp, tempoOrgID) + info := util.NewTraceInfo(timestamp, config.tempoOrgID) - log := logger.With( - zap.String("org_id", tempoOrgID), + logger := l.With( + zap.String("org_id", config.tempoOrgID), zap.Int64("seed", info.Timestamp().Unix()), ) - log.Info("sending trace") + logger.Info("sending trace") - err := info.EmitBatches(client) + err := info.EmitBatches(jaegerClient) if err != nil { metricErrorTotal.Inc() } - queueFutureBatches(client, info) + queueFutureBatches(jaegerClient, info, config, l) } }() +} + +func queueFutureBatches(client util.JaegerClient, info *util.TraceInfo, config vultureConfiguration, l *zap.Logger) { + if info.LongWritesRemaining() == 0 { + return + } + + logger := l.With( + zap.String("org_id", config.tempoOrgID), + zap.String("write_trace_id", info.HexID()), + zap.Int64("seed", info.Timestamp().Unix()), + zap.Int64("longWritesRemaining", info.LongWritesRemaining()), + ) + logger.Info("queueing future batches") + + info.Done() + + go func() { + time.Sleep(config.tempoLongWriteBackoffDuration) - // Read + logger := l.With( + zap.String("org_id", config.tempoOrgID), + zap.String("write_trace_id", info.HexID()), + zap.Int64("seed", info.Timestamp().Unix()), + zap.Int64("longWritesRemaining", info.LongWritesRemaining()), + ) + logger.Info("sending trace") + + err := info.EmitBatches(client) + if err != nil { + logger.Error("failed to queue batches", + zap.Error(err), + ) + } + + queueFutureBatches(client, info, config, l) + }() +} + +func doRead(httpClient httpclient.HttpClient, tickerRead *time.Ticker, startTime time.Time, interval time.Duration, r *rand.Rand, config vultureConfiguration, l *zap.Logger) { if tickerRead != nil { go func() { for now := range tickerRead.C { var seed time.Time startTime, seed = selectPastTimestamp(startTime, now, interval, tempoRetentionDuration, r) - log := logger.With( - zap.String("org_id", tempoOrgID), + logger := l.With( + zap.String("org_id", config.tempoOrgID), zap.Int64("seed", seed.Unix()), ) - info := util.NewTraceInfo(seed, tempoOrgID) + info := util.NewTraceInfo(seed, config.tempoOrgID) // Don't query for a trace we don't expect to be complete - if !ready(info, now) { + if !traceIsReady(info, now, startTime, + config.tempoWriteBackoffDuration, config.tempoLongWriteBackoffDuration) { continue } - client := httpclient.New(tempoQueryURL, tempoOrgID) - // query the trace - queryMetrics, err := queryTrace(client, info) + queryMetrics, err := queryTrace(httpClient, info, l) if err != nil { metricErrorTotal.Inc() - log.Error("query for metrics failed", + logger.Error("query for metrics failed", zap.Error(err), ) } @@ -177,40 +249,40 @@ func main() { } }() } +} - // Search +func doSearch(httpClient httpclient.HttpClient, tickerSearch *time.Ticker, startTime time.Time, interval time.Duration, r *rand.Rand, config vultureConfiguration, l *zap.Logger) { if tickerSearch != nil { go func() { for now := range tickerSearch.C { - _, seed := selectPastTimestamp(startTime, now, interval, tempoRetentionDuration, r) - log := logger.With( - zap.String("org_id", tempoOrgID), + _, seed := selectPastTimestamp(startTime, now, interval, config.tempoRetentionDuration, r) + logger := l.With( + zap.String("org_id", config.tempoOrgID), zap.Int64("seed", seed.Unix()), ) - info := util.NewTraceInfo(seed, tempoOrgID) + info := util.NewTraceInfo(seed, config.tempoOrgID) - if !ready(info, now) { + if !traceIsReady(info, now, startTime, + config.tempoWriteBackoffDuration, config.tempoLongWriteBackoffDuration) { continue } - client := httpclient.New(tempoQueryURL, tempoOrgID) - // query a tag we expect the trace to be found within - searchMetrics, err := searchTag(client, seed) + searchMetrics, err := searchTag(httpClient, seed, config, l) if err != nil { metricErrorTotal.Inc() - log.Error("search tag for metrics failed", + logger.Error("search tag for metrics failed", zap.Error(err), ) } pushMetrics(searchMetrics) // traceql query - traceqlSearchMetrics, err := searchTraceql(client, seed) + traceqlSearchMetrics, err := searchTraceql(httpClient, seed, config, l) if err != nil { metricErrorTotal.Inc() - log.Error("traceql query for metrics failed", + logger.Error("traceql query for metrics failed", zap.Error(err), ) } @@ -218,46 +290,6 @@ func main() { } }() } - - http.Handle(prometheusPath, promhttp.Handler()) - log.Fatal(http.ListenAndServe(prometheusListenAddress, nil)) -} - -func queueFutureBatches(client *jaeger_grpc.Reporter, info *util.TraceInfo) { - if info.LongWritesRemaining() == 0 { - return - } - - log := logger.With( - zap.String("org_id", tempoOrgID), - zap.String("write_trace_id", info.HexID()), - zap.Int64("seed", info.Timestamp().Unix()), - zap.Int64("longWritesRemaining", info.LongWritesRemaining()), - ) - log.Info("queueing future batches") - - info.Done() - - go func() { - time.Sleep(tempoLongWriteBackoffDuration) - - log := logger.With( - zap.String("org_id", tempoOrgID), - zap.String("write_trace_id", info.HexID()), - zap.Int64("seed", info.Timestamp().Unix()), - zap.Int64("longWritesRemaining", info.LongWritesRemaining()), - ) - log.Info("sending trace") - - err := info.EmitBatches(client) - if err != nil { - log.Error("failed to queue batches", - zap.Error(err), - ) - } - - queueFutureBatches(client, info) - }() } func pushMetrics(metrics traceMetrics) { @@ -285,7 +317,7 @@ func selectPastTimestamp(start, stop time.Time, interval, retention time.Duratio return newStart.Round(interval), ts.Round(interval) } -func newJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) { +func newJaegerGRPCClient(endpoint string, config vultureConfiguration, logger *zap.Logger) (*jaeger_grpc.Reporter, error) { // remove scheme and port u, err := url.Parse(endpoint) if err != nil { @@ -298,7 +330,7 @@ func newJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) { var dialOpts []grpc.DialOption - if tempoPushTLS { + if config.tempoPushTLS { dialOpts = []grpc.DialOption{ grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ InsecureSkipVerify: true, @@ -320,16 +352,38 @@ func newJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) { func generateRandomInt(min, max int64, r *rand.Rand) int64 { min++ - number := min + r.Int63n(max-min) + var duration int64 + duration = 1 + // This is to prevent a panic when min == max since substracting them will end in a negative number + if min < max { + duration = max - min + } + number := min + r.Int63n(duration) return number } -func searchTag(client *httpclient.Client, seed time.Time) (traceMetrics, error) { +func traceInTraces(traceID string, traces []*tempopb.TraceSearchMetadata) bool { + for _, t := range traces { + equal, err := util.EqualHexStringTraceIDs(t.TraceID, traceID) + if err != nil { + logger.Error("error comparing trace IDs", zap.Error(err)) + continue + } + + if equal { + return true + } + } + + return false +} + +func searchTag(client httpclient.HttpClient, seed time.Time, config vultureConfiguration, l *zap.Logger) (traceMetrics, error) { tm := traceMetrics{ requested: 1, } - info := util.NewTraceInfo(seed, tempoOrgID) + info := util.NewTraceInfo(seed, config.tempoOrgID) hexID := info.HexID() // Get the expected @@ -339,29 +393,13 @@ func searchTag(client *httpclient.Client, seed time.Time) (traceMetrics, error) return traceMetrics{}, err } - traceInTraces := func(traceID string, traces []*tempopb.TraceSearchMetadata) bool { - for _, t := range traces { - equal, err := util.EqualHexStringTraceIDs(t.TraceID, traceID) - if err != nil { - logger.Error("error comparing trace IDs", zap.Error(err)) - continue - } - - if equal { - return true - } - } - - return false - } - attr := util.RandomAttrFromTrace(expected) if attr == nil { tm.notFoundSearchAttribute++ return tm, fmt.Errorf("no search attr selected from trace") } - logger := logger.With( + logger := l.With( zap.Int64("seed", seed.Unix()), zap.String("hexID", hexID), zap.Duration("ago", time.Since(seed)), @@ -389,44 +427,28 @@ func searchTag(client *httpclient.Client, seed time.Time) (traceMetrics, error) return tm, nil } -func searchTraceql(client *httpclient.Client, seed time.Time) (traceMetrics, error) { +func searchTraceql(client httpclient.HttpClient, seed time.Time, config vultureConfiguration, l *zap.Logger) (traceMetrics, error) { tm := traceMetrics{ requested: 1, } - info := util.NewTraceInfo(seed, tempoOrgID) + info := util.NewTraceInfo(seed, config.tempoOrgID) hexID := info.HexID() // Get the expected expected, err := info.ConstructTraceFromEpoch() if err != nil { - logger.Error("unable to construct trace from epoch", zap.Error(err)) + l.Error("unable to construct trace from epoch", zap.Error(err)) return traceMetrics{}, err } - traceInTraces := func(traceID string, traces []*tempopb.TraceSearchMetadata) bool { - for _, t := range traces { - equal, err := util.EqualHexStringTraceIDs(t.TraceID, traceID) - if err != nil { - logger.Error("error comparing trace IDs", zap.Error(err)) - continue - } - - if equal { - return true - } - } - - return false - } - attr := util.RandomAttrFromTrace(expected) if attr == nil { tm.notFoundSearchAttribute++ return tm, fmt.Errorf("no search attr selected from trace") } - logger := logger.With( + logger := l.With( zap.Int64("seed", seed.Unix()), zap.String("hexID", hexID), zap.Duration("ago", time.Since(seed)), @@ -452,14 +474,14 @@ func searchTraceql(client *httpclient.Client, seed time.Time) (traceMetrics, err return tm, nil } -func queryTrace(client *httpclient.Client, info *util.TraceInfo) (traceMetrics, error) { +func queryTrace(client httpclient.HttpClient, info *util.TraceInfo, l *zap.Logger) (traceMetrics, error) { tm := traceMetrics{ requested: 1, } hexID := info.HexID() - logger := logger.With( + logger := l.With( zap.Int64("seed", info.Timestamp().Unix()), zap.String("hexID", hexID), zap.Duration("ago", time.Since(info.Timestamp())), diff --git a/cmd/tempo-vulture/main_test.go b/cmd/tempo-vulture/main_test.go index e0a8be819d8..0b1c4c92ff1 100644 --- a/cmd/tempo-vulture/main_test.go +++ b/cmd/tempo-vulture/main_test.go @@ -2,7 +2,8 @@ package main import ( "bytes" - "fmt" + "errors" + "math/rand" "os" "testing" "time" @@ -11,8 +12,10 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "github.com/grafana/tempo/pkg/tempopb" + v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1" v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" "github.com/grafana/tempo/pkg/util" ) @@ -88,7 +91,6 @@ func TestResponseFixture(t *testing.T) { marshaller := &jsonpb.Marshaler{} err = marshaller.Marshal(&jsonTrace, generatedTrace) require.NoError(t, err) - fmt.Println(jsonTrace.String()) assert.True(t, equalTraces(expected, generatedTrace)) @@ -111,3 +113,451 @@ func TestEqualTraces(t *testing.T) { require.True(t, equalTraces(a, b)) } + +func TestInitTickers(t *testing.T) { + tests := []struct { + name string + writeDuration, readDuration, searchDuration time.Duration + expectedWriteTicker bool + expectedReadTicker bool + expectedSearchTicker bool + expectedError string + }{ + { + name: "Valid write and read durations", + writeDuration: 1 * time.Second, + readDuration: 2 * time.Second, + searchDuration: 0, + expectedWriteTicker: true, + expectedReadTicker: true, + expectedSearchTicker: false, + expectedError: "", + }, + { + name: "Invalid write duration (zero)", + writeDuration: 0, + readDuration: 0, + searchDuration: 0, + expectedWriteTicker: false, + expectedReadTicker: false, + expectedSearchTicker: false, + expectedError: "tempo-write-backoff-duration must be greater than 0", + }, + { + name: "No read durations set", + writeDuration: 1 * time.Second, + readDuration: 0, + searchDuration: 1 * time.Second, + expectedWriteTicker: true, + expectedReadTicker: false, + expectedSearchTicker: true, + expectedError: "", + }, + { + name: "No read or search durations set", + writeDuration: 1 * time.Second, + readDuration: 0, + searchDuration: 0, + expectedWriteTicker: false, + expectedReadTicker: false, + expectedSearchTicker: false, + expectedError: "at least one of tempo-search-backoff-duration or tempo-read-backoff-duration must be set", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tickerWrite, tickerRead, tickerSearch, err := initTickers(tt.writeDuration, tt.readDuration, tt.searchDuration) + + assert.Equal(t, tt.expectedWriteTicker, tickerWrite != nil, "TickerWrite") + assert.Equal(t, tt.expectedReadTicker, tickerRead != nil, "TickerRead") + assert.Equal(t, tt.expectedSearchTicker, tickerSearch != nil, "TickerSearch") + + if tt.expectedError != "" { + assert.NotNil(t, err, "Expected error but got nil") + assert.EqualError(t, err, tt.expectedError, "Error message mismatch") + } else { + assert.Nil(t, err, "Expected no error but got one") + } + }) + } +} + +func TestTraceIsReady(t *testing.T) { + writeBackoff := 1 * time.Second + longWriteBackoff := 5 * time.Second + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + ti := util.NewTraceInfo(seed, "test") + + startTime := time.Date(2009, 1, 1, 12, 0, 0, 0, time.UTC) + ready := traceIsReady(ti, time.Now(), startTime, writeBackoff, longWriteBackoff) + + assert.False(t, ready, "trace should not be ready yet") + + startTime = time.Date(2007, 1, 1, 12, 0, 0, 0, time.UTC) + ready = traceIsReady(ti, seed.Add(2*longWriteBackoff), startTime, writeBackoff, longWriteBackoff) + assert.True(t, ready, "trace should be ready now") + +} + +func TestDoWrite(t *testing.T) { + mockJaegerClient := MockReporter{err: nil} + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + ticker := time.NewTicker(10 * time.Millisecond) + logger = zap.NewNop() + + doWrite(&mockJaegerClient, ticker, config.tempoWriteBackoffDuration, config, logger) + + time.Sleep(time.Second) + ticker.Stop() +} + +func TestQueueFutureBatches(t *testing.T) { + mockJaegerClient := MockReporter{err: nil} + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second * 0, + } + + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + traceInfo := util.NewTraceInfoWithMaxLongWrites(seed, 1, "test") + logger = zap.NewNop() + + queueFutureBatches(&mockJaegerClient, traceInfo, config, logger) + time.Sleep(time.Second) + require.Greater(t, len(mockJaegerClient.batches_emited), 0) +} + +type traceOps func(*tempopb.Trace) + +func TestQueryTrace(t *testing.T) { + noOp := func(_ *tempopb.Trace) {} + setMissingSpan := func(trace *tempopb.Trace) { + trace.Batches[0].ScopeSpans[0].Spans[0].ParentSpanId = []byte{'t', 'e', 's', 't'} + } + setNoBatchesSpan := func(trace *tempopb.Trace) { + trace.Batches = make([]*v1.ResourceSpans, 0) + } + setAlteredSpan := func(trace *tempopb.Trace) { + trace.Batches[0].ScopeSpans[0].Spans[0].Name = "Different spam" + } + tests := []struct { + name string + traceOperation func(*tempopb.Trace) + err error + expectedMetrics traceMetrics + expectedError error + }{ + { + name: "assert querytrace yields an unexpected error", + traceOperation: noOp, + err: errors.New("unexpected error"), + expectedMetrics: traceMetrics{ + requested: 1, + requestFailed: 1, + }, + expectedError: errors.New("unexpected error"), + }, + { + name: "assert querytrace yields traceNotFound error", + traceOperation: noOp, + err: util.ErrTraceNotFound, + expectedMetrics: traceMetrics{ + requested: 1, + notFoundByID: 1, + }, + expectedError: util.ErrTraceNotFound, + }, + { + name: "assert querytrace for ok trace", + traceOperation: noOp, + err: nil, + expectedMetrics: traceMetrics{ + requested: 1, + }, + expectedError: nil, + }, + { + name: "assert querytrace for a trace with missing spans", + traceOperation: setMissingSpan, + err: nil, + expectedMetrics: traceMetrics{ + requested: 1, + missingSpans: 1, + }, + expectedError: nil, + }, + { + name: "assert querytrace for a trace without batches", + traceOperation: setNoBatchesSpan, + err: nil, + expectedMetrics: traceMetrics{ + requested: 1, + notFoundByID: 1, + }, + expectedError: nil, + }, + { + name: "assert querytrace for a trace different than the ingested one", + traceOperation: setAlteredSpan, + err: nil, + expectedMetrics: traceMetrics{ + requested: 1, + incorrectResult: 1, + }, + expectedError: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metrics, err := doQueryTrace(tt.traceOperation, tt.err) + assert.Equal(t, tt.expectedMetrics, metrics) + assert.Equal(t, tt.expectedError, err) + }) + } +} + +func doQueryTrace(f traceOps, err error) (traceMetrics, error) { + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + traceInfo := util.NewTraceInfo(seed, "test") + + trace, _ := traceInfo.ConstructTraceFromEpoch() + + mockHttpClient := MockHttpClient{err: err, traceResp: trace} + logger = zap.NewNop() + f(trace) + return queryTrace(&mockHttpClient, traceInfo, logger) +} + +func TestDoRead(t *testing.T) { + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + traceInfo := util.NewTraceInfo(seed, "test") + + trace, _ := traceInfo.ConstructTraceFromEpoch() + startTime := time.Date(2007, 1, 1, 12, 0, 0, 0, time.UTC) + mockHttpClient := MockHttpClient{err: nil, traceResp: trace} + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + ticker := time.NewTicker(10 * time.Millisecond) + logger = zap.NewNop() + r := rand.New(rand.NewSource(startTime.Unix())) + + // Assert ticker is nil + doRead(&mockHttpClient, nil, startTime, config.tempoWriteBackoffDuration, r, config, logger) + assert.Equal(t, 0, mockHttpClient.requestsCount) + + // Assert an ok read + doRead(&mockHttpClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + time.Sleep(time.Second) + ticker.Stop() + assert.Greater(t, mockHttpClient.requestsCount, 0) + + // Assert a read with errors + mockHttpClient = MockHttpClient{err: errors.New("an error"), traceResp: trace} + doRead(&mockHttpClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + time.Sleep(time.Second) + ticker.Stop() + assert.Equal(t, 1, mockHttpClient.requestsCount) +} + +func TestSearchTraceql(t *testing.T) { + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + info := util.NewTraceInfo(seed, config.tempoOrgID) + hexID := info.HexID() + + searchResponse := []*tempopb.TraceSearchMetadata{ + { + SpanSets: []*tempopb.SpanSet{ + { + Spans: []*tempopb.Span{ + { + SpanID: hexID, + StartTimeUnixNano: 1000000000000, + DurationNanos: 1000000000, + Name: "", + Attributes: []*v1_common.KeyValue{ + {Key: "foo", Value: &v1_common.AnyValue{Value: &v1_common.AnyValue_StringValue{StringValue: "Bar"}}}, + }, + }, + }, + }, + }, + }, + } + + mockHttpClient := MockHttpClient{err: nil, searchResponse: searchResponse} + logger = zap.NewNop() + + metrics, err := searchTraceql(&mockHttpClient, seed, config, logger) + + assert.Error(t, err) + assert.Equal(t, traceMetrics{ + requested: 1, + notFoundTraceQL: 1, + }, metrics) + + mockHttpClient = MockHttpClient{err: errors.New("something wrong happened"), searchResponse: searchResponse} + logger = zap.NewNop() + + metrics, err = searchTraceql(&mockHttpClient, seed, config, logger) + + assert.Error(t, err) + assert.Equal(t, traceMetrics{ + requested: 1, + requestFailed: 1, + }, metrics) +} + +func TestSearchTag(t *testing.T) { + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + info := util.NewTraceInfo(seed, config.tempoOrgID) + hexID := info.HexID() + + searchResponse := []*tempopb.TraceSearchMetadata{ + { + SpanSets: []*tempopb.SpanSet{ + { + Spans: []*tempopb.Span{ + { + SpanID: hexID, + StartTimeUnixNano: 1000000000000, + DurationNanos: 1000000000, + Name: "", + Attributes: []*v1_common.KeyValue{ + {Key: "foo", Value: &v1_common.AnyValue{Value: &v1_common.AnyValue_StringValue{StringValue: "Bar"}}}, + }, + }, + }, + }, + }, + }, + } + + mockHttpClient := MockHttpClient{err: nil, searchResponse: searchResponse} + logger = zap.NewNop() + + metrics, err := searchTag(&mockHttpClient, seed, config, logger) + + assert.Error(t, err) + assert.Equal(t, traceMetrics{ + requested: 1, + notFoundSearch: 1, + }, metrics) + + mockHttpClient = MockHttpClient{err: errors.New("something wrong happened"), searchResponse: searchResponse} + logger = zap.NewNop() + + metrics, err = searchTag(&mockHttpClient, seed, config, logger) + + assert.Error(t, err) + assert.Equal(t, traceMetrics{ + requested: 1, + requestFailed: 1, + }, metrics) + +} + +func TestDoSearch(t *testing.T) { + + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + traceInfo := util.NewTraceInfo(seed, "test") + startTime := time.Date(2000, 1, 1, 12, 0, 0, 0, time.UTC) + + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + //This is a hack to ensure the trace is "ready" + tempoWriteBackoffDuration: -time.Hour * 10000, + tempoRetentionDuration: time.Second * 10, + } + + ticker := time.NewTicker(10 * time.Millisecond) + logger = zap.NewNop() + r := rand.New(rand.NewSource(startTime.Unix())) + + searchResponse := []*tempopb.TraceSearchMetadata{ + { + SpanSets: []*tempopb.SpanSet{ + { + Spans: []*tempopb.Span{ + { + SpanID: traceInfo.HexID(), + StartTimeUnixNano: 1000000000000, + DurationNanos: 1000000000, + Name: "", + Attributes: []*v1_common.KeyValue{ + {Key: "foo", Value: &v1_common.AnyValue{Value: &v1_common.AnyValue_StringValue{StringValue: "Bar"}}}, + }, + }, + }, + }, + }, + }, + } + logger = zap.NewNop() + + mockHttpClient := MockHttpClient{err: nil, searchResponse: searchResponse} + // Assert when ticker is nil + doSearch(&mockHttpClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + assert.Equal(t, mockHttpClient.searchesCount, 0) + + // Assert an ok search + doSearch(&mockHttpClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + time.Sleep(time.Second) + ticker.Stop() + + assert.Greater(t, mockHttpClient.searchesCount, 0) + + // Assert an errored search + mockHttpClient = MockHttpClient{err: errors.New("an error"), searchResponse: searchResponse} + + doSearch(&mockHttpClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + time.Sleep(time.Second) + ticker.Stop() + + assert.Equal(t, mockHttpClient.searchesCount, 2) +} + +func TestNewJaegerGRPCClient(t *testing.T) { + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + client, err := newJaegerGRPCClient("http://localhost", config, zap.NewNop()) + + assert.NoError(t, err) + assert.NotNil(t, client) + + config = vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + tempoPushTLS: true, + } + client, err = newJaegerGRPCClient("http://localhost", config, zap.NewNop()) + + assert.NoError(t, err) + assert.NotNil(t, client) +} diff --git a/cmd/tempo-vulture/mocks.go b/cmd/tempo-vulture/mocks.go new file mode 100644 index 00000000000..a0ba948317a --- /dev/null +++ b/cmd/tempo-vulture/mocks.go @@ -0,0 +1,121 @@ +package main + +import ( + "context" + "net/http" + + userconfigurableoverrides "github.com/grafana/tempo/modules/overrides/userconfigurable/client" + thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" + + "github.com/grafana/tempo/pkg/tempopb" +) + +type MockReporter struct { + err error + batches_emited []*thrift.Batch +} + +func (r MockReporter) EmitZipkinBatch(_ context.Context, _ []*zipkincore.Span) error { + return r.err +} + +func (r *MockReporter) EmitBatch(_ context.Context, b *thrift.Batch) error { + r.batches_emited = append(r.batches_emited, b) + return r.err +} + +type MockHttpClient struct { + err error + resp http.Response + traceResp *tempopb.Trace + requestsCount int + searchResponse []*tempopb.TraceSearchMetadata + searchesCount int +} + +func (m MockHttpClient) DeleteOverrides(version string) error { + panic("unimplemented") +} + +func (m MockHttpClient) Do(req *http.Request) (*http.Response, error) { + return &m.resp, m.err +} + +func (m MockHttpClient) GetOverrides() (*userconfigurableoverrides.Limits, string, error) { + panic("unimplemented") +} + +func (m MockHttpClient) MetricsSummary(query string, groupBy string, start int64, end int64) (*tempopb.SpanMetricsSummaryResponse, error) { + panic("unimplemented") +} + +func (m MockHttpClient) PatchOverrides(limits *userconfigurableoverrides.Limits) (*userconfigurableoverrides.Limits, string, error) { + panic("unimplemented") +} + +func (m *MockHttpClient) QueryTrace(id string) (*tempopb.Trace, error) { + m.requestsCount++ + return m.traceResp, m.err +} + +func (m MockHttpClient) Search(tags string) (*tempopb.SearchResponse, error) { + panic("unimplemented") +} + +func (m MockHttpClient) SearchTagValues(key string) (*tempopb.SearchTagValuesResponse, error) { + panic("unimplemented") +} + +func (m MockHttpClient) SearchTagValuesV2(key string, query string) (*tempopb.SearchTagValuesV2Response, error) { + panic("unimplemented") +} + +func (m MockHttpClient) SearchTagValuesV2WithRange(tag string, start int64, end int64) (*tempopb.SearchTagValuesV2Response, error) { + panic("unimplemented") +} + +func (m MockHttpClient) SearchTags() (*tempopb.SearchTagsResponse, error) { + panic("unimplemented") +} + +func (m MockHttpClient) SearchTagsV2() (*tempopb.SearchTagsV2Response, error) { + panic("unimplemented") +} + +func (m MockHttpClient) SearchTagsV2WithRange(start int64, end int64) (*tempopb.SearchTagsV2Response, error) { + panic("unimplemented") +} + +func (m MockHttpClient) SearchTagsWithRange(start int64, end int64) (*tempopb.SearchTagsResponse, error) { + panic("unimplemented") +} + +func (m MockHttpClient) SearchTraceQL(query string) (*tempopb.SearchResponse, error) { + panic("unimplemented") +} + +func (m *MockHttpClient) SearchTraceQLWithRange(query string, start int64, end int64) (*tempopb.SearchResponse, error) { + traceQlSearchResponse := &tempopb.SearchResponse{ + Traces: m.searchResponse, + } + m.searchesCount++ + return traceQlSearchResponse, m.err +} + +func (m *MockHttpClient) SearchWithRange(tags string, start int64, end int64) (*tempopb.SearchResponse, error) { + traceQlSearchResponse := &tempopb.SearchResponse{ + Traces: m.searchResponse, + } + + m.searchesCount++ + return traceQlSearchResponse, m.err +} + +func (m MockHttpClient) SetOverrides(limits *userconfigurableoverrides.Limits, version string) (string, error) { + panic("unimplemented") +} + +func (m MockHttpClient) WithTransport(t http.RoundTripper) { + panic("unimplemented") +} diff --git a/pkg/httpclient/client.go b/pkg/httpclient/client.go index 23c10ce9612..55f47e420cd 100644 --- a/pkg/httpclient/client.go +++ b/pkg/httpclient/client.go @@ -33,6 +33,28 @@ const ( applicationJSON = "application/json" ) +type HttpClient interface { + WithTransport(t http.RoundTripper) + Do(req *http.Request) (*http.Response, error) + SearchTags() (*tempopb.SearchTagsResponse, error) + SearchTagsV2() (*tempopb.SearchTagsV2Response, error) + SearchTagsWithRange(start int64, end int64) (*tempopb.SearchTagsResponse, error) + SearchTagsV2WithRange(start int64, end int64) (*tempopb.SearchTagsV2Response, error) + SearchTagValues(key string) (*tempopb.SearchTagValuesResponse, error) + SearchTagValuesV2(key, query string) (*tempopb.SearchTagValuesV2Response, error) + SearchTagValuesV2WithRange(tag string, start int64, end int64) (*tempopb.SearchTagValuesV2Response, error) + Search(tags string) (*tempopb.SearchResponse, error) + SearchWithRange(tags string, start int64, end int64) (*tempopb.SearchResponse, error) + QueryTrace(id string) (*tempopb.Trace, error) + SearchTraceQL(query string) (*tempopb.SearchResponse, error) + SearchTraceQLWithRange(query string, start int64, end int64) (*tempopb.SearchResponse, error) + MetricsSummary(query string, groupBy string, start int64, end int64) (*tempopb.SpanMetricsSummaryResponse, error) + GetOverrides() (*userconfigurableoverrides.Limits, string, error) + SetOverrides(limits *userconfigurableoverrides.Limits, version string) (string, error) + PatchOverrides(limits *userconfigurableoverrides.Limits) (*userconfigurableoverrides.Limits, string, error) + DeleteOverrides(version string) error +} + var ErrNotFound = errors.New("resource not found") // Client is client to the Tempo API. diff --git a/pkg/util/trace_info.go b/pkg/util/trace_info.go index 3af8d5c0cd4..7e82d8817e1 100644 --- a/pkg/util/trace_info.go +++ b/pkg/util/trace_info.go @@ -7,8 +7,8 @@ import ( "time" "github.com/grafana/dskit/user" - jaeger_grpc "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + zipkincore "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" jaegerTrans "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/ptrace" @@ -37,6 +37,12 @@ type TraceInfo struct { tempoOrgID string } +// JaegerClient is an interface used to mock the underlying client in tests. +type JaegerClient interface { + EmitBatch(ctx context.Context, b *thrift.Batch) error + EmitZipkinBatch(ctx context.Context, zSpans []*zipkincore.Span) error +} + // NewTraceInfo is used to produce a new TraceInfo. func NewTraceInfo(timestamp time.Time, tempoOrgID string) *TraceInfo { r := newRand(timestamp) @@ -50,6 +56,18 @@ func NewTraceInfo(timestamp time.Time, tempoOrgID string) *TraceInfo { tempoOrgID: tempoOrgID, } } +func NewTraceInfoWithMaxLongWrites(timestamp time.Time, maxLongWrites int64, tempoOrgID string) *TraceInfo { + r := newRand(timestamp) + + return &TraceInfo{ + timestamp: timestamp, + r: r, + traceIDHigh: r.Int63(), + traceIDLow: r.Int63(), + longWritesRemaining: maxLongWrites, + tempoOrgID: tempoOrgID, + } +} func (t *TraceInfo) Ready(now time.Time, writeBackoff, longWriteBackoff time.Duration) bool { // Don't use the last time interval to allow the write loop to finish before @@ -58,7 +76,7 @@ func (t *TraceInfo) Ready(now time.Time, writeBackoff, longWriteBackoff time.Dur return false } - // Compare a new instance with the same timstamp to know how many longWritesRemaining. + // Compare a new instance with the same timestamp to know how many longWritesRemaining. totalWrites := NewTraceInfo(t.timestamp, t.tempoOrgID).longWritesRemaining // We are not ready if not all writes have had a chance to send. lastWrite := t.timestamp.Add(time.Duration(totalWrites) * longWriteBackoff) @@ -85,7 +103,7 @@ func (t *TraceInfo) Done() { t.longWritesRemaining-- } -func (t *TraceInfo) EmitBatches(c *jaeger_grpc.Reporter) error { +func (t *TraceInfo) EmitBatches(c JaegerClient) error { for i := int64(0); i < t.generateRandomInt(1, maxBatchesPerWrite); i++ { ctx := user.InjectOrgID(context.Background(), t.tempoOrgID) ctx, err := user.InjectIntoGRPCRequest(ctx) @@ -104,7 +122,7 @@ func (t *TraceInfo) EmitBatches(c *jaeger_grpc.Reporter) error { // EmitAllBatches sends all the batches that would normally be sent at some // interval when using EmitBatches. -func (t *TraceInfo) EmitAllBatches(c *jaeger_grpc.Reporter) error { +func (t *TraceInfo) EmitAllBatches(c JaegerClient) error { err := t.EmitBatches(c) if err != nil { return err From 260c8cc0506a1c61cbbbd5ef82d4f5e190787388 Mon Sep 17 00:00:00 2001 From: javiermolinar Date: Sun, 7 Jul 2024 21:16:33 -0400 Subject: [PATCH 2/5] fix linting --- cmd/tempo-vulture/main.go | 12 +++--- cmd/tempo-vulture/main_test.go | 76 ++++++++++++++++++++-------------- cmd/tempo-vulture/mocks.go | 63 ++++++++++++++++++---------- pkg/httpclient/client.go | 2 +- 4 files changed, 93 insertions(+), 60 deletions(-) diff --git a/cmd/tempo-vulture/main.go b/cmd/tempo-vulture/main.go index e83dfea3eeb..88bb18f5722 100644 --- a/cmd/tempo-vulture/main.go +++ b/cmd/tempo-vulture/main.go @@ -217,7 +217,7 @@ func queueFutureBatches(client util.JaegerClient, info *util.TraceInfo, config v }() } -func doRead(httpClient httpclient.HttpClient, tickerRead *time.Ticker, startTime time.Time, interval time.Duration, r *rand.Rand, config vultureConfiguration, l *zap.Logger) { +func doRead(httpClient httpclient.HTTPClient, tickerRead *time.Ticker, startTime time.Time, interval time.Duration, r *rand.Rand, config vultureConfiguration, l *zap.Logger) { if tickerRead != nil { go func() { for now := range tickerRead.C { @@ -251,7 +251,7 @@ func doRead(httpClient httpclient.HttpClient, tickerRead *time.Ticker, startTime } } -func doSearch(httpClient httpclient.HttpClient, tickerSearch *time.Ticker, startTime time.Time, interval time.Duration, r *rand.Rand, config vultureConfiguration, l *zap.Logger) { +func doSearch(httpClient httpclient.HTTPClient, tickerSearch *time.Ticker, startTime time.Time, interval time.Duration, r *rand.Rand, config vultureConfiguration, l *zap.Logger) { if tickerSearch != nil { go func() { for now := range tickerSearch.C { @@ -354,7 +354,7 @@ func generateRandomInt(min, max int64, r *rand.Rand) int64 { min++ var duration int64 duration = 1 - // This is to prevent a panic when min == max since substracting them will end in a negative number + // This is to prevent a panic when min == max since subtracting them will end in a negative number if min < max { duration = max - min } @@ -378,7 +378,7 @@ func traceInTraces(traceID string, traces []*tempopb.TraceSearchMetadata) bool { return false } -func searchTag(client httpclient.HttpClient, seed time.Time, config vultureConfiguration, l *zap.Logger) (traceMetrics, error) { +func searchTag(client httpclient.HTTPClient, seed time.Time, config vultureConfiguration, l *zap.Logger) (traceMetrics, error) { tm := traceMetrics{ requested: 1, } @@ -427,7 +427,7 @@ func searchTag(client httpclient.HttpClient, seed time.Time, config vultureConfi return tm, nil } -func searchTraceql(client httpclient.HttpClient, seed time.Time, config vultureConfiguration, l *zap.Logger) (traceMetrics, error) { +func searchTraceql(client httpclient.HTTPClient, seed time.Time, config vultureConfiguration, l *zap.Logger) (traceMetrics, error) { tm := traceMetrics{ requested: 1, } @@ -474,7 +474,7 @@ func searchTraceql(client httpclient.HttpClient, seed time.Time, config vultureC return tm, nil } -func queryTrace(client httpclient.HttpClient, info *util.TraceInfo, l *zap.Logger) (traceMetrics, error) { +func queryTrace(client httpclient.HTTPClient, info *util.TraceInfo, l *zap.Logger) (traceMetrics, error) { tm := traceMetrics{ requested: 1, } diff --git a/cmd/tempo-vulture/main_test.go b/cmd/tempo-vulture/main_test.go index 0b1c4c92ff1..66e5d06200d 100644 --- a/cmd/tempo-vulture/main_test.go +++ b/cmd/tempo-vulture/main_test.go @@ -197,7 +197,6 @@ func TestTraceIsReady(t *testing.T) { startTime = time.Date(2007, 1, 1, 12, 0, 0, 0, time.UTC) ready = traceIsReady(ti, seed.Add(2*longWriteBackoff), startTime, writeBackoff, longWriteBackoff) assert.True(t, ready, "trace should be ready now") - } func TestDoWrite(t *testing.T) { @@ -215,6 +214,16 @@ func TestDoWrite(t *testing.T) { time.Sleep(time.Second) ticker.Stop() + + assert.Greater(t, len(mockJaegerClient.batchesEmitted), 0) + + // assert an error + mockJaegerClient = MockReporter{err: errors.New("an error")} + doWrite(&mockJaegerClient, ticker, config.tempoWriteBackoffDuration, config, logger) + time.Sleep(time.Second) + ticker.Stop() + + assert.Greater(t, len(mockJaegerClient.batchesEmitted), 0) } func TestQueueFutureBatches(t *testing.T) { @@ -231,7 +240,14 @@ func TestQueueFutureBatches(t *testing.T) { queueFutureBatches(&mockJaegerClient, traceInfo, config, logger) time.Sleep(time.Second) - require.Greater(t, len(mockJaegerClient.batches_emited), 0) + require.Greater(t, len(mockJaegerClient.batchesEmitted), 0) + + // Assert an error + mockJaegerClient = MockReporter{err: errors.New("an error")} + + queueFutureBatches(&mockJaegerClient, traceInfo, config, logger) + time.Sleep(time.Second) + require.Equal(t, len(mockJaegerClient.batchesEmitted), 0) } type traceOps func(*tempopb.Trace) @@ -330,10 +346,10 @@ func doQueryTrace(f traceOps, err error) (traceMetrics, error) { trace, _ := traceInfo.ConstructTraceFromEpoch() - mockHttpClient := MockHttpClient{err: err, traceResp: trace} + mockHTTPClient := MockHTTPClient{err: err, traceResp: trace} logger = zap.NewNop() f(trace) - return queryTrace(&mockHttpClient, traceInfo, logger) + return queryTrace(&mockHTTPClient, traceInfo, logger) } func TestDoRead(t *testing.T) { @@ -342,7 +358,7 @@ func TestDoRead(t *testing.T) { trace, _ := traceInfo.ConstructTraceFromEpoch() startTime := time.Date(2007, 1, 1, 12, 0, 0, 0, time.UTC) - mockHttpClient := MockHttpClient{err: nil, traceResp: trace} + mockHTTPClient := MockHTTPClient{err: nil, traceResp: trace} // Define the configuration config := vultureConfiguration{ tempoOrgID: "orgID", @@ -354,21 +370,21 @@ func TestDoRead(t *testing.T) { r := rand.New(rand.NewSource(startTime.Unix())) // Assert ticker is nil - doRead(&mockHttpClient, nil, startTime, config.tempoWriteBackoffDuration, r, config, logger) - assert.Equal(t, 0, mockHttpClient.requestsCount) + doRead(&mockHTTPClient, nil, startTime, config.tempoWriteBackoffDuration, r, config, logger) + assert.Equal(t, 0, mockHTTPClient.requestsCount) // Assert an ok read - doRead(&mockHttpClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + doRead(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) time.Sleep(time.Second) ticker.Stop() - assert.Greater(t, mockHttpClient.requestsCount, 0) + assert.Greater(t, mockHTTPClient.requestsCount, 0) // Assert a read with errors - mockHttpClient = MockHttpClient{err: errors.New("an error"), traceResp: trace} - doRead(&mockHttpClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + mockHTTPClient = MockHTTPClient{err: errors.New("an error"), traceResp: trace} + doRead(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) time.Sleep(time.Second) ticker.Stop() - assert.Equal(t, 1, mockHttpClient.requestsCount) + assert.Equal(t, 1, mockHTTPClient.requestsCount) } func TestSearchTraceql(t *testing.T) { @@ -402,10 +418,10 @@ func TestSearchTraceql(t *testing.T) { }, } - mockHttpClient := MockHttpClient{err: nil, searchResponse: searchResponse} + mockHTTPClient := MockHTTPClient{err: nil, searchResponse: searchResponse} logger = zap.NewNop() - metrics, err := searchTraceql(&mockHttpClient, seed, config, logger) + metrics, err := searchTraceql(&mockHTTPClient, seed, config, logger) assert.Error(t, err) assert.Equal(t, traceMetrics{ @@ -413,10 +429,10 @@ func TestSearchTraceql(t *testing.T) { notFoundTraceQL: 1, }, metrics) - mockHttpClient = MockHttpClient{err: errors.New("something wrong happened"), searchResponse: searchResponse} + mockHTTPClient = MockHTTPClient{err: errors.New("something wrong happened"), searchResponse: searchResponse} logger = zap.NewNop() - metrics, err = searchTraceql(&mockHttpClient, seed, config, logger) + metrics, err = searchTraceql(&mockHTTPClient, seed, config, logger) assert.Error(t, err) assert.Equal(t, traceMetrics{ @@ -456,10 +472,10 @@ func TestSearchTag(t *testing.T) { }, } - mockHttpClient := MockHttpClient{err: nil, searchResponse: searchResponse} + mockHTTPClient := MockHTTPClient{err: nil, searchResponse: searchResponse} logger = zap.NewNop() - metrics, err := searchTag(&mockHttpClient, seed, config, logger) + metrics, err := searchTag(&mockHTTPClient, seed, config, logger) assert.Error(t, err) assert.Equal(t, traceMetrics{ @@ -467,21 +483,19 @@ func TestSearchTag(t *testing.T) { notFoundSearch: 1, }, metrics) - mockHttpClient = MockHttpClient{err: errors.New("something wrong happened"), searchResponse: searchResponse} + mockHTTPClient = MockHTTPClient{err: errors.New("something wrong happened"), searchResponse: searchResponse} logger = zap.NewNop() - metrics, err = searchTag(&mockHttpClient, seed, config, logger) + metrics, err = searchTag(&mockHTTPClient, seed, config, logger) assert.Error(t, err) assert.Equal(t, traceMetrics{ requested: 1, requestFailed: 1, }, metrics) - } func TestDoSearch(t *testing.T) { - seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) traceInfo := util.NewTraceInfo(seed, "test") startTime := time.Date(2000, 1, 1, 12, 0, 0, 0, time.UTC) @@ -489,7 +503,7 @@ func TestDoSearch(t *testing.T) { // Define the configuration config := vultureConfiguration{ tempoOrgID: "orgID", - //This is a hack to ensure the trace is "ready" + // This is a hack to ensure the trace is "ready" tempoWriteBackoffDuration: -time.Hour * 10000, tempoRetentionDuration: time.Second * 10, } @@ -519,26 +533,26 @@ func TestDoSearch(t *testing.T) { } logger = zap.NewNop() - mockHttpClient := MockHttpClient{err: nil, searchResponse: searchResponse} + mockHTTPClient := MockHTTPClient{err: nil, searchResponse: searchResponse} // Assert when ticker is nil - doSearch(&mockHttpClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) - assert.Equal(t, mockHttpClient.searchesCount, 0) + doSearch(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + assert.Equal(t, mockHTTPClient.searchesCount, 0) // Assert an ok search - doSearch(&mockHttpClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + doSearch(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) time.Sleep(time.Second) ticker.Stop() - assert.Greater(t, mockHttpClient.searchesCount, 0) + assert.Greater(t, mockHTTPClient.searchesCount, 0) // Assert an errored search - mockHttpClient = MockHttpClient{err: errors.New("an error"), searchResponse: searchResponse} + mockHTTPClient = MockHTTPClient{err: errors.New("an error"), searchResponse: searchResponse} - doSearch(&mockHttpClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + doSearch(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) time.Sleep(time.Second) ticker.Stop() - assert.Equal(t, mockHttpClient.searchesCount, 2) + assert.Equal(t, mockHTTPClient.searchesCount, 2) } func TestNewJaegerGRPCClient(t *testing.T) { diff --git a/cmd/tempo-vulture/mocks.go b/cmd/tempo-vulture/mocks.go index a0ba948317a..ce18b74cd39 100644 --- a/cmd/tempo-vulture/mocks.go +++ b/cmd/tempo-vulture/mocks.go @@ -13,7 +13,7 @@ import ( type MockReporter struct { err error - batches_emited []*thrift.Batch + batchesEmitted []*thrift.Batch } func (r MockReporter) EmitZipkinBatch(_ context.Context, _ []*zipkincore.Span) error { @@ -21,11 +21,11 @@ func (r MockReporter) EmitZipkinBatch(_ context.Context, _ []*zipkincore.Span) e } func (r *MockReporter) EmitBatch(_ context.Context, b *thrift.Batch) error { - r.batches_emited = append(r.batches_emited, b) + r.batchesEmitted = append(r.batchesEmitted, b) return r.err } -type MockHttpClient struct { +type MockHTTPClient struct { err error resp http.Response traceResp *tempopb.Trace @@ -34,68 +34,84 @@ type MockHttpClient struct { searchesCount int } -func (m MockHttpClient) DeleteOverrides(version string) error { +//nolint:all +func (m MockHTTPClient) DeleteOverrides(version string) error { panic("unimplemented") } -func (m MockHttpClient) Do(req *http.Request) (*http.Response, error) { +//nolint:all +func (m MockHTTPClient) Do(req *http.Request) (*http.Response, error) { return &m.resp, m.err } -func (m MockHttpClient) GetOverrides() (*userconfigurableoverrides.Limits, string, error) { +//nolint:all +func (m MockHTTPClient) GetOverrides() (*userconfigurableoverrides.Limits, string, error) { panic("unimplemented") } -func (m MockHttpClient) MetricsSummary(query string, groupBy string, start int64, end int64) (*tempopb.SpanMetricsSummaryResponse, error) { +//nolint:all +func (m MockHTTPClient) MetricsSummary(query string, groupBy string, start int64, end int64) (*tempopb.SpanMetricsSummaryResponse, error) { panic("unimplemented") } -func (m MockHttpClient) PatchOverrides(limits *userconfigurableoverrides.Limits) (*userconfigurableoverrides.Limits, string, error) { +//nolint:all +func (m MockHTTPClient) PatchOverrides(limits *userconfigurableoverrides.Limits) (*userconfigurableoverrides.Limits, string, error) { panic("unimplemented") } -func (m *MockHttpClient) QueryTrace(id string) (*tempopb.Trace, error) { +//nolint:all +func (m *MockHTTPClient) QueryTrace(id string) (*tempopb.Trace, error) { m.requestsCount++ return m.traceResp, m.err } -func (m MockHttpClient) Search(tags string) (*tempopb.SearchResponse, error) { +//nolint:all +func (m MockHTTPClient) Search(tags string) (*tempopb.SearchResponse, error) { panic("unimplemented") } -func (m MockHttpClient) SearchTagValues(key string) (*tempopb.SearchTagValuesResponse, error) { +//nolint:all +func (m MockHTTPClient) SearchTagValues(key string) (*tempopb.SearchTagValuesResponse, error) { panic("unimplemented") } -func (m MockHttpClient) SearchTagValuesV2(key string, query string) (*tempopb.SearchTagValuesV2Response, error) { +//nolint:all +func (m MockHTTPClient) SearchTagValuesV2(key string, query string) (*tempopb.SearchTagValuesV2Response, error) { panic("unimplemented") } -func (m MockHttpClient) SearchTagValuesV2WithRange(tag string, start int64, end int64) (*tempopb.SearchTagValuesV2Response, error) { +//nolint:all +func (m MockHTTPClient) SearchTagValuesV2WithRange(tag string, start int64, end int64) (*tempopb.SearchTagValuesV2Response, error) { panic("unimplemented") } -func (m MockHttpClient) SearchTags() (*tempopb.SearchTagsResponse, error) { +//nolint:all +func (m MockHTTPClient) SearchTags() (*tempopb.SearchTagsResponse, error) { panic("unimplemented") } -func (m MockHttpClient) SearchTagsV2() (*tempopb.SearchTagsV2Response, error) { +//nolint:all +func (m MockHTTPClient) SearchTagsV2() (*tempopb.SearchTagsV2Response, error) { panic("unimplemented") } -func (m MockHttpClient) SearchTagsV2WithRange(start int64, end int64) (*tempopb.SearchTagsV2Response, error) { +//nolint:all +func (m MockHTTPClient) SearchTagsV2WithRange(start int64, end int64) (*tempopb.SearchTagsV2Response, error) { panic("unimplemented") } -func (m MockHttpClient) SearchTagsWithRange(start int64, end int64) (*tempopb.SearchTagsResponse, error) { +//nolint:all +func (m MockHTTPClient) SearchTagsWithRange(start int64, end int64) (*tempopb.SearchTagsResponse, error) { panic("unimplemented") } -func (m MockHttpClient) SearchTraceQL(query string) (*tempopb.SearchResponse, error) { +//nolint:all +func (m MockHTTPClient) SearchTraceQL(query string) (*tempopb.SearchResponse, error) { panic("unimplemented") } -func (m *MockHttpClient) SearchTraceQLWithRange(query string, start int64, end int64) (*tempopb.SearchResponse, error) { +//nolint:all +func (m *MockHTTPClient) SearchTraceQLWithRange(query string, start int64, end int64) (*tempopb.SearchResponse, error) { traceQlSearchResponse := &tempopb.SearchResponse{ Traces: m.searchResponse, } @@ -103,7 +119,8 @@ func (m *MockHttpClient) SearchTraceQLWithRange(query string, start int64, end i return traceQlSearchResponse, m.err } -func (m *MockHttpClient) SearchWithRange(tags string, start int64, end int64) (*tempopb.SearchResponse, error) { +//nolint:all +func (m *MockHTTPClient) SearchWithRange(tags string, start int64, end int64) (*tempopb.SearchResponse, error) { traceQlSearchResponse := &tempopb.SearchResponse{ Traces: m.searchResponse, } @@ -112,10 +129,12 @@ func (m *MockHttpClient) SearchWithRange(tags string, start int64, end int64) (* return traceQlSearchResponse, m.err } -func (m MockHttpClient) SetOverrides(limits *userconfigurableoverrides.Limits, version string) (string, error) { +//nolint:all +func (m MockHTTPClient) SetOverrides(limits *userconfigurableoverrides.Limits, version string) (string, error) { panic("unimplemented") } -func (m MockHttpClient) WithTransport(t http.RoundTripper) { +//nolint:all +func (m MockHTTPClient) WithTransport(t http.RoundTripper) { panic("unimplemented") } diff --git a/pkg/httpclient/client.go b/pkg/httpclient/client.go index 55f47e420cd..456fdb9cfe8 100644 --- a/pkg/httpclient/client.go +++ b/pkg/httpclient/client.go @@ -33,7 +33,7 @@ const ( applicationJSON = "application/json" ) -type HttpClient interface { +type HTTPClient interface { WithTransport(t http.RoundTripper) Do(req *http.Request) (*http.Response, error) SearchTags() (*tempopb.SearchTagsResponse, error) From 43c394db6355b873e9db80e6081d3b8613f86449 Mon Sep 17 00:00:00 2001 From: javiermolinar Date: Sun, 7 Jul 2024 21:21:40 -0400 Subject: [PATCH 3/5] fix linting --- CHANGELOG.md | 1 + pkg/util/trace_info.go | 1 + 2 files changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a695ae53152..48be78a4f60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ * [ENHANCEMENT] Add caching to query range queries [#3796](https://github.com/grafana/tempo/pull/3796) (@mapno) * [ENHANCEMENT] Add data quality metric to measure traces without a root [#3812](https://github.com/grafana/tempo/pull/3812) (@mapno) * [ENHANCEMENT] Add a new helper method to allow debugging e2e tests [#3836](https://github.com/grafana/tempo/pull/3836) (@javiermolinar) +* [ENHANCEMENT] Refactor Tempo Vulture to reduce code complexity [#3850](https://github.com/grafana/tempo/pull/3850) (@javiermolinar) * [BUGFIX] Fix panic in certain metrics queries using `rate()` with `by` [#3847](https://github.com/grafana/tempo/pull/3847) (@stoewer) * [BUGFIX] Fix metrics queries when grouping by attributes that may not exist [#3734](https://github.com/grafana/tempo/pull/3734) (@mdisibio) * [BUGFIX] Fix frontend parsing error on cached responses [#3759](https://github.com/grafana/tempo/pull/3759) (@mdisibio) diff --git a/pkg/util/trace_info.go b/pkg/util/trace_info.go index 7e82d8817e1..4717c97b444 100644 --- a/pkg/util/trace_info.go +++ b/pkg/util/trace_info.go @@ -56,6 +56,7 @@ func NewTraceInfo(timestamp time.Time, tempoOrgID string) *TraceInfo { tempoOrgID: tempoOrgID, } } + func NewTraceInfoWithMaxLongWrites(timestamp time.Time, maxLongWrites int64, tempoOrgID string) *TraceInfo { r := newRand(timestamp) From ea43b2ff5bf746548dc5c59f6bd8a5fb23ab926d Mon Sep 17 00:00:00 2001 From: javiermolinar Date: Mon, 8 Jul 2024 07:26:08 -0400 Subject: [PATCH 4/5] linting --- cmd/tempo-vulture/main.go | 38 ++++++++++++++++------------------ cmd/tempo-vulture/main_test.go | 29 ++++++++++++++++++++++++-- 2 files changed, 45 insertions(+), 22 deletions(-) diff --git a/cmd/tempo-vulture/main.go b/cmd/tempo-vulture/main.go index debe96bed11..f2be81a28a8 100644 --- a/cmd/tempo-vulture/main.go +++ b/cmd/tempo-vulture/main.go @@ -99,11 +99,6 @@ func main() { zapcore.DebugLevel, )) - grpcEndpoint, err := getGRPCEndpoint(tempoPushURL) - if err != nil { - panic(err) - } - logger.Info("Tempo Vulture starting") vultureConfig := vultureConfiguration{ @@ -118,7 +113,7 @@ func main() { tempoPushTLS: tempoPushTLS, } - jaegerClient, err := newJaegerGRPCClient(vultureConfig.tempoPushURL, vultureConfig, logger) + jaegerClient, err := newJaegerGRPCClient(vultureConfig, logger) if err != nil { panic(err) } @@ -140,6 +135,19 @@ func main() { log.Fatal(http.ListenAndServe(prometheusListenAddress, nil)) } +func getGRPCEndpoint(endpoint string) (string, error) { + u, err := url.Parse(endpoint) + if err != nil { + return "", err + } + dialAddress := u.Host + + if u.Port() == "" { + dialAddress = fmt.Sprintf("%s:%d", dialAddress, defaultJaegerGRPCEndpoint) + } + return dialAddress, nil +} + func initTickers(tempoWriteBackoffDuration time.Duration, tempoReadBackoffDuration time.Duration, tempoSearchBackoffDuration time.Duration) (tickerWrite *time.Ticker, tickerRead *time.Ticker, tickerSearch *time.Ticker, err error) { if tempoWriteBackoffDuration <= 0 { return nil, nil, nil, errors.New("tempo-write-backoff-duration must be greater than 0") @@ -327,21 +335,12 @@ func selectPastTimestamp(start, stop time.Time, interval, retention time.Duratio return newStart.Round(interval), ts.Round(interval) } -func newJaegerGRPCClient(endpoint string, config vultureConfiguration, logger *zap.Logger) (*jaeger_grpc.Reporter, error) { - // remove scheme and port - u, err := url.Parse(endpoint) +func newJaegerGRPCClient(config vultureConfiguration, logger *zap.Logger) (*jaeger_grpc.Reporter, error) { + endpoint, err := getGRPCEndpoint(config.tempoPushURL) if err != nil { - return "", err - } - dialAddress := u.Host - - if u.Port() == "" { - dialAddress = fmt.Sprintf("%s:%d", dialAddress, defaultJaegerGRPCEndpoint) + return nil, err } - return dialAddress, nil -} -func newJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) { logger.Info("dialing grpc", zap.String("endpoint", endpoint), ) @@ -359,13 +358,12 @@ func newJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) { grpc.WithTransportCredentials(insecure.NewCredentials()), } } - // new jaeger grpc exporter conn, err := grpc.NewClient(endpoint, dialOpts...) if err != nil { return nil, err } - return jaeger_grpc.NewReporter(conn, nil, logger), err + return jaeger_grpc.NewReporter(conn, nil, logger), nil } func generateRandomInt(min, max int64, r *rand.Rand) int64 { diff --git a/cmd/tempo-vulture/main_test.go b/cmd/tempo-vulture/main_test.go index 66e5d06200d..6616f5c47cc 100644 --- a/cmd/tempo-vulture/main_test.go +++ b/cmd/tempo-vulture/main_test.go @@ -555,12 +555,26 @@ func TestDoSearch(t *testing.T) { assert.Equal(t, mockHTTPClient.searchesCount, 2) } +func TestGetGrpcEndpoint(t *testing.T) { + _, err := getGRPCEndpoint("http://%gh&%ij") + require.Error(t, err) + + got, err := getGRPCEndpoint("http://localhost:4000") + require.NoError(t, err) + assert.Equal(t, "localhost:4000", got, "Address endpoint should keep the given port") + + got, err = getGRPCEndpoint("http://localhost") + require.NoError(t, err) + assert.Equal(t, "localhost:14250", got, "Address without a port should be defaulted to 14250") +} + func TestNewJaegerGRPCClient(t *testing.T) { config := vultureConfiguration{ tempoOrgID: "orgID", tempoWriteBackoffDuration: time.Second, + tempoPushURL: "http://localhost", } - client, err := newJaegerGRPCClient("http://localhost", config, zap.NewNop()) + client, err := newJaegerGRPCClient(config, zap.NewNop()) assert.NoError(t, err) assert.NotNil(t, client) @@ -569,9 +583,20 @@ func TestNewJaegerGRPCClient(t *testing.T) { tempoOrgID: "orgID", tempoWriteBackoffDuration: time.Second, tempoPushTLS: true, + tempoPushURL: "http://localhost", } - client, err = newJaegerGRPCClient("http://localhost", config, zap.NewNop()) + client, err = newJaegerGRPCClient(config, zap.NewNop()) assert.NoError(t, err) assert.NotNil(t, client) + + config = vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + tempoPushURL: "http://%gh&%ij", + } + client, err = newJaegerGRPCClient(config, zap.NewNop()) + + assert.Error(t, err) + assert.Nil(t, client) } From ce5c00b9ca8f4659cc36f81686ca60c9594795dd Mon Sep 17 00:00:00 2001 From: javiermolinar Date: Thu, 11 Jul 2024 07:44:06 -0400 Subject: [PATCH 5/5] fix race errors during test execution --- cmd/tempo-vulture/main_test.go | 120 ++++++++++++++++++++++++++------- cmd/tempo-vulture/mocks.go | 80 +++++++++++++++++----- 2 files changed, 159 insertions(+), 41 deletions(-) diff --git a/cmd/tempo-vulture/main_test.go b/cmd/tempo-vulture/main_test.go index 6616f5c47cc..b79b91757e3 100644 --- a/cmd/tempo-vulture/main_test.go +++ b/cmd/tempo-vulture/main_test.go @@ -214,16 +214,23 @@ func TestDoWrite(t *testing.T) { time.Sleep(time.Second) ticker.Stop() + assert.Greater(t, len(mockJaegerClient.GetEmittedBatches()), 0) +} - assert.Greater(t, len(mockJaegerClient.batchesEmitted), 0) +func TestDoWriteWithError(t *testing.T) { + mockJaegerClient := MockReporter{err: errors.New("an error")} + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + ticker := time.NewTicker(10 * time.Millisecond) + logger = zap.NewNop() - // assert an error - mockJaegerClient = MockReporter{err: errors.New("an error")} doWrite(&mockJaegerClient, ticker, config.tempoWriteBackoffDuration, config, logger) - time.Sleep(time.Second) ticker.Stop() - - assert.Greater(t, len(mockJaegerClient.batchesEmitted), 0) + assert.Equal(t, len(mockJaegerClient.GetEmittedBatches()), 0) } func TestQueueFutureBatches(t *testing.T) { @@ -240,7 +247,7 @@ func TestQueueFutureBatches(t *testing.T) { queueFutureBatches(&mockJaegerClient, traceInfo, config, logger) time.Sleep(time.Second) - require.Greater(t, len(mockJaegerClient.batchesEmitted), 0) + require.Greater(t, len(mockJaegerClient.GetEmittedBatches()), 0) // Assert an error mockJaegerClient = MockReporter{err: errors.New("an error")} @@ -352,7 +359,7 @@ func doQueryTrace(f traceOps, err error) (traceMetrics, error) { return queryTrace(&mockHTTPClient, traceInfo, logger) } -func TestDoRead(t *testing.T) { +func TestDoReadWhenTickerIsNil(t *testing.T) { seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) traceInfo := util.NewTraceInfo(seed, "test") @@ -365,26 +372,60 @@ func TestDoRead(t *testing.T) { tempoWriteBackoffDuration: time.Second, } - ticker := time.NewTicker(10 * time.Millisecond) logger = zap.NewNop() r := rand.New(rand.NewSource(startTime.Unix())) // Assert ticker is nil doRead(&mockHTTPClient, nil, startTime, config.tempoWriteBackoffDuration, r, config, logger) - assert.Equal(t, 0, mockHTTPClient.requestsCount) + assert.Equal(t, 0, mockHTTPClient.GetRequestsCount()) +} + +func TestDoReadForAnOkRead(t *testing.T) { + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + traceInfo := util.NewTraceInfo(seed, "test") + + trace, _ := traceInfo.ConstructTraceFromEpoch() + startTime := time.Date(2007, 1, 1, 12, 0, 0, 0, time.UTC) + mockHTTPClient := MockHTTPClient{err: nil, traceResp: trace} + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + ticker := time.NewTicker(10 * time.Millisecond) + logger = zap.NewNop() + r := rand.New(rand.NewSource(startTime.Unix())) // Assert an ok read doRead(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) time.Sleep(time.Second) ticker.Stop() - assert.Greater(t, mockHTTPClient.requestsCount, 0) + assert.Greater(t, mockHTTPClient.GetRequestsCount(), 0) +} + +func TestDoReadForAnErroredRead(t *testing.T) { + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + traceInfo := util.NewTraceInfo(seed, "test") + + trace, _ := traceInfo.ConstructTraceFromEpoch() + startTime := time.Date(2007, 1, 1, 12, 0, 0, 0, time.UTC) + + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + ticker := time.NewTicker(10 * time.Millisecond) + logger = zap.NewNop() + r := rand.New(rand.NewSource(startTime.Unix())) // Assert a read with errors - mockHTTPClient = MockHTTPClient{err: errors.New("an error"), traceResp: trace} + mockHTTPClient := MockHTTPClient{err: errors.New("an error"), traceResp: trace} doRead(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) time.Sleep(time.Second) - ticker.Stop() - assert.Equal(t, 1, mockHTTPClient.requestsCount) + assert.Equal(t, 0, mockHTTPClient.GetRequestsCount()) } func TestSearchTraceql(t *testing.T) { @@ -532,27 +573,60 @@ func TestDoSearch(t *testing.T) { }, } logger = zap.NewNop() - mockHTTPClient := MockHTTPClient{err: nil, searchResponse: searchResponse} - // Assert when ticker is nil - doSearch(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) - assert.Equal(t, mockHTTPClient.searchesCount, 0) - // Assert an ok search doSearch(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + time.Sleep(time.Second) ticker.Stop() + assert.Greater(t, mockHTTPClient.GetSearchesCount(), 0) +} + +func TestDoSearchWhenTickerIsNil(t *testing.T) { + startTime := time.Date(2000, 1, 1, 12, 0, 0, 0, time.UTC) - assert.Greater(t, mockHTTPClient.searchesCount, 0) + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + // This is a hack to ensure the trace is "ready" + tempoWriteBackoffDuration: -time.Hour * 10000, + tempoRetentionDuration: time.Second * 10, + } + + logger = zap.NewNop() + r := rand.New(rand.NewSource(startTime.Unix())) + + logger = zap.NewNop() + + mockHTTPClient := MockHTTPClient{err: nil} + doSearch(&mockHTTPClient, nil, startTime, config.tempoWriteBackoffDuration, r, config, logger) + assert.Equal(t, mockHTTPClient.GetSearchesCount(), 0) +} + +func TestDoSearchOnSearchError(t *testing.T) { + startTime := time.Date(2000, 1, 1, 12, 0, 0, 0, time.UTC) + + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + // This is a hack to ensure the trace is "ready" + tempoWriteBackoffDuration: -time.Hour * 10000, + tempoRetentionDuration: time.Second * 10, + } + + ticker := time.NewTicker(10 * time.Millisecond) + logger = zap.NewNop() + r := rand.New(rand.NewSource(startTime.Unix())) + + logger = zap.NewNop() // Assert an errored search - mockHTTPClient = MockHTTPClient{err: errors.New("an error"), searchResponse: searchResponse} + mockHTTPClient := MockHTTPClient{err: errors.New("an error")} doSearch(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) - time.Sleep(time.Second) ticker.Stop() - assert.Equal(t, mockHTTPClient.searchesCount, 2) + assert.Equal(t, mockHTTPClient.searchesCount, 0) } func TestGetGrpcEndpoint(t *testing.T) { diff --git a/cmd/tempo-vulture/mocks.go b/cmd/tempo-vulture/mocks.go index ce18b74cd39..dac2fb004c3 100644 --- a/cmd/tempo-vulture/mocks.go +++ b/cmd/tempo-vulture/mocks.go @@ -3,6 +3,7 @@ package main import ( "context" "net/http" + "sync" userconfigurableoverrides "github.com/grafana/tempo/modules/overrides/userconfigurable/client" thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger" @@ -14,17 +15,30 @@ import ( type MockReporter struct { err error batchesEmitted []*thrift.Batch + // We need the lock to control concurrent accesses to batchesEmitted + m sync.Mutex } -func (r MockReporter) EmitZipkinBatch(_ context.Context, _ []*zipkincore.Span) error { +func (r *MockReporter) EmitZipkinBatch(_ context.Context, _ []*zipkincore.Span) error { return r.err } func (r *MockReporter) EmitBatch(_ context.Context, b *thrift.Batch) error { - r.batchesEmitted = append(r.batchesEmitted, b) + if r.err == nil { + r.m.Lock() + defer r.m.Unlock() + r.batchesEmitted = append(r.batchesEmitted, b) + } + return r.err } +func (r *MockReporter) GetEmittedBatches() []*thrift.Batch { + r.m.Lock() + defer r.m.Unlock() + return r.batchesEmitted +} + type MockHTTPClient struct { err error resp http.Response @@ -32,86 +46,105 @@ type MockHTTPClient struct { requestsCount int searchResponse []*tempopb.TraceSearchMetadata searchesCount int + // We need the lock to control concurrent accesses to shared variables in the tests + m sync.Mutex } //nolint:all -func (m MockHTTPClient) DeleteOverrides(version string) error { +func (m *MockHTTPClient) DeleteOverrides(version string) error { panic("unimplemented") } //nolint:all -func (m MockHTTPClient) Do(req *http.Request) (*http.Response, error) { +func (m *MockHTTPClient) Do(req *http.Request) (*http.Response, error) { return &m.resp, m.err } //nolint:all -func (m MockHTTPClient) GetOverrides() (*userconfigurableoverrides.Limits, string, error) { +func (m *MockHTTPClient) GetOverrides() (*userconfigurableoverrides.Limits, string, error) { panic("unimplemented") } //nolint:all -func (m MockHTTPClient) MetricsSummary(query string, groupBy string, start int64, end int64) (*tempopb.SpanMetricsSummaryResponse, error) { +func (m *MockHTTPClient) MetricsSummary(query string, groupBy string, start int64, end int64) (*tempopb.SpanMetricsSummaryResponse, error) { panic("unimplemented") } //nolint:all -func (m MockHTTPClient) PatchOverrides(limits *userconfigurableoverrides.Limits) (*userconfigurableoverrides.Limits, string, error) { +func (m *MockHTTPClient) PatchOverrides(limits *userconfigurableoverrides.Limits) (*userconfigurableoverrides.Limits, string, error) { panic("unimplemented") } //nolint:all func (m *MockHTTPClient) QueryTrace(id string) (*tempopb.Trace, error) { + if m.err != nil { + return nil, m.err + } + m.m.Lock() + defer m.m.Unlock() m.requestsCount++ return m.traceResp, m.err } +func (m *MockHTTPClient) GetRequestsCount() int { + m.m.Lock() + defer m.m.Unlock() + return m.requestsCount +} + //nolint:all -func (m MockHTTPClient) Search(tags string) (*tempopb.SearchResponse, error) { +func (m *MockHTTPClient) Search(tags string) (*tempopb.SearchResponse, error) { panic("unimplemented") } //nolint:all -func (m MockHTTPClient) SearchTagValues(key string) (*tempopb.SearchTagValuesResponse, error) { +func (m *MockHTTPClient) SearchTagValues(key string) (*tempopb.SearchTagValuesResponse, error) { panic("unimplemented") } //nolint:all -func (m MockHTTPClient) SearchTagValuesV2(key string, query string) (*tempopb.SearchTagValuesV2Response, error) { +func (m *MockHTTPClient) SearchTagValuesV2(key string, query string) (*tempopb.SearchTagValuesV2Response, error) { panic("unimplemented") } //nolint:all -func (m MockHTTPClient) SearchTagValuesV2WithRange(tag string, start int64, end int64) (*tempopb.SearchTagValuesV2Response, error) { +func (m *MockHTTPClient) SearchTagValuesV2WithRange(tag string, start int64, end int64) (*tempopb.SearchTagValuesV2Response, error) { panic("unimplemented") } //nolint:all -func (m MockHTTPClient) SearchTags() (*tempopb.SearchTagsResponse, error) { +func (m *MockHTTPClient) SearchTags() (*tempopb.SearchTagsResponse, error) { panic("unimplemented") } //nolint:all -func (m MockHTTPClient) SearchTagsV2() (*tempopb.SearchTagsV2Response, error) { +func (m *MockHTTPClient) SearchTagsV2() (*tempopb.SearchTagsV2Response, error) { panic("unimplemented") } //nolint:all -func (m MockHTTPClient) SearchTagsV2WithRange(start int64, end int64) (*tempopb.SearchTagsV2Response, error) { +func (m *MockHTTPClient) SearchTagsV2WithRange(start int64, end int64) (*tempopb.SearchTagsV2Response, error) { panic("unimplemented") } //nolint:all -func (m MockHTTPClient) SearchTagsWithRange(start int64, end int64) (*tempopb.SearchTagsResponse, error) { +func (m *MockHTTPClient) SearchTagsWithRange(start int64, end int64) (*tempopb.SearchTagsResponse, error) { panic("unimplemented") } //nolint:all -func (m MockHTTPClient) SearchTraceQL(query string) (*tempopb.SearchResponse, error) { +func (m *MockHTTPClient) SearchTraceQL(query string) (*tempopb.SearchResponse, error) { panic("unimplemented") } //nolint:all func (m *MockHTTPClient) SearchTraceQLWithRange(query string, start int64, end int64) (*tempopb.SearchResponse, error) { + if m.err != nil { + return nil, m.err + } + + m.m.Lock() + defer m.m.Unlock() traceQlSearchResponse := &tempopb.SearchResponse{ Traces: m.searchResponse, } @@ -121,6 +154,11 @@ func (m *MockHTTPClient) SearchTraceQLWithRange(query string, start int64, end i //nolint:all func (m *MockHTTPClient) SearchWithRange(tags string, start int64, end int64) (*tempopb.SearchResponse, error) { + if m.err != nil { + return nil, m.err + } + m.m.Lock() + defer m.m.Unlock() traceQlSearchResponse := &tempopb.SearchResponse{ Traces: m.searchResponse, } @@ -129,12 +167,18 @@ func (m *MockHTTPClient) SearchWithRange(tags string, start int64, end int64) (* return traceQlSearchResponse, m.err } +func (m *MockHTTPClient) GetSearchesCount() int { + m.m.Lock() + defer m.m.Unlock() + return m.searchesCount +} + //nolint:all -func (m MockHTTPClient) SetOverrides(limits *userconfigurableoverrides.Limits, version string) (string, error) { +func (m *MockHTTPClient) SetOverrides(limits *userconfigurableoverrides.Limits, version string) (string, error) { panic("unimplemented") } //nolint:all -func (m MockHTTPClient) WithTransport(t http.RoundTripper) { +func (m *MockHTTPClient) WithTransport(t http.RoundTripper) { panic("unimplemented") }