Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tag name filtering #3822

Merged
merged 24 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [FEATURE] TraceQL support for event scope and event:name intrinsic [#3708](https://github.com/grafana/tempo/pull/3708) (@stoewer)
* [FEATURE] Flush and query RF1 blocks for TraceQL metric queries [#3628](https://github.com/grafana/tempo/pull/3628) [#3691](https://github.com/grafana/tempo/pull/3691) [#3723](https://github.com/grafana/tempo/pull/3723) (@mapno)
* [FEATURE] Add new compare() metrics function [#3695](https://github.com/grafana/tempo/pull/3695) (@mdisibio)
* [FEATURE] Add a `q` parameter to `/api/v2/serach/tags` for tag name filtering [#3822](https://github.com/grafana/tempo/pull/3822) (@joe-elliott)
* [ENHANCEMENT] Tag value lookup use protobuf internally for improved latency [#3731](https://github.com/grafana/tempo/pull/3731) (@mdisibio)
* [ENHANCEMENT] TraceQL metrics queries use protobuf internally for improved latency [#3745](https://github.com/grafana/tempo/pull/3745) (@mdisibio)
* [ENHANCEMENT] Add local disk caching of metrics queries in local-blocks processor [#3799](https://github.com/grafana/tempo/pull/3799) (@mdisibio)
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/tempo/api_docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ Parameters:
- `scope = (resource|span|intrinsic)`
Specifies the scope of the tags, this is an optional parameter, if not specified it means all scopes.
Default = `all`
- `q = (traceql query)`
Optional. A TraceQL query to filter tag names by. Currently only works for a single spanset of `&&`ed conditions. For example: `{ span.foo = "bar" && resource.baz = "bat" ...}`. See also [Filtered tag values](#filtered-tag-values).
- `start = (unix epoch seconds)`
Optional. Along with `end` define a time range from which tags should be returned.
- `end = (unix epoch seconds)`
Expand Down
339 changes: 337 additions & 2 deletions integration/e2e/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"net/http"
"net/url"
"slices"
"sort"
"strconv"
"testing"
Expand All @@ -24,6 +25,249 @@ const (
resourceX = "resource.xx"
)

func TestSearchTagsV2(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()

require.NoError(t, util.CopyFileToSharedDir(s, configAllInOneLocal, "config.yaml"))
tempo := util.NewTempoAllInOne()
require.NoError(t, s.StartAndWaitReady(tempo))

jaegerClient, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, jaegerClient)

type batchTmpl struct {
spanCount int
name string
resourceAttVal, spanAttVal string
resourceAttr, SpanAttr string
}

firstBatch := batchTmpl{spanCount: 2, name: "foo", resourceAttVal: "bar", spanAttVal: "bar", resourceAttr: "firstRes", SpanAttr: "firstSpan"}
secondBatch := batchTmpl{spanCount: 2, name: "baz", resourceAttVal: "qux", spanAttVal: "qux", resourceAttr: "secondRes", SpanAttr: "secondSpan"}

batch := makeThriftBatchWithSpanCountAttributeAndName(firstBatch.spanCount, firstBatch.name, firstBatch.resourceAttVal, firstBatch.spanAttVal, firstBatch.resourceAttr, firstBatch.SpanAttr)
require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch))

batch = makeThriftBatchWithSpanCountAttributeAndName(secondBatch.spanCount, secondBatch.name, secondBatch.resourceAttVal, secondBatch.spanAttVal, secondBatch.resourceAttr, secondBatch.SpanAttr)
require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch))

// Wait for the traces to be written to the WAL
time.Sleep(time.Second * 3)

testCases := []struct {
name string
query string
scope string
expected tempopb.SearchTagsV2Response
}{
{
name: "no filtering",
query: "",
scope: "none",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{firstBatch.SpanAttr, secondBatch.SpanAttr},
},
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, secondBatch.resourceAttr, "service.name"},
},
},
},
},
{
name: "first batch - resource",
query: fmt.Sprintf(`{ name="%s" }`, firstBatch.name),
scope: "resource",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, "service.name"},
},
},
},
},
{
name: "second batch with incomplete query - span",
query: fmt.Sprintf(`{ name="%s" && span.x = }`, secondBatch.name),
scope: "span",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{secondBatch.SpanAttr},
},
},
},
},
{
name: "first batch - resource att - span",
query: fmt.Sprintf(`{ resource.%s="%s" }`, firstBatch.resourceAttr, firstBatch.resourceAttVal),
scope: "span",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{firstBatch.SpanAttr},
},
},
},
},
{
name: "first batch - resource att - resource",
query: fmt.Sprintf(`{ resource.%s="%s" }`, firstBatch.resourceAttr, firstBatch.resourceAttVal),
scope: "resource",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, "service.name"},
},
},
},
},
{
name: "second batch - resource attribute - span",
query: fmt.Sprintf(`{ resource.%s="%s" }`, secondBatch.resourceAttr, secondBatch.resourceAttVal),
scope: "span",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{secondBatch.SpanAttr},
},
},
},
},
{
name: "too restrictive query",
query: fmt.Sprintf(`{ resource.%s="%s" && resource.y="%s" }`, firstBatch.resourceAttr, firstBatch.resourceAttVal, secondBatch.resourceAttVal),
scope: "none",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "resource",
Tags: []string{"service.name"}, // well known column so included
},
},
},
},
// Unscoped not supported, unfiltered results.
{
name: "unscoped span attribute",
query: fmt.Sprintf(`{ .x="%s" }`, firstBatch.spanAttVal),
scope: "none",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{firstBatch.SpanAttr, secondBatch.SpanAttr},
},
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, secondBatch.resourceAttr, "service.name"},
},
},
},
},
{
name: "unscoped res attribute",
query: fmt.Sprintf(`{ .xx="%s" }`, firstBatch.resourceAttVal),
scope: "none",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{firstBatch.SpanAttr, secondBatch.SpanAttr},
},
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, secondBatch.resourceAttr, "service.name"},
},
},
},
},
{
name: "both batches - name and resource attribute",
query: `{ resource.service.name="my-service"}`,
scope: "none",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{firstBatch.SpanAttr, secondBatch.SpanAttr},
},
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, secondBatch.resourceAttr, "service.name"},
},
},
},
},
{
name: "bad query - unfiltered results",
query: fmt.Sprintf("%s = bar", spanX), // bad query, missing quotes
scope: "none",
expected: tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: "span",
Tags: []string{firstBatch.SpanAttr, secondBatch.SpanAttr},
},
{
Name: "resource",
Tags: []string{firstBatch.resourceAttr, secondBatch.resourceAttr, "service.name"},
},
},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
callSearchTagsV2AndAssert(t, tempo, tc.scope, tc.query, tc.expected, 0, 0)
})
}

