diff --git a/CHANGELOG.md b/CHANGELOG.md index 80ad91e00fa..6b7df8d13a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ * [ENHANCEMENT] Add data quality metric to measure traces without a root [#3812](https://github.com/grafana/tempo/pull/3812) (@mapno) * [ENHANCEMENT] Added an example for running Tempo vulture [#3829](https://github.com/grafana/tempo/pull/3829) (@javiermolinar) * [ENHANCEMENT] Add a new helper method to allow debugging e2e tests [#3836](https://github.com/grafana/tempo/pull/3836) (@javiermolinar) +* [ENHANCEMENT] Refactor Tempo Vulture to reduce code complexity [#3850](https://github.com/grafana/tempo/pull/3850) (@javiermolinar) * [ENHANCEMENT] Self document makefile [#3844](https://github.com/grafana/tempo/pull/3844) (@javiermolinar) * [ENHANCEMENT] Added a Tempo CLI command to drop traces by id by rewriting blocks. [#3856](https://github.com/grafana/tempo/pull/3856) (@joe-elliott) * [ENHANCEMENT] Mixin, make recording rule range interval configurable and increase range interval in alert to support scrape interval of 1 minute [#3851](https://github.com/grafana/tempo/pull/3851) (@jmichalek132) diff --git a/cmd/tempo-vulture/main.go b/cmd/tempo-vulture/main.go index 391f007bc00..f2be81a28a8 100644 --- a/cmd/tempo-vulture/main.go +++ b/cmd/tempo-vulture/main.go @@ -62,6 +62,18 @@ const ( defaultJaegerGRPCEndpoint = 14250 ) +type vultureConfiguration struct { + tempoQueryURL string + tempoPushURL string + tempoOrgID string + tempoWriteBackoffDuration time.Duration + tempoLongWriteBackoffDuration time.Duration + tempoReadBackoffDuration time.Duration + tempoSearchBackoffDuration time.Duration + tempoRetentionDuration time.Duration + tempoPushTLS bool +} + func init() { flag.StringVar(&prometheusPath, "prometheus-path", "/metrics", "The path to publish Prometheus metrics to.") flag.StringVar(&prometheusListenAddress, "prometheus-listen-address", ":80", "The address to listen on for Prometheus scrapes.") @@ -87,98 +99,167 @@ func main() { zapcore.DebugLevel, )) - grpcEndpoint, err := getGRPCEndpoint(tempoPushURL) + logger.Info("Tempo Vulture starting") + + vultureConfig := vultureConfiguration{ + tempoQueryURL: tempoQueryURL, + tempoPushURL: tempoPushURL, + tempoOrgID: tempoOrgID, + tempoWriteBackoffDuration: tempoWriteBackoffDuration, + tempoLongWriteBackoffDuration: tempoLongWriteBackoffDuration, + tempoReadBackoffDuration: tempoReadBackoffDuration, + tempoSearchBackoffDuration: tempoSearchBackoffDuration, + tempoRetentionDuration: tempoRetentionDuration, + tempoPushTLS: tempoPushTLS, + } + + jaegerClient, err := newJaegerGRPCClient(vultureConfig, logger) if err != nil { panic(err) } + httpClient := httpclient.New(vultureConfig.tempoQueryURL, vultureConfig.tempoOrgID) - logger.Info("Tempo Vulture starting") + tickerWrite, tickerRead, tickerSearch, err := initTickers(vultureConfig.tempoWriteBackoffDuration, vultureConfig.tempoReadBackoffDuration, vultureConfig.tempoSearchBackoffDuration) + if err != nil { + panic(err) + } + startTime := time.Now() + r := rand.New(rand.NewSource(startTime.Unix())) + interval := vultureConfig.tempoWriteBackoffDuration - actualStartTime := time.Now() - startTime := actualStartTime - tickerWrite := time.NewTicker(tempoWriteBackoffDuration) + doWrite(jaegerClient, tickerWrite, interval, vultureConfig, logger) + doRead(httpClient, tickerRead, startTime, interval, r, vultureConfig, logger) + doSearch(httpClient, tickerSearch, startTime, interval, r, vultureConfig, logger) - r := rand.New(rand.NewSource(actualStartTime.Unix())) + http.Handle(prometheusPath, promhttp.Handler()) + log.Fatal(http.ListenAndServe(prometheusListenAddress, nil)) +} + +func getGRPCEndpoint(endpoint string) (string, error) { + u, err := url.Parse(endpoint) + if err != nil { + return "", err + } + dialAddress := u.Host - var tickerRead *time.Ticker + if u.Port() == "" { + dialAddress = fmt.Sprintf("%s:%d", dialAddress, defaultJaegerGRPCEndpoint) + } + return dialAddress, nil +} + +func initTickers(tempoWriteBackoffDuration time.Duration, tempoReadBackoffDuration time.Duration, tempoSearchBackoffDuration time.Duration) (tickerWrite *time.Ticker, tickerRead *time.Ticker, tickerSearch *time.Ticker, err error) { + if tempoWriteBackoffDuration <= 0 { + return nil, nil, nil, errors.New("tempo-write-backoff-duration must be greater than 0") + } + tickerWrite = time.NewTicker(tempoWriteBackoffDuration) if tempoReadBackoffDuration > 0 { tickerRead = time.NewTicker(tempoReadBackoffDuration) } - - var tickerSearch *time.Ticker if tempoSearchBackoffDuration > 0 { tickerSearch = time.NewTicker(tempoSearchBackoffDuration) } - if tickerRead == nil && tickerSearch == nil { - log.Fatalf("at least one of tempo-search-backoff-duration or tempo-read-backoff-duration must be set") + return nil, nil, nil, errors.New("at least one of tempo-search-backoff-duration or tempo-read-backoff-duration must be set") } + return tickerWrite, tickerRead, tickerSearch, nil +} - interval := tempoWriteBackoffDuration - - ready := func(info *util.TraceInfo, now time.Time) bool { - // Don't attempt to read on the first itteration if we can't reasonably - // expect the write loop to have fired yet. Double the duration here to - // avoid a race. - if info.Timestamp().Before(actualStartTime.Add(2 * tempoWriteBackoffDuration)) { - return false - } - - return info.Ready(now, tempoWriteBackoffDuration, tempoLongWriteBackoffDuration) +// Don't attempt to read on the first iteration if we can't reasonably +// expect the write loop to have fired yet. Double the duration here to +// avoid a race. +func traceIsReady(info *util.TraceInfo, now time.Time, startTime time.Time, writeBackoff time.Duration, longBackoff time.Duration) bool { + if info.Timestamp().Before(startTime.Add(2 * writeBackoff)) { + return false } - // Write - go func() { - client, err := newJaegerGRPCClient(grpcEndpoint) - if err != nil { - panic(err) - } + return info.Ready(now, writeBackoff, longBackoff) +} +func doWrite(jaegerClient util.JaegerClient, tickerWrite *time.Ticker, interval time.Duration, config vultureConfiguration, l *zap.Logger) { + go func() { for now := range tickerWrite.C { timestamp := now.Round(interval) - info := util.NewTraceInfo(timestamp, tempoOrgID) + info := util.NewTraceInfo(timestamp, config.tempoOrgID) - log := logger.With( - zap.String("org_id", tempoOrgID), + logger := l.With( + zap.String("org_id", config.tempoOrgID), zap.Int64("seed", info.Timestamp().Unix()), ) - log.Info("sending trace") + logger.Info("sending trace") - err := info.EmitBatches(client) + err := info.EmitBatches(jaegerClient) if err != nil { metricErrorTotal.Inc() } - queueFutureBatches(client, info) + queueFutureBatches(jaegerClient, info, config, l) + } + }() +} + +func queueFutureBatches(client util.JaegerClient, info *util.TraceInfo, config vultureConfiguration, l *zap.Logger) { + if info.LongWritesRemaining() == 0 { + return + } + + logger := l.With( + zap.String("org_id", config.tempoOrgID), + zap.String("write_trace_id", info.HexID()), + zap.Int64("seed", info.Timestamp().Unix()), + zap.Int64("longWritesRemaining", info.LongWritesRemaining()), + ) + logger.Info("queueing future batches") + + info.Done() + + go func() { + time.Sleep(config.tempoLongWriteBackoffDuration) + + logger := l.With( + zap.String("org_id", config.tempoOrgID), + zap.String("write_trace_id", info.HexID()), + zap.Int64("seed", info.Timestamp().Unix()), + zap.Int64("longWritesRemaining", info.LongWritesRemaining()), + ) + logger.Info("sending trace") + + err := info.EmitBatches(client) + if err != nil { + logger.Error("failed to queue batches", + zap.Error(err), + ) } + + queueFutureBatches(client, info, config, l) }() +} - // Read +func doRead(httpClient httpclient.HTTPClient, tickerRead *time.Ticker, startTime time.Time, interval time.Duration, r *rand.Rand, config vultureConfiguration, l *zap.Logger) { if tickerRead != nil { go func() { for now := range tickerRead.C { var seed time.Time startTime, seed = selectPastTimestamp(startTime, now, interval, tempoRetentionDuration, r) - log := logger.With( - zap.String("org_id", tempoOrgID), + logger := l.With( + zap.String("org_id", config.tempoOrgID), zap.Int64("seed", seed.Unix()), ) - info := util.NewTraceInfo(seed, tempoOrgID) + info := util.NewTraceInfo(seed, config.tempoOrgID) // Don't query for a trace we don't expect to be complete - if !ready(info, now) { + if !traceIsReady(info, now, startTime, + config.tempoWriteBackoffDuration, config.tempoLongWriteBackoffDuration) { continue } - client := httpclient.New(tempoQueryURL, tempoOrgID) - // query the trace - queryMetrics, err := queryTrace(client, info) + queryMetrics, err := queryTrace(httpClient, info, l) if err != nil { metricErrorTotal.Inc() - log.Error("query for metrics failed", + logger.Error("query for metrics failed", zap.Error(err), ) } @@ -186,40 +267,40 @@ func main() { } }() } +} - // Search +func doSearch(httpClient httpclient.HTTPClient, tickerSearch *time.Ticker, startTime time.Time, interval time.Duration, r *rand.Rand, config vultureConfiguration, l *zap.Logger) { if tickerSearch != nil { go func() { for now := range tickerSearch.C { - _, seed := selectPastTimestamp(startTime, now, interval, tempoRetentionDuration, r) - log := logger.With( - zap.String("org_id", tempoOrgID), + _, seed := selectPastTimestamp(startTime, now, interval, config.tempoRetentionDuration, r) + logger := l.With( + zap.String("org_id", config.tempoOrgID), zap.Int64("seed", seed.Unix()), ) - info := util.NewTraceInfo(seed, tempoOrgID) + info := util.NewTraceInfo(seed, config.tempoOrgID) - if !ready(info, now) { + if !traceIsReady(info, now, startTime, + config.tempoWriteBackoffDuration, config.tempoLongWriteBackoffDuration) { continue } - client := httpclient.New(tempoQueryURL, tempoOrgID) - // query a tag we expect the trace to be found within - searchMetrics, err := searchTag(client, seed) + searchMetrics, err := searchTag(httpClient, seed, config, l) if err != nil { metricErrorTotal.Inc() - log.Error("search tag for metrics failed", + logger.Error("search tag for metrics failed", zap.Error(err), ) } pushMetrics(searchMetrics) // traceql query - traceqlSearchMetrics, err := searchTraceql(client, seed) + traceqlSearchMetrics, err := searchTraceql(httpClient, seed, config, l) if err != nil { metricErrorTotal.Inc() - log.Error("traceql query for metrics failed", + logger.Error("traceql query for metrics failed", zap.Error(err), ) } @@ -227,46 +308,6 @@ func main() { } }() } - - http.Handle(prometheusPath, promhttp.Handler()) - log.Fatal(http.ListenAndServe(prometheusListenAddress, nil)) -} - -func queueFutureBatches(client *jaeger_grpc.Reporter, info *util.TraceInfo) { - if info.LongWritesRemaining() == 0 { - return - } - - log := logger.With( - zap.String("org_id", tempoOrgID), - zap.String("write_trace_id", info.HexID()), - zap.Int64("seed", info.Timestamp().Unix()), - zap.Int64("longWritesRemaining", info.LongWritesRemaining()), - ) - log.Info("queueing future batches") - - info.Done() - - go func() { - time.Sleep(tempoLongWriteBackoffDuration) - - log := logger.With( - zap.String("org_id", tempoOrgID), - zap.String("write_trace_id", info.HexID()), - zap.Int64("seed", info.Timestamp().Unix()), - zap.Int64("longWritesRemaining", info.LongWritesRemaining()), - ) - log.Info("sending trace") - - err := info.EmitBatches(client) - if err != nil { - log.Error("failed to queue batches", - zap.Error(err), - ) - } - - queueFutureBatches(client, info) - }() } func pushMetrics(metrics traceMetrics) { @@ -294,27 +335,19 @@ func selectPastTimestamp(start, stop time.Time, interval, retention time.Duratio return newStart.Round(interval), ts.Round(interval) } -func getGRPCEndpoint(endpoint string) (string, error) { - u, err := url.Parse(endpoint) +func newJaegerGRPCClient(config vultureConfiguration, logger *zap.Logger) (*jaeger_grpc.Reporter, error) { + endpoint, err := getGRPCEndpoint(config.tempoPushURL) if err != nil { - return "", err - } - dialAddress := u.Host - - if u.Port() == "" { - dialAddress = fmt.Sprintf("%s:%d", dialAddress, defaultJaegerGRPCEndpoint) + return nil, err } - return dialAddress, nil -} -func newJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) { logger.Info("dialing grpc", zap.String("endpoint", endpoint), ) var dialOpts []grpc.DialOption - if tempoPushTLS { + if config.tempoPushTLS { dialOpts = []grpc.DialOption{ grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ InsecureSkipVerify: true, @@ -325,27 +358,48 @@ func newJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) { grpc.WithTransportCredentials(insecure.NewCredentials()), } } - // new jaeger grpc exporter conn, err := grpc.NewClient(endpoint, dialOpts...) if err != nil { return nil, err } - return jaeger_grpc.NewReporter(conn, nil, logger), err + return jaeger_grpc.NewReporter(conn, nil, logger), nil } func generateRandomInt(min, max int64, r *rand.Rand) int64 { min++ - number := min + r.Int63n(max-min) + var duration int64 + duration = 1 + // This is to prevent a panic when min == max since subtracting them will end in a negative number + if min < max { + duration = max - min + } + number := min + r.Int63n(duration) return number } -func searchTag(client *httpclient.Client, seed time.Time) (traceMetrics, error) { +func traceInTraces(traceID string, traces []*tempopb.TraceSearchMetadata) bool { + for _, t := range traces { + equal, err := util.EqualHexStringTraceIDs(t.TraceID, traceID) + if err != nil { + logger.Error("error comparing trace IDs", zap.Error(err)) + continue + } + + if equal { + return true + } + } + + return false +} + +func searchTag(client httpclient.HTTPClient, seed time.Time, config vultureConfiguration, l *zap.Logger) (traceMetrics, error) { tm := traceMetrics{ requested: 1, } - info := util.NewTraceInfo(seed, tempoOrgID) + info := util.NewTraceInfo(seed, config.tempoOrgID) hexID := info.HexID() // Get the expected @@ -355,29 +409,13 @@ func searchTag(client *httpclient.Client, seed time.Time) (traceMetrics, error) return traceMetrics{}, err } - traceInTraces := func(traceID string, traces []*tempopb.TraceSearchMetadata) bool { - for _, t := range traces { - equal, err := util.EqualHexStringTraceIDs(t.TraceID, traceID) - if err != nil { - logger.Error("error comparing trace IDs", zap.Error(err)) - continue - } - - if equal { - return true - } - } - - return false - } - attr := util.RandomAttrFromTrace(expected) if attr == nil { tm.notFoundSearchAttribute++ return tm, fmt.Errorf("no search attr selected from trace") } - logger := logger.With( + logger := l.With( zap.Int64("seed", seed.Unix()), zap.String("hexID", hexID), zap.Duration("ago", time.Since(seed)), @@ -405,44 +443,28 @@ func searchTag(client *httpclient.Client, seed time.Time) (traceMetrics, error) return tm, nil } -func searchTraceql(client *httpclient.Client, seed time.Time) (traceMetrics, error) { +func searchTraceql(client httpclient.HTTPClient, seed time.Time, config vultureConfiguration, l *zap.Logger) (traceMetrics, error) { tm := traceMetrics{ requested: 1, } - info := util.NewTraceInfo(seed, tempoOrgID) + info := util.NewTraceInfo(seed, config.tempoOrgID) hexID := info.HexID() // Get the expected expected, err := info.ConstructTraceFromEpoch() if err != nil { - logger.Error("unable to construct trace from epoch", zap.Error(err)) + l.Error("unable to construct trace from epoch", zap.Error(err)) return traceMetrics{}, err } - traceInTraces := func(traceID string, traces []*tempopb.TraceSearchMetadata) bool { - for _, t := range traces { - equal, err := util.EqualHexStringTraceIDs(t.TraceID, traceID) - if err != nil { - logger.Error("error comparing trace IDs", zap.Error(err)) - continue - } - - if equal { - return true - } - } - - return false - } - attr := util.RandomAttrFromTrace(expected) if attr == nil { tm.notFoundSearchAttribute++ return tm, fmt.Errorf("no search attr selected from trace") } - logger := logger.With( + logger := l.With( zap.Int64("seed", seed.Unix()), zap.String("hexID", hexID), zap.Duration("ago", time.Since(seed)), @@ -468,14 +490,14 @@ func searchTraceql(client *httpclient.Client, seed time.Time) (traceMetrics, err return tm, nil } -func queryTrace(client *httpclient.Client, info *util.TraceInfo) (traceMetrics, error) { +func queryTrace(client httpclient.HTTPClient, info *util.TraceInfo, l *zap.Logger) (traceMetrics, error) { tm := traceMetrics{ requested: 1, } hexID := info.HexID() - logger := logger.With( + logger := l.With( zap.Int64("seed", info.Timestamp().Unix()), zap.String("hexID", hexID), zap.Duration("ago", time.Since(info.Timestamp())), diff --git a/cmd/tempo-vulture/main_test.go b/cmd/tempo-vulture/main_test.go index 7f3cd58dbdb..b79b91757e3 100644 --- a/cmd/tempo-vulture/main_test.go +++ b/cmd/tempo-vulture/main_test.go @@ -2,7 +2,8 @@ package main import ( "bytes" - "fmt" + "errors" + "math/rand" "os" "testing" "time" @@ -11,8 +12,10 @@ import ( "github.com/gogo/protobuf/jsonpb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "github.com/grafana/tempo/pkg/tempopb" + v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1" v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" "github.com/grafana/tempo/pkg/util" ) @@ -88,7 +91,6 @@ func TestResponseFixture(t *testing.T) { marshaller := &jsonpb.Marshaler{} err = marshaller.Marshal(&jsonTrace, generatedTrace) require.NoError(t, err) - fmt.Println(jsonTrace.String()) assert.True(t, equalTraces(expected, generatedTrace)) @@ -112,6 +114,521 @@ func TestEqualTraces(t *testing.T) { require.True(t, equalTraces(a, b)) } +func TestInitTickers(t *testing.T) { + tests := []struct { + name string + writeDuration, readDuration, searchDuration time.Duration + expectedWriteTicker bool + expectedReadTicker bool + expectedSearchTicker bool + expectedError string + }{ + { + name: "Valid write and read durations", + writeDuration: 1 * time.Second, + readDuration: 2 * time.Second, + searchDuration: 0, + expectedWriteTicker: true, + expectedReadTicker: true, + expectedSearchTicker: false, + expectedError: "", + }, + { + name: "Invalid write duration (zero)", + writeDuration: 0, + readDuration: 0, + searchDuration: 0, + expectedWriteTicker: false, + expectedReadTicker: false, + expectedSearchTicker: false, + expectedError: "tempo-write-backoff-duration must be greater than 0", + }, + { + name: "No read durations set", + writeDuration: 1 * time.Second, + readDuration: 0, + searchDuration: 1 * time.Second, + expectedWriteTicker: true, + expectedReadTicker: false, + expectedSearchTicker: true, + expectedError: "", + }, + { + name: "No read or search durations set", + writeDuration: 1 * time.Second, + readDuration: 0, + searchDuration: 0, + expectedWriteTicker: false, + expectedReadTicker: false, + expectedSearchTicker: false, + expectedError: "at least one of tempo-search-backoff-duration or tempo-read-backoff-duration must be set", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tickerWrite, tickerRead, tickerSearch, err := initTickers(tt.writeDuration, tt.readDuration, tt.searchDuration) + + assert.Equal(t, tt.expectedWriteTicker, tickerWrite != nil, "TickerWrite") + assert.Equal(t, tt.expectedReadTicker, tickerRead != nil, "TickerRead") + assert.Equal(t, tt.expectedSearchTicker, tickerSearch != nil, "TickerSearch") + + if tt.expectedError != "" { + assert.NotNil(t, err, "Expected error but got nil") + assert.EqualError(t, err, tt.expectedError, "Error message mismatch") + } else { + assert.Nil(t, err, "Expected no error but got one") + } + }) + } +} + +func TestTraceIsReady(t *testing.T) { + writeBackoff := 1 * time.Second + longWriteBackoff := 5 * time.Second + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + ti := util.NewTraceInfo(seed, "test") + + startTime := time.Date(2009, 1, 1, 12, 0, 0, 0, time.UTC) + ready := traceIsReady(ti, time.Now(), startTime, writeBackoff, longWriteBackoff) + + assert.False(t, ready, "trace should not be ready yet") + + startTime = time.Date(2007, 1, 1, 12, 0, 0, 0, time.UTC) + ready = traceIsReady(ti, seed.Add(2*longWriteBackoff), startTime, writeBackoff, longWriteBackoff) + assert.True(t, ready, "trace should be ready now") +} + +func TestDoWrite(t *testing.T) { + mockJaegerClient := MockReporter{err: nil} + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + ticker := time.NewTicker(10 * time.Millisecond) + logger = zap.NewNop() + + doWrite(&mockJaegerClient, ticker, config.tempoWriteBackoffDuration, config, logger) + + time.Sleep(time.Second) + ticker.Stop() + assert.Greater(t, len(mockJaegerClient.GetEmittedBatches()), 0) +} + +func TestDoWriteWithError(t *testing.T) { + mockJaegerClient := MockReporter{err: errors.New("an error")} + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + ticker := time.NewTicker(10 * time.Millisecond) + logger = zap.NewNop() + + doWrite(&mockJaegerClient, ticker, config.tempoWriteBackoffDuration, config, logger) + ticker.Stop() + assert.Equal(t, len(mockJaegerClient.GetEmittedBatches()), 0) +} + +func TestQueueFutureBatches(t *testing.T) { + mockJaegerClient := MockReporter{err: nil} + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second * 0, + } + + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + traceInfo := util.NewTraceInfoWithMaxLongWrites(seed, 1, "test") + logger = zap.NewNop() + + queueFutureBatches(&mockJaegerClient, traceInfo, config, logger) + time.Sleep(time.Second) + require.Greater(t, len(mockJaegerClient.GetEmittedBatches()), 0) + + // Assert an error + mockJaegerClient = MockReporter{err: errors.New("an error")} + + queueFutureBatches(&mockJaegerClient, traceInfo, config, logger) + time.Sleep(time.Second) + require.Equal(t, len(mockJaegerClient.batchesEmitted), 0) +} + +type traceOps func(*tempopb.Trace) + +func TestQueryTrace(t *testing.T) { + noOp := func(_ *tempopb.Trace) {} + setMissingSpan := func(trace *tempopb.Trace) { + trace.Batches[0].ScopeSpans[0].Spans[0].ParentSpanId = []byte{'t', 'e', 's', 't'} + } + setNoBatchesSpan := func(trace *tempopb.Trace) { + trace.Batches = make([]*v1.ResourceSpans, 0) + } + setAlteredSpan := func(trace *tempopb.Trace) { + trace.Batches[0].ScopeSpans[0].Spans[0].Name = "Different spam" + } + tests := []struct { + name string + traceOperation func(*tempopb.Trace) + err error + expectedMetrics traceMetrics + expectedError error + }{ + { + name: "assert querytrace yields an unexpected error", + traceOperation: noOp, + err: errors.New("unexpected error"), + expectedMetrics: traceMetrics{ + requested: 1, + requestFailed: 1, + }, + expectedError: errors.New("unexpected error"), + }, + { + name: "assert querytrace yields traceNotFound error", + traceOperation: noOp, + err: util.ErrTraceNotFound, + expectedMetrics: traceMetrics{ + requested: 1, + notFoundByID: 1, + }, + expectedError: util.ErrTraceNotFound, + }, + { + name: "assert querytrace for ok trace", + traceOperation: noOp, + err: nil, + expectedMetrics: traceMetrics{ + requested: 1, + }, + expectedError: nil, + }, + { + name: "assert querytrace for a trace with missing spans", + traceOperation: setMissingSpan, + err: nil, + expectedMetrics: traceMetrics{ + requested: 1, + missingSpans: 1, + }, + expectedError: nil, + }, + { + name: "assert querytrace for a trace without batches", + traceOperation: setNoBatchesSpan, + err: nil, + expectedMetrics: traceMetrics{ + requested: 1, + notFoundByID: 1, + }, + expectedError: nil, + }, + { + name: "assert querytrace for a trace different than the ingested one", + traceOperation: setAlteredSpan, + err: nil, + expectedMetrics: traceMetrics{ + requested: 1, + incorrectResult: 1, + }, + expectedError: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metrics, err := doQueryTrace(tt.traceOperation, tt.err) + assert.Equal(t, tt.expectedMetrics, metrics) + assert.Equal(t, tt.expectedError, err) + }) + } +} + +func doQueryTrace(f traceOps, err error) (traceMetrics, error) { + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + traceInfo := util.NewTraceInfo(seed, "test") + + trace, _ := traceInfo.ConstructTraceFromEpoch() + + mockHTTPClient := MockHTTPClient{err: err, traceResp: trace} + logger = zap.NewNop() + f(trace) + return queryTrace(&mockHTTPClient, traceInfo, logger) +} + +func TestDoReadWhenTickerIsNil(t *testing.T) { + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + traceInfo := util.NewTraceInfo(seed, "test") + + trace, _ := traceInfo.ConstructTraceFromEpoch() + startTime := time.Date(2007, 1, 1, 12, 0, 0, 0, time.UTC) + mockHTTPClient := MockHTTPClient{err: nil, traceResp: trace} + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + logger = zap.NewNop() + r := rand.New(rand.NewSource(startTime.Unix())) + + // Assert ticker is nil + doRead(&mockHTTPClient, nil, startTime, config.tempoWriteBackoffDuration, r, config, logger) + assert.Equal(t, 0, mockHTTPClient.GetRequestsCount()) +} + +func TestDoReadForAnOkRead(t *testing.T) { + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + traceInfo := util.NewTraceInfo(seed, "test") + + trace, _ := traceInfo.ConstructTraceFromEpoch() + startTime := time.Date(2007, 1, 1, 12, 0, 0, 0, time.UTC) + mockHTTPClient := MockHTTPClient{err: nil, traceResp: trace} + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + ticker := time.NewTicker(10 * time.Millisecond) + logger = zap.NewNop() + r := rand.New(rand.NewSource(startTime.Unix())) + + // Assert an ok read + doRead(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + time.Sleep(time.Second) + ticker.Stop() + assert.Greater(t, mockHTTPClient.GetRequestsCount(), 0) +} + +func TestDoReadForAnErroredRead(t *testing.T) { + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + traceInfo := util.NewTraceInfo(seed, "test") + + trace, _ := traceInfo.ConstructTraceFromEpoch() + startTime := time.Date(2007, 1, 1, 12, 0, 0, 0, time.UTC) + + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + ticker := time.NewTicker(10 * time.Millisecond) + logger = zap.NewNop() + r := rand.New(rand.NewSource(startTime.Unix())) + + // Assert a read with errors + mockHTTPClient := MockHTTPClient{err: errors.New("an error"), traceResp: trace} + doRead(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + time.Sleep(time.Second) + assert.Equal(t, 0, mockHTTPClient.GetRequestsCount()) +} + +func TestSearchTraceql(t *testing.T) { + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + info := util.NewTraceInfo(seed, config.tempoOrgID) + hexID := info.HexID() + + searchResponse := []*tempopb.TraceSearchMetadata{ + { + SpanSets: []*tempopb.SpanSet{ + { + Spans: []*tempopb.Span{ + { + SpanID: hexID, + StartTimeUnixNano: 1000000000000, + DurationNanos: 1000000000, + Name: "", + Attributes: []*v1_common.KeyValue{ + {Key: "foo", Value: &v1_common.AnyValue{Value: &v1_common.AnyValue_StringValue{StringValue: "Bar"}}}, + }, + }, + }, + }, + }, + }, + } + + mockHTTPClient := MockHTTPClient{err: nil, searchResponse: searchResponse} + logger = zap.NewNop() + + metrics, err := searchTraceql(&mockHTTPClient, seed, config, logger) + + assert.Error(t, err) + assert.Equal(t, traceMetrics{ + requested: 1, + notFoundTraceQL: 1, + }, metrics) + + mockHTTPClient = MockHTTPClient{err: errors.New("something wrong happened"), searchResponse: searchResponse} + logger = zap.NewNop() + + metrics, err = searchTraceql(&mockHTTPClient, seed, config, logger) + + assert.Error(t, err) + assert.Equal(t, traceMetrics{ + requested: 1, + requestFailed: 1, + }, metrics) +} + +func TestSearchTag(t *testing.T) { + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + } + + info := util.NewTraceInfo(seed, config.tempoOrgID) + hexID := info.HexID() + + searchResponse := []*tempopb.TraceSearchMetadata{ + { + SpanSets: []*tempopb.SpanSet{ + { + Spans: []*tempopb.Span{ + { + SpanID: hexID, + StartTimeUnixNano: 1000000000000, + DurationNanos: 1000000000, + Name: "", + Attributes: []*v1_common.KeyValue{ + {Key: "foo", Value: &v1_common.AnyValue{Value: &v1_common.AnyValue_StringValue{StringValue: "Bar"}}}, + }, + }, + }, + }, + }, + }, + } + + mockHTTPClient := MockHTTPClient{err: nil, searchResponse: searchResponse} + logger = zap.NewNop() + + metrics, err := searchTag(&mockHTTPClient, seed, config, logger) + + assert.Error(t, err) + assert.Equal(t, traceMetrics{ + requested: 1, + notFoundSearch: 1, + }, metrics) + + mockHTTPClient = MockHTTPClient{err: errors.New("something wrong happened"), searchResponse: searchResponse} + logger = zap.NewNop() + + metrics, err = searchTag(&mockHTTPClient, seed, config, logger) + + assert.Error(t, err) + assert.Equal(t, traceMetrics{ + requested: 1, + requestFailed: 1, + }, metrics) +} + +func TestDoSearch(t *testing.T) { + seed := time.Date(2008, 1, 1, 12, 0, 0, 0, time.UTC) + traceInfo := util.NewTraceInfo(seed, "test") + startTime := time.Date(2000, 1, 1, 12, 0, 0, 0, time.UTC) + + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + // This is a hack to ensure the trace is "ready" + tempoWriteBackoffDuration: -time.Hour * 10000, + tempoRetentionDuration: time.Second * 10, + } + + ticker := time.NewTicker(10 * time.Millisecond) + logger = zap.NewNop() + r := rand.New(rand.NewSource(startTime.Unix())) + + searchResponse := []*tempopb.TraceSearchMetadata{ + { + SpanSets: []*tempopb.SpanSet{ + { + Spans: []*tempopb.Span{ + { + SpanID: traceInfo.HexID(), + StartTimeUnixNano: 1000000000000, + DurationNanos: 1000000000, + Name: "", + Attributes: []*v1_common.KeyValue{ + {Key: "foo", Value: &v1_common.AnyValue{Value: &v1_common.AnyValue_StringValue{StringValue: "Bar"}}}, + }, + }, + }, + }, + }, + }, + } + logger = zap.NewNop() + mockHTTPClient := MockHTTPClient{err: nil, searchResponse: searchResponse} + + doSearch(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + + time.Sleep(time.Second) + ticker.Stop() + assert.Greater(t, mockHTTPClient.GetSearchesCount(), 0) +} + +func TestDoSearchWhenTickerIsNil(t *testing.T) { + startTime := time.Date(2000, 1, 1, 12, 0, 0, 0, time.UTC) + + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + // This is a hack to ensure the trace is "ready" + tempoWriteBackoffDuration: -time.Hour * 10000, + tempoRetentionDuration: time.Second * 10, + } + + logger = zap.NewNop() + r := rand.New(rand.NewSource(startTime.Unix())) + + logger = zap.NewNop() + + mockHTTPClient := MockHTTPClient{err: nil} + doSearch(&mockHTTPClient, nil, startTime, config.tempoWriteBackoffDuration, r, config, logger) + assert.Equal(t, mockHTTPClient.GetSearchesCount(), 0) +} + +func TestDoSearchOnSearchError(t *testing.T) { + startTime := time.Date(2000, 1, 1, 12, 0, 0, 0, time.UTC) + + // Define the configuration + config := vultureConfiguration{ + tempoOrgID: "orgID", + // This is a hack to ensure the trace is "ready" + tempoWriteBackoffDuration: -time.Hour * 10000, + tempoRetentionDuration: time.Second * 10, + } + + ticker := time.NewTicker(10 * time.Millisecond) + logger = zap.NewNop() + r := rand.New(rand.NewSource(startTime.Unix())) + + logger = zap.NewNop() + + // Assert an errored search + mockHTTPClient := MockHTTPClient{err: errors.New("an error")} + + doSearch(&mockHTTPClient, ticker, startTime, config.tempoWriteBackoffDuration, r, config, logger) + ticker.Stop() + + assert.Equal(t, mockHTTPClient.searchesCount, 0) +} + func TestGetGrpcEndpoint(t *testing.T) { _, err := getGRPCEndpoint("http://%gh&%ij") require.Error(t, err) @@ -124,3 +641,36 @@ func TestGetGrpcEndpoint(t *testing.T) { require.NoError(t, err) assert.Equal(t, "localhost:14250", got, "Address without a port should be defaulted to 14250") } + +func TestNewJaegerGRPCClient(t *testing.T) { + config := vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + tempoPushURL: "http://localhost", + } + client, err := newJaegerGRPCClient(config, zap.NewNop()) + + assert.NoError(t, err) + assert.NotNil(t, client) + + config = vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + tempoPushTLS: true, + tempoPushURL: "http://localhost", + } + client, err = newJaegerGRPCClient(config, zap.NewNop()) + + assert.NoError(t, err) + assert.NotNil(t, client) + + config = vultureConfiguration{ + tempoOrgID: "orgID", + tempoWriteBackoffDuration: time.Second, + tempoPushURL: "http://%gh&%ij", + } + client, err = newJaegerGRPCClient(config, zap.NewNop()) + + assert.Error(t, err) + assert.Nil(t, client) +} diff --git a/cmd/tempo-vulture/mocks.go b/cmd/tempo-vulture/mocks.go new file mode 100644 index 00000000000..dac2fb004c3 --- /dev/null +++ b/cmd/tempo-vulture/mocks.go @@ -0,0 +1,184 @@ +package main + +import ( + "context" + "net/http" + "sync" + + userconfigurableoverrides "github.com/grafana/tempo/modules/overrides/userconfigurable/client" + thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" + + "github.com/grafana/tempo/pkg/tempopb" +) + +type MockReporter struct { + err error + batchesEmitted []*thrift.Batch + // We need the lock to control concurrent accesses to batchesEmitted + m sync.Mutex +} + +func (r *MockReporter) EmitZipkinBatch(_ context.Context, _ []*zipkincore.Span) error { + return r.err +} + +func (r *MockReporter) EmitBatch(_ context.Context, b *thrift.Batch) error { + if r.err == nil { + r.m.Lock() + defer r.m.Unlock() + r.batchesEmitted = append(r.batchesEmitted, b) + } + + return r.err +} + +func (r *MockReporter) GetEmittedBatches() []*thrift.Batch { + r.m.Lock() + defer r.m.Unlock() + return r.batchesEmitted +} + +type MockHTTPClient struct { + err error + resp http.Response + traceResp *tempopb.Trace + requestsCount int + searchResponse []*tempopb.TraceSearchMetadata + searchesCount int + // We need the lock to control concurrent accesses to shared variables in the tests + m sync.Mutex +} + +//nolint:all +func (m *MockHTTPClient) DeleteOverrides(version string) error { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) Do(req *http.Request) (*http.Response, error) { + return &m.resp, m.err +} + +//nolint:all +func (m *MockHTTPClient) GetOverrides() (*userconfigurableoverrides.Limits, string, error) { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) MetricsSummary(query string, groupBy string, start int64, end int64) (*tempopb.SpanMetricsSummaryResponse, error) { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) PatchOverrides(limits *userconfigurableoverrides.Limits) (*userconfigurableoverrides.Limits, string, error) { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) QueryTrace(id string) (*tempopb.Trace, error) { + if m.err != nil { + return nil, m.err + } + m.m.Lock() + defer m.m.Unlock() + m.requestsCount++ + return m.traceResp, m.err +} + +func (m *MockHTTPClient) GetRequestsCount() int { + m.m.Lock() + defer m.m.Unlock() + return m.requestsCount +} + +//nolint:all +func (m *MockHTTPClient) Search(tags string) (*tempopb.SearchResponse, error) { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) SearchTagValues(key string) (*tempopb.SearchTagValuesResponse, error) { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) SearchTagValuesV2(key string, query string) (*tempopb.SearchTagValuesV2Response, error) { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) SearchTagValuesV2WithRange(tag string, start int64, end int64) (*tempopb.SearchTagValuesV2Response, error) { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) SearchTags() (*tempopb.SearchTagsResponse, error) { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) SearchTagsV2() (*tempopb.SearchTagsV2Response, error) { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) SearchTagsV2WithRange(start int64, end int64) (*tempopb.SearchTagsV2Response, error) { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) SearchTagsWithRange(start int64, end int64) (*tempopb.SearchTagsResponse, error) { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) SearchTraceQL(query string) (*tempopb.SearchResponse, error) { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) SearchTraceQLWithRange(query string, start int64, end int64) (*tempopb.SearchResponse, error) { + if m.err != nil { + return nil, m.err + } + + m.m.Lock() + defer m.m.Unlock() + traceQlSearchResponse := &tempopb.SearchResponse{ + Traces: m.searchResponse, + } + m.searchesCount++ + return traceQlSearchResponse, m.err +} + +//nolint:all +func (m *MockHTTPClient) SearchWithRange(tags string, start int64, end int64) (*tempopb.SearchResponse, error) { + if m.err != nil { + return nil, m.err + } + m.m.Lock() + defer m.m.Unlock() + traceQlSearchResponse := &tempopb.SearchResponse{ + Traces: m.searchResponse, + } + + m.searchesCount++ + return traceQlSearchResponse, m.err +} + +func (m *MockHTTPClient) GetSearchesCount() int { + m.m.Lock() + defer m.m.Unlock() + return m.searchesCount +} + +//nolint:all +func (m *MockHTTPClient) SetOverrides(limits *userconfigurableoverrides.Limits, version string) (string, error) { + panic("unimplemented") +} + +//nolint:all +func (m *MockHTTPClient) WithTransport(t http.RoundTripper) { + panic("unimplemented") +} diff --git a/pkg/httpclient/client.go b/pkg/httpclient/client.go index 23c10ce9612..456fdb9cfe8 100644 --- a/pkg/httpclient/client.go +++ b/pkg/httpclient/client.go @@ -33,6 +33,28 @@ const ( applicationJSON = "application/json" ) +type HTTPClient interface { + WithTransport(t http.RoundTripper) + Do(req *http.Request) (*http.Response, error) + SearchTags() (*tempopb.SearchTagsResponse, error) + SearchTagsV2() (*tempopb.SearchTagsV2Response, error) + SearchTagsWithRange(start int64, end int64) (*tempopb.SearchTagsResponse, error) + SearchTagsV2WithRange(start int64, end int64) (*tempopb.SearchTagsV2Response, error) + SearchTagValues(key string) (*tempopb.SearchTagValuesResponse, error) + SearchTagValuesV2(key, query string) (*tempopb.SearchTagValuesV2Response, error) + SearchTagValuesV2WithRange(tag string, start int64, end int64) (*tempopb.SearchTagValuesV2Response, error) + Search(tags string) (*tempopb.SearchResponse, error) + SearchWithRange(tags string, start int64, end int64) (*tempopb.SearchResponse, error) + QueryTrace(id string) (*tempopb.Trace, error) + SearchTraceQL(query string) (*tempopb.SearchResponse, error) + SearchTraceQLWithRange(query string, start int64, end int64) (*tempopb.SearchResponse, error) + MetricsSummary(query string, groupBy string, start int64, end int64) (*tempopb.SpanMetricsSummaryResponse, error) + GetOverrides() (*userconfigurableoverrides.Limits, string, error) + SetOverrides(limits *userconfigurableoverrides.Limits, version string) (string, error) + PatchOverrides(limits *userconfigurableoverrides.Limits) (*userconfigurableoverrides.Limits, string, error) + DeleteOverrides(version string) error +} + var ErrNotFound = errors.New("resource not found") // Client is client to the Tempo API. diff --git a/pkg/util/trace_info.go b/pkg/util/trace_info.go index 3af8d5c0cd4..4717c97b444 100644 --- a/pkg/util/trace_info.go +++ b/pkg/util/trace_info.go @@ -7,8 +7,8 @@ import ( "time" "github.com/grafana/dskit/user" - jaeger_grpc "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc" thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + zipkincore "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" jaegerTrans "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/ptrace" @@ -37,6 +37,12 @@ type TraceInfo struct { tempoOrgID string } +// JaegerClient is an interface used to mock the underlying client in tests. +type JaegerClient interface { + EmitBatch(ctx context.Context, b *thrift.Batch) error + EmitZipkinBatch(ctx context.Context, zSpans []*zipkincore.Span) error +} + // NewTraceInfo is used to produce a new TraceInfo. func NewTraceInfo(timestamp time.Time, tempoOrgID string) *TraceInfo { r := newRand(timestamp) @@ -51,6 +57,19 @@ func NewTraceInfo(timestamp time.Time, tempoOrgID string) *TraceInfo { } } +func NewTraceInfoWithMaxLongWrites(timestamp time.Time, maxLongWrites int64, tempoOrgID string) *TraceInfo { + r := newRand(timestamp) + + return &TraceInfo{ + timestamp: timestamp, + r: r, + traceIDHigh: r.Int63(), + traceIDLow: r.Int63(), + longWritesRemaining: maxLongWrites, + tempoOrgID: tempoOrgID, + } +} + func (t *TraceInfo) Ready(now time.Time, writeBackoff, longWriteBackoff time.Duration) bool { // Don't use the last time interval to allow the write loop to finish before // we try to read it. @@ -58,7 +77,7 @@ func (t *TraceInfo) Ready(now time.Time, writeBackoff, longWriteBackoff time.Dur return false } - // Compare a new instance with the same timstamp to know how many longWritesRemaining. + // Compare a new instance with the same timestamp to know how many longWritesRemaining. totalWrites := NewTraceInfo(t.timestamp, t.tempoOrgID).longWritesRemaining // We are not ready if not all writes have had a chance to send. lastWrite := t.timestamp.Add(time.Duration(totalWrites) * longWriteBackoff) @@ -85,7 +104,7 @@ func (t *TraceInfo) Done() { t.longWritesRemaining-- } -func (t *TraceInfo) EmitBatches(c *jaeger_grpc.Reporter) error { +func (t *TraceInfo) EmitBatches(c JaegerClient) error { for i := int64(0); i < t.generateRandomInt(1, maxBatchesPerWrite); i++ { ctx := user.InjectOrgID(context.Background(), t.tempoOrgID) ctx, err := user.InjectIntoGRPCRequest(ctx) @@ -104,7 +123,7 @@ func (t *TraceInfo) EmitBatches(c *jaeger_grpc.Reporter) error { // EmitAllBatches sends all the batches that would normally be sent at some // interval when using EmitBatches. -func (t *TraceInfo) EmitAllBatches(c *jaeger_grpc.Reporter) error { +func (t *TraceInfo) EmitAllBatches(c JaegerClient) error { err := t.EmitBatches(c) if err != nil { return err