Skip to content

Commit

Permalink
Extend e2e to test all kv stores
Browse files Browse the repository at this point in the history
  • Loading branch information
mapno committed Feb 2, 2022
1 parent a1524ed commit d021495
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ distributor:
ingester:
lifecycler:
ring:
kvstore:
store: memberlist
kvstore: {{ .KVStore }}
replication_factor: 3
heartbeat_period: 100ms
trace_idle_period: 1s
Expand Down
251 changes: 160 additions & 91 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package e2e

import (
"bytes"
"fmt"
"math/rand"
"os"
"path/filepath"
"reflect"
"strings"
"sync"
"testing"
"text/template"
"time"

"github.com/grafana/e2e"
e2e_db "github.com/grafana/e2e/db"
e2edb "github.com/grafana/e2e/db"
jaeger_grpc "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc"
thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -28,7 +32,7 @@ import (
)

const (
configMicroservices = "config-microservices.yaml"
configMicroservices = "config-microservices.tmpl.yaml"
configServerless = "config-serverless.yaml"
configHA = "config-scalable-single-binary.yaml"

Expand Down Expand Up @@ -131,130 +135,195 @@ func TestAllInOne(t *testing.T) {
}
}

func TestMicroservices(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()

minio := e2e_db.NewMinio(9000, "tempo")
require.NotNil(t, minio)
require.NoError(t, s.StartAndWaitReady(minio))

require.NoError(t, util.CopyFileToSharedDir(s, configMicroservices, "config.yaml"))
tempoIngester1 := util.NewTempoIngester(1)
tempoIngester2 := util.NewTempoIngester(2)
tempoIngester3 := util.NewTempoIngester(3)

tempoDistributor := util.NewTempoDistributor()
tempoQueryFrontend := util.NewTempoQueryFrontend()
tempoQuerier := util.NewTempoQuerier()
require.NoError(t, s.StartAndWaitReady(tempoIngester1, tempoIngester2, tempoIngester3, tempoDistributor, tempoQueryFrontend, tempoQuerier))

// wait for active ingesters
time.Sleep(1 * time.Second)
matchers := []*labels.Matcher{
func TestMicroservicesWithKVStores(t *testing.T) {
testKVStores := []struct {
name string
kvconfig func(port int) string
}{
{
Type: labels.MatchEqual,
Name: "name",
Value: "ingester",
name: "memberlist",
kvconfig: func(int) string {
return `
store: memberlist`
},
},
{
Type: labels.MatchEqual,
Name: "state",
Value: "ACTIVE",
name: "etcd",
kvconfig: func(port int) string {
return fmt.Sprintf(`
store: etcd
etcd:
endpoints:
- http://tempo_e2e-etcd:%d`, port)
},
},
{
name: "consul",
kvconfig: func(port int) string {
return fmt.Sprintf(`
store: consul
consul:
host: http://tempo_e2e-consul:%d`, port)
},
},
}
require.NoError(t, tempoDistributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{`cortex_ring_members`}, e2e.WithLabelMatchers(matchers...), e2e.WaitMissingMetrics))

// Get port for the Jaeger gRPC receiver endpoint
c, err := newJaegerGRPCClient(tempoDistributor.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, c)
for _, tc := range testKVStores {
t.Run(tc.name, func(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()

info := tempoUtil.NewTraceInfo(time.Now(), "")
require.NoError(t, info.EmitAllBatches(c))
// Set up KVStore
var kvstore *e2e.HTTPService
switch tc.name {
case "etcd":
kvstore = e2edb.NewETCD()
require.NoError(t, s.StartAndWaitReady(kvstore))
case "consul":
kvstore = e2edb.NewConsul()
require.NoError(t, s.StartAndWaitReady(kvstore))
			case "memberlist": // typo fixed: was "memeberlist", which sent the memberlist case into the default error branch
default:
t.Errorf("unknown KVStore %s", tc.name)
}

expected, err := info.ConstructTraceFromEpoch()
require.NoError(t, err)
tmpl, err := template.New(filepath.Base(configMicroservices)).ParseFiles(configMicroservices)
require.NoError(t, err)

// test metrics
require.NoError(t, tempoDistributor.WaitSumMetrics(e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total"))
var KVStoreConfig string
if kvstore != nil {
KVStoreConfig = tc.kvconfig(kvstore.HTTPPort())
}

// test echo
assertEcho(t, "http://"+tempoQueryFrontend.Endpoint(3200)+"/api/echo")
var buf bytes.Buffer
kvconfig := map[string]interface{}{
"KVStore": KVStoreConfig,
}
require.NoError(t, tmpl.Execute(&buf, kvconfig))

// ensure trace is created in ingester (trace_idle_time has passed)
require.NoError(t, tempoIngester1.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_traces_created_total"))
require.NoError(t, tempoIngester2.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_traces_created_total"))
require.NoError(t, tempoIngester3.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_traces_created_total"))
require.NoError(t, util.WriteFileToSharedDir(s, "config.yaml", buf.Bytes()))

apiClient := tempoUtil.NewClient("http://"+tempoQueryFrontend.Endpoint(3200), "")
minio := e2edb.NewMinio(9000, "tempo")
require.NotNil(t, minio)
require.NoError(t, s.StartAndWaitReady(minio))

// query an in-memory trace
queryAndAssertTrace(t, apiClient, info)
tempoIngester1 := util.NewTempoIngester(1)
tempoIngester2 := util.NewTempoIngester(2)
tempoIngester3 := util.NewTempoIngester(3)

// search an in-memory trace
searchAndAssertTrace(t, apiClient, info)
tempoDistributor := util.NewTempoDistributor()
tempoQueryFrontend := util.NewTempoQueryFrontend()
tempoQuerier := util.NewTempoQuerier()
require.NoError(t, s.StartAndWaitReady(tempoIngester1, tempoIngester2, tempoIngester3, tempoDistributor, tempoQueryFrontend, tempoQuerier))

// flush trace to backend
res, err := e2e.DoGet("http://" + tempoIngester1.Endpoint(3200) + "/flush")
require.NoError(t, err)
require.Equal(t, 204, res.StatusCode)
// wait for active ingesters
time.Sleep(1 * time.Second)
matchers := []*labels.Matcher{
{
Type: labels.MatchEqual,
Name: "name",
Value: "ingester",
},
{
Type: labels.MatchEqual,
Name: "state",
Value: "ACTIVE",
},
}
require.NoError(t, tempoDistributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{`cortex_ring_members`}, e2e.WithLabelMatchers(matchers...), e2e.WaitMissingMetrics))

res, err = e2e.DoGet("http://" + tempoIngester2.Endpoint(3200) + "/flush")
require.NoError(t, err)
require.Equal(t, 204, res.StatusCode)
// Get port for the Jaeger gRPC receiver endpoint
c, err := newJaegerGRPCClient(tempoDistributor.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, c)

res, err = e2e.DoGet("http://" + tempoIngester3.Endpoint(3200) + "/flush")
require.NoError(t, err)
require.Equal(t, 204, res.StatusCode)
info := tempoUtil.NewTraceInfo(time.Now(), "")
require.NoError(t, info.EmitAllBatches(c))

expected, err := info.ConstructTraceFromEpoch()
require.NoError(t, err)

// sleep for one maintenance cycle
time.Sleep(5 * time.Second)
// test metrics
require.NoError(t, tempoDistributor.WaitSumMetrics(e2e.Equals(spanCount(expected)), "tempo_distributor_spans_received_total"))

// test metrics
for _, i := range []*e2e.HTTPService{tempoIngester1, tempoIngester2, tempoIngester3} {
require.NoError(t, i.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
}
require.NoError(t, tempoQuerier.WaitSumMetrics(e2e.Equals(3), "tempodb_blocklist_length"))
require.NoError(t, tempoQueryFrontend.WaitSumMetrics(e2e.Equals(4), "tempo_query_frontend_queries_total"))
// test echo
assertEcho(t, "http://"+tempoQueryFrontend.Endpoint(3200)+"/api/echo")

// query trace - should fetch from backend
queryAndAssertTrace(t, apiClient, info)
// ensure trace is created in ingester (trace_idle_time has passed)
require.NoError(t, tempoIngester1.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_traces_created_total"))
require.NoError(t, tempoIngester2.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_traces_created_total"))
require.NoError(t, tempoIngester3.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_traces_created_total"))

// stop an ingester and confirm we can still write and query
err = tempoIngester2.Kill()
require.NoError(t, err)
apiClient := tempoUtil.NewClient("http://"+tempoQueryFrontend.Endpoint(3200), "")

// sleep for heartbeat timeout
time.Sleep(1 * time.Second)
// query an in-memory trace
queryAndAssertTrace(t, apiClient, info)

info = tempoUtil.NewTraceInfo(time.Now(), "")
require.NoError(t, info.EmitAllBatches(c))
// search an in-memory trace
searchAndAssertTrace(t, apiClient, info)

// query an in-memory trace
queryAndAssertTrace(t, apiClient, info)
// flush trace to backend
res, err := e2e.DoGet("http://" + tempoIngester1.Endpoint(3200) + "/flush")
require.NoError(t, err)
require.Equal(t, 204, res.StatusCode)

// search an in-memory trace
searchAndAssertTrace(t, apiClient, info)
res, err = e2e.DoGet("http://" + tempoIngester2.Endpoint(3200) + "/flush")
require.NoError(t, err)
require.Equal(t, 204, res.StatusCode)

// search the backend. this works b/c we're passing a start/end AND setting query ingesters within min/max to 0
now := time.Now()
searchAndAssertTraceBackend(t, apiClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())
res, err = e2e.DoGet("http://" + tempoIngester3.Endpoint(3200) + "/flush")
require.NoError(t, err)
require.Equal(t, 204, res.StatusCode)

// stop another ingester and confirm things fail
err = tempoIngester1.Kill()
require.NoError(t, err)
// sleep for one maintenance cycle
time.Sleep(5 * time.Second)

require.Error(t, info.EmitBatches(c))
// test metrics
for _, i := range []*e2e.HTTPService{tempoIngester1, tempoIngester2, tempoIngester3} {
require.NoError(t, i.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
}
require.NoError(t, tempoQuerier.WaitSumMetrics(e2e.Equals(3), "tempodb_blocklist_length"))
require.NoError(t, tempoQueryFrontend.WaitSumMetrics(e2e.Equals(4), "tempo_query_frontend_queries_total"))

// query trace - should fetch from backend
queryAndAssertTrace(t, apiClient, info)

// stop an ingester and confirm we can still write and query
err = tempoIngester2.Kill()
require.NoError(t, err)

// sleep for heartbeat timeout
time.Sleep(1 * time.Second)

info = tempoUtil.NewTraceInfo(time.Now(), "")
require.NoError(t, info.EmitAllBatches(c))

// query an in-memory trace
queryAndAssertTrace(t, apiClient, info)

// search an in-memory trace
searchAndAssertTrace(t, apiClient, info)

// search the backend. this works b/c we're passing a start/end AND setting query ingesters within min/max to 0
now := time.Now()
searchAndAssertTraceBackend(t, apiClient, info, now.Add(-20*time.Minute).Unix(), now.Unix())

// stop another ingester and confirm things fail
err = tempoIngester1.Kill()
require.NoError(t, err)

require.Error(t, info.EmitBatches(c))
})
}
}

func TestScalableSingleBinary(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()

minio := e2e_db.NewMinio(9000, "tempo")
minio := e2edb.NewMinio(9000, "tempo")
require.NotNil(t, minio)
require.NoError(t, s.StartAndWaitReady(minio))

Expand Down

0 comments on commit d021495

Please sign in to comment.