// Wait to block flushed to backend, 20 seconds is the complete_block_timeout configuration on all in one, we add
// 2s for security.
callFlush(t, tempo)
time.Sleep(time.Second * 22)
callFlush(t, tempo)

// test metrics
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_flushed_total"))
require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics))
require.NoError(t, tempo.WaitSumMetrics(e2e.Equals(1), "tempo_ingester_blocks_cleared_total"))

// Assert no more on the ingester
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
callSearchTagsV2AndAssert(t, tempo, tc.scope, tc.query, tempopb.SearchTagsV2Response{}, 0, 0)
})
}

// Wait to blocklist_poll to be completed
require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"tempodb_blocklist_length"}, e2e.WaitMissingMetrics))

// Assert tags on storage backend
now := time.Now()
start := now.Add(-2 * time.Hour)
end := now.Add(2 * time.Hour)

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
callSearchTagsV2AndAssert(t, tempo, tc.scope, tc.query, tc.expected, start.Unix(), end.Unix())
})
}
}

func TestSearchTagValuesV2(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
Expand All @@ -46,10 +290,10 @@ func TestSearchTagValuesV2(t *testing.T) {
firstBatch := batchTmpl{spanCount: 2, name: "foo", resourceAttVal: "bar", spanAttVal: "bar"}
secondBatch := batchTmpl{spanCount: 2, name: "baz", resourceAttVal: "qux", spanAttVal: "qux"}

batch := makeThriftBatchWithSpanCountAttributeAndName(firstBatch.spanCount, firstBatch.name, firstBatch.resourceAttVal, firstBatch.spanAttVal)
batch := makeThriftBatchWithSpanCountAttributeAndName(firstBatch.spanCount, firstBatch.name, firstBatch.resourceAttVal, firstBatch.spanAttVal, "xx", "x")
require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch))

