Skip to content

Commit

Permalink
Leverage TraceInfo for e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zalegrala committed Sep 29, 2021
1 parent 918085a commit 90b7b90
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 44 deletions.
2 changes: 2 additions & 0 deletions integration/e2e/config-all-in-one-azurite.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
search_enabled: true

target: all

server:
Expand Down
2 changes: 2 additions & 0 deletions integration/e2e/config-all-in-one-gcs.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
search_enabled: true

target: all

server:
Expand Down
2 changes: 2 additions & 0 deletions integration/e2e/config-all-in-one-s3.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
search_enabled: true

target: all

server:
Expand Down
112 changes: 74 additions & 38 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package e2e

import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"reflect"
"testing"
"time"

Expand All @@ -13,7 +12,6 @@ import (
jaeger_grpc "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc"
thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -22,6 +20,8 @@ import (
"github.com/grafana/tempo/cmd/tempo/app"
util "github.com/grafana/tempo/integration"
"github.com/grafana/tempo/integration/e2e/backend"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
tempoUtil "github.com/grafana/tempo/pkg/util"
)

Expand Down Expand Up @@ -75,24 +75,29 @@ func TestAllInOne(t *testing.T) {
c, err := newJaegerGRPCClient(tempo.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, c)
batch := makeThriftBatch()
require.NoError(t, c.EmitBatch(context.Background(), batch))

// test metrics
require.NoError(t, tempo.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_distributor_spans_received_total"))
info := tempoUtil.NewTraceInfo(time.Now(), "")
require.NoError(t, info.EmitAllBatches(c))

expected, err := info.ConstructTraceFromEpoch()
require.NoError(t, err)

hexID := fmt.Sprintf("%016x%016x", batch.Spans[0].TraceIdHigh, batch.Spans[0].TraceIdLow)
// test metrics
require.NoError(t, tempo.WaitSumMetrics(cortex_e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total"))

// test echo
assertEcho(t, "http://"+tempo.Endpoint(3200)+"/api/echo")

// ensure trace is created in ingester (trace_idle_time has passed)
require.NoError(t, tempo.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_ingester_traces_created_total"))

// query an in-memory trace
apiClient := tempoUtil.NewClient("http://"+tempo.Endpoint(3200), "")
queryAndAssertTrace(t, apiClient, hexID, "my operation", 1)
searchAndAssertTrace(t, apiClient, batch.Spans[0].Tags[0].Key, batch.Spans[0].Tags[0].GetVStr(), hexID)

// query an in-memory trace
queryAndAssertTrace(t, apiClient, info)

// search an in-memory trace
searchAndAssertTrace(t, apiClient, info)

// flush trace to backend
res, err := cortex_e2e.GetRequest("http://" + tempo.Endpoint(3200) + "/flush")
Expand All @@ -110,10 +115,13 @@ func TestAllInOne(t *testing.T) {
// test metrics
require.NoError(t, tempo.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
require.NoError(t, tempo.WaitSumMetrics(cortex_e2e.Equals(1), "tempodb_blocklist_length"))
require.NoError(t, tempo.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_query_frontend_queries_total"))
require.NoError(t, tempo.WaitSumMetrics(cortex_e2e.Equals(2), "tempo_query_frontend_queries_total"))

// query trace - should fetch from backend
queryAndAssertTrace(t, apiClient, hexID, "my operation", 1)
queryAndAssertTrace(t, apiClient, info)

// TODO: when search is implemented in the backend, add search
//searchAndAssertTrace(t, apiClient, info)
})
}
}
Expand All @@ -137,8 +145,6 @@ func TestMicroservices(t *testing.T) {
tempoQuerier := util.NewTempoQuerier()
require.NoError(t, s.StartAndWaitReady(tempoIngester1, tempoIngester2, tempoIngester3, tempoDistributor, tempoQueryFrontend, tempoQuerier))

apiClient := tempoUtil.NewClient("http://"+tempoQueryFrontend.Endpoint(3200), "")

// wait for 2 active ingesters
time.Sleep(1 * time.Second)
matchers := []*labels.Matcher{
Expand All @@ -159,14 +165,15 @@ func TestMicroservices(t *testing.T) {
c, err := newJaegerGRPCClient(tempoDistributor.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, c)
batch := makeThriftBatch()

require.NoError(t, c.EmitBatch(context.Background(), batch))
info := tempoUtil.NewTraceInfo(time.Now(), "")
require.NoError(t, info.EmitAllBatches(c))

// test metrics
require.NoError(t, tempoDistributor.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_distributor_spans_received_total"))
expected, err := info.ConstructTraceFromEpoch()
require.NoError(t, err)

hexID := fmt.Sprintf("%016x%016x", batch.Spans[0].TraceIdHigh, batch.Spans[0].TraceIdLow)
// test metrics
require.NoError(t, tempoDistributor.WaitSumMetrics(cortex_e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total"))

// test echo
assertEcho(t, "http://"+tempoQueryFrontend.Endpoint(3200)+"/api/echo")
Expand All @@ -176,8 +183,13 @@ func TestMicroservices(t *testing.T) {
require.NoError(t, tempoIngester2.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_ingester_traces_created_total"))
require.NoError(t, tempoIngester3.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_ingester_traces_created_total"))

apiClient := tempoUtil.NewClient("http://"+tempoQueryFrontend.Endpoint(3200), "")

// query an in-memory trace
queryAndAssertTrace(t, apiClient, hexID, "my operation", 1)
queryAndAssertTrace(t, apiClient, info)

// search an in-memory trace
searchAndAssertTrace(t, apiClient, info)

// flush trace to backend
res, err := cortex_e2e.GetRequest("http://" + tempoIngester1.Endpoint(3200) + "/flush")
Expand All @@ -200,10 +212,10 @@ func TestMicroservices(t *testing.T) {
require.NoError(t, i.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
}
require.NoError(t, tempoQuerier.WaitSumMetrics(cortex_e2e.Equals(3), "tempodb_blocklist_length"))
require.NoError(t, tempoQueryFrontend.WaitSumMetrics(cortex_e2e.Equals(1), "tempo_query_frontend_queries_total"))
require.NoError(t, tempoQueryFrontend.WaitSumMetrics(cortex_e2e.Equals(2), "tempo_query_frontend_queries_total"))

// query trace - should fetch from backend
queryAndAssertTrace(t, apiClient, hexID, "my operation", 1)
queryAndAssertTrace(t, apiClient, info)

// stop an ingester and confirm we can still write and query
err = tempoIngester2.Stop()
Expand All @@ -212,22 +224,20 @@ func TestMicroservices(t *testing.T) {
// sleep for heartbeat timeout
time.Sleep(1 * time.Second)

batch = makeThriftBatch()
require.NoError(t, c.EmitBatch(context.Background(), batch))
hexID = fmt.Sprintf("%016x%016x", batch.Spans[0].TraceIdHigh, batch.Spans[0].TraceIdLow)
info = tempoUtil.NewTraceInfo(time.Now(), "")
require.NoError(t, info.EmitAllBatches(c))

// query an in-memory trace
queryAndAssertTrace(t, apiClient, hexID, "my operation", 1)
queryAndAssertTrace(t, apiClient, info)

// search an in-memory trace
searchAndAssertTrace(t, apiClient, batch.Spans[0].Tags[0].Key, batch.Spans[0].Tags[0].GetVStr(), hexID)
searchAndAssertTrace(t, apiClient, info)

// stop another ingester and confirm things fail
err = tempoIngester1.Stop()
require.NoError(t, err)

batch = makeThriftBatch()
require.Error(t, c.EmitBatch(context.Background(), batch))
require.Error(t, info.EmitBatches(c))
}

func makeThriftBatch() *thrift.Batch {
Expand Down Expand Up @@ -260,6 +270,7 @@ func makeThriftBatchWithSpanCount(n int) *thrift.Batch {
Logs: nil,
})
}

return &thrift.Batch{Spans: spans}
}

Expand All @@ -271,19 +282,26 @@ func assertEcho(t *testing.T, url string) {
}

//nolint:unparam
func queryAndAssertTrace(t *testing.T, client *tempoUtil.Client, hexID string, expectedName string, expectedBatches int) {
resp, err := client.QueryTrace(hexID)
func queryAndAssertTrace(t *testing.T, client *tempoUtil.Client, info *tempoUtil.TraceInfo) {
resp, err := client.QueryTrace(info.HexID())
require.NoError(t, err)

expected, err := info.ConstructTraceFromEpoch()
require.NoError(t, err)

require.Len(t, resp.Batches, expectedBatches)
assert.Equal(t, expectedName, resp.Batches[0].InstrumentationLibrarySpans[0].Spans[0].Name)
require.True(t, equalTraces(resp, expected))
}

func searchAndAssertTrace(t *testing.T, client *tempoUtil.Client, key, value, expectedHex string) {
resp, err := client.SearchTag(key, value)
func searchAndAssertTrace(t *testing.T, client *tempoUtil.Client, info *tempoUtil.TraceInfo) {
expected, err := info.ConstructTraceFromEpoch()
require.NoError(t, err)

attr := tempoUtil.RandomAttrFromTrace(expected)

resp, err := client.SearchTag(attr.GetKey(), attr.GetValue().GetStringValue())
require.NoError(t, err)

hasHex := func(hexId string) bool {
hasHex := func(hexId string, resp *tempopb.SearchResponse) bool {
for _, s := range resp.Traces {
equal, err := tempoUtil.EqualHexStringTraceIDs(s.TraceID, hexId)
require.NoError(t, err)
Expand All @@ -295,7 +313,7 @@ func searchAndAssertTrace(t *testing.T, client *tempoUtil.Client, key, value, ex
return false
}

require.True(t, hasHex(expectedHex))
require.True(t, hasHex(info.HexID(), resp))
}

func newJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) {
Expand All @@ -310,3 +328,21 @@ func newJaegerGRPCClient(endpoint string) (*jaeger_grpc.Reporter, error) {
}
return jaeger_grpc.NewReporter(conn, nil, logger), err
}

func equalTraces(a, b *tempopb.Trace) bool {
model.SortTrace(a)
model.SortTrace(b)

return reflect.DeepEqual(a, b)
}

func spanCount(a *tempopb.Trace) float64 {
count := 0
for _, batch := range a.Batches {
for _, spans := range batch.InstrumentationLibrarySpans {
count += len(spans.Spans)
}
}

return float64(count)
}
2 changes: 1 addition & 1 deletion pkg/util/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (c *Client) getFor(url string, m proto.Message) (*http.Response, error) {
}()

if resp.StatusCode >= 400 && resp.StatusCode <= 599 {
return resp, fmt.Errorf("request failed with response: %d", resp.StatusCode)
return resp, fmt.Errorf("GET request to %s failed with response: %d", req.URL.String(), resp.StatusCode)
}

unmarshaller := &jsonpb.Unmarshaler{}
Expand Down
12 changes: 7 additions & 5 deletions pkg/util/trace_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ var (
maxLongWritesPerTrace int64 = 3
)

// TraceInfo is some information that is used to keep track of a somewhat
// randomly generated trace.
// TraceInfo is used to construct synthetic traces and manage the expectations.
type TraceInfo struct {
timestamp time.Time
r *rand.Rand
Expand All @@ -36,6 +35,7 @@ type TraceInfo struct {
tempoOrgID string
}

// NewTraceInfo is used to produce a new TraceInfo.
func NewTraceInfo(timestamp time.Time, tempoOrgID string) *TraceInfo {
r := newRand(timestamp)

Expand All @@ -53,7 +53,6 @@ func (t *TraceInfo) Ready(now time.Time, writeBackoff, longWriteBackoff time.Dur

// Don't use the last time interval to allow the write loop to finish before
// we try to read it.

if t.timestamp.After(now.Add(-writeBackoff)) {
return false
}
Expand Down Expand Up @@ -92,6 +91,7 @@ func (t *TraceInfo) EmitBatches(c *jaeger_grpc.Reporter) error {
if err != nil {
return fmt.Errorf("error injecting org id: %w", err)
}

err = c.EmitBatch(ctx, t.makeThriftBatch(t.traceIDHigh, t.traceIDLow))
if err != nil {
return fmt.Errorf("error pushing batch to Tempo: %w", err)
Expand All @@ -101,13 +101,15 @@ func (t *TraceInfo) EmitBatches(c *jaeger_grpc.Reporter) error {
return nil
}

// EmitAllBatches sends all the batches that would normally be sent at some
// interval when using EmitBatches.
func (t *TraceInfo) EmitAllBatches(c *jaeger_grpc.Reporter) error {

err := t.EmitBatches(c)
if err != nil {
return err
}
for t.longWritesRemaining > 0 {

for t.LongWritesRemaining() > 0 {
t.Done()

err := t.EmitBatches(c)
Expand Down

0 comments on commit 90b7b90

Please sign in to comment.