batch = makeThriftBatchWithSpanCountAttributeAndName(secondBatch.spanCount, secondBatch.name, secondBatch.resourceAttVal, secondBatch.spanAttVal)
batch = makeThriftBatchWithSpanCountAttributeAndName(secondBatch.spanCount, secondBatch.name, secondBatch.resourceAttVal, secondBatch.spanAttVal, "xx", "x")
require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch))

// Wait for the traces to be written to the WAL
Expand Down Expand Up @@ -345,6 +589,97 @@ func callSearchTagValuesV2AndAssert(t *testing.T, svc *e2e.HTTPService, tagName,
require.Equal(t, expected, actualGrpcResp)
}

func callSearchTagsV2AndAssert(t *testing.T, svc *e2e.HTTPService, scope, query string, expected tempopb.SearchTagsV2Response, start, end int64) {
urlPath := fmt.Sprintf(`/api/v2/search/tags?scope=%s&q=%s`, scope, url.QueryEscape(query))

// expected will not have the intrinsic scope since it's the same every time, add it here.
if scope == "none" || scope == "" || scope == "intrinsic" {
expected.Scopes = append(expected.Scopes, &tempopb.SearchTagsV2Scope{
Name: "intrinsic",
Tags: []string{"duration", "event:name", "kind", "name", "rootName", "rootServiceName", "span:duration", "span:kind", "span:name", "span:status", "span:statusMessage", "status", "statusMessage", "trace:duration", "trace:rootName", "trace:rootService", "traceDuration"},
})
}
sort.Slice(expected.Scopes, func(i, j int) bool { return expected.Scopes[i].Name < expected.Scopes[j].Name })
for _, scope := range expected.Scopes {
slices.Sort(scope.Tags)
}

// search for tag values
req, err := http.NewRequest(http.MethodGet, "http://"+svc.Endpoint(3200)+urlPath, nil)
require.NoError(t, err)

q := req.URL.Query()

if start != 0 {
q.Set("start", strconv.Itoa(int(start)))
}

if end != 0 {
q.Set("end", strconv.Itoa(int(end)))
}

req.URL.RawQuery = q.Encode()

res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, res.StatusCode)

// read body and print it
body, err := io.ReadAll(res.Body)
require.NoError(t, err)
defer res.Body.Close()

// parse response
var response tempopb.SearchTagsV2Response
require.NoError(t, json.Unmarshal(body, &response))

prepTagsResponse(&response)
require.Equal(t, expected, response)

// streaming
grpcReq := &tempopb.SearchTagsRequest{
Scope: scope,
Query: query,
Start: uint32(start),
End: uint32(end),
}

grpcClient, err := util.NewSearchGRPCClient(context.Background(), svc.Endpoint(3200))
require.NoError(t, err)

respTagsValuesV2, err := grpcClient.SearchTagsV2(context.Background(), grpcReq)
require.NoError(t, err)
var grpcResp *tempopb.SearchTagsV2Response
for {
resp, err := respTagsValuesV2.Recv()
if resp != nil {
grpcResp = resp
}
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
}
require.NotNil(t, grpcResp)

prepTagsResponse(&response)
require.Equal(t, expected, response)
}

func prepTagsResponse(resp *tempopb.SearchTagsV2Response) {
if len(resp.Scopes) == 0 {
resp.Scopes = nil
}
sort.Slice(resp.Scopes, func(i, j int) bool { return resp.Scopes[i].Name < resp.Scopes[j].Name })
for _, scope := range resp.Scopes {
if len(scope.Tags) == 0 {
scope.Tags = nil
}

slices.Sort(scope.Tags)
}
}

func callSearchTagsAndAssert(t *testing.T, svc *e2e.HTTPService, expected searchTagsResponse, start, end int64) {
urlPath := "/api/search/tags"
// search for tag values
Expand Down
Loading
Loading