Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make detected fields work for both json and proto #12682

Merged
merged 13 commits into from
Apr 19, 2024
10 changes: 5 additions & 5 deletions integration/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (c *Client) PushOTLPLogLine(line string, timestamp time.Time, logAttributes
return c.pushOTLPLogLine(line, timestamp, logAttributes)
}

func formatTS(ts time.Time) string {
func FormatTS(ts time.Time) string {
return strconv.FormatInt(ts.UnixNano(), 10)
}

Expand All @@ -130,7 +130,7 @@ func (c *Client) pushLogLine(line string, timestamp time.Time, structuredMetadat
},
Values: [][]any{
{
formatTS(timestamp),
FormatTS(timestamp),
line,
structuredMetadata,
},
Expand Down Expand Up @@ -509,7 +509,7 @@ func (c *Client) RunQuery(ctx context.Context, query string, extraHeaders ...Hea

v := url.Values{}
v.Set("query", query)
v.Set("time", formatTS(c.Now.Add(time.Second)))
v.Set("time", FormatTS(c.Now.Add(time.Second)))

u, err := url.Parse(c.baseURL)
if err != nil {
Expand Down Expand Up @@ -568,8 +568,8 @@ func (c *Client) parseResponse(buf []byte, statusCode int) (*Response, error) {
func (c *Client) rangeQueryURL(query string, start, end time.Time) string {
v := url.Values{}
v.Set("query", query)
v.Set("start", formatTS(start))
v.Set("end", formatTS(end))
v.Set("start", FormatTS(start))
v.Set("end", FormatTS(end))

u, err := url.Parse(c.baseURL)
if err != nil {
Expand Down
223 changes: 223 additions & 0 deletions integration/explore_logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
//go:build integration

package integration

import (
"context"
"encoding/json"
"io"
"net/url"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/integration/client"
"github.com/grafana/loki/v3/integration/cluster"
)

type DetectedField struct {
Label string `json:"label"`
Type string `json:"type"`
Cardinality uint64 `json:"cardinality"`
}

type DetectedFields []DetectedField
type DetectedFieldResponse struct {
Fields DetectedFields `json:"fields"`
}

func Test_ExploreLogsApis(t *testing.T) {
clu := cluster.New(nil, cluster.SchemaWithTSDBAndTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() {
assert.NoError(t, clu.Cleanup())
}()

// run initially the compactor, indexgateway, and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-compactor.compaction-interval=1s",
"-compactor.retention-delete-delay=1s",
// By default, a minute is added to the delete request start time. This compensates for that.
"-compactor.delete-request-cancel-period=-60s",
"-compactor.deletion-mode=filter-and-delete",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
"-target=index-gateway",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())

// then, run only the ingester and query scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
)
require.NoError(t, clu.Run())

// the run querier.
var (
tQuerier = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
)
)
require.NoError(t, clu.Run())

// finally, run the query-frontend.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-querier.per-request-limits-enabled=true",
"-frontend.encoding=protobuf",
"-querier.shard-aggregations=quantile_over_time",
"-frontend.tail-proxy-url="+tQuerier.HTTPURL(),
)
)
require.NoError(t, clu.Run())

tenantID := randStringRunes()

now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now

t.Run("/detected_fields", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=red", now.Add(-45*time.Minute), nil, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=blue", now.Add(-45*time.Minute), nil, map[string]string{"job": "fake"}))

require.NoError(t, cliDistributor.PushLogLine("foo=bar color=red", now.Add(-5*time.Second), nil, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=purple", now.Add(-5*time.Second), nil, map[string]string{"job": "fake"}))

require.NoError(t, cliDistributor.PushLogLine("foo=bar color=green", now, nil, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=red", now, nil, map[string]string{"job": "fake"}))

// validate logs are there
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)

var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"foo=bar color=red", "foo=bar color=blue", "foo=bar color=red", "foo=bar color=purple", "foo=bar color=green", "foo=bar color=red"}, lines)

t.Run("non-split queries", func(t *testing.T) {
start := cliQueryFrontend.Now.Add(-1 * time.Minute)
end := cliQueryFrontend.Now.Add(time.Minute)

v := url.Values{}
v.Set("query", `{job="fake"}`)
v.Set("start", client.FormatTS(start))
v.Set("end", client.FormatTS(end))

u := url.URL{}
u.Path = "/loki/api/v1/detected_fields"
u.RawQuery = v.Encode()
dfResp, err := cliQueryFrontend.Get(u.String())
require.NoError(t, err)
defer dfResp.Body.Close()

buf, err := io.ReadAll(dfResp.Body)
require.NoError(t, err)

var detectedFieldResponse DetectedFieldResponse
err = json.Unmarshal(buf, &detectedFieldResponse)
require.NoError(t, err)

require.Equal(t, 2, len(detectedFieldResponse.Fields))

var fooField, colorField DetectedField
for _, field := range detectedFieldResponse.Fields {
if field.Label == "foo" {
fooField = field
}

if field.Label == "color" {
colorField = field
}
}

require.Equal(t, "string", fooField.Type)
require.Equal(t, "string", colorField.Type)
require.Equal(t, uint64(1), fooField.Cardinality)
require.Equal(t, uint64(3), colorField.Cardinality)
})

t.Run("split queries", func(t *testing.T) {
start := cliQueryFrontend.Now.Add(-24 * time.Hour)
end := cliQueryFrontend.Now.Add(time.Minute)

v := url.Values{}
v.Set("query", `{job="fake"}`)
v.Set("start", client.FormatTS(start))
v.Set("end", client.FormatTS(end))

u := url.URL{}
u.Path = "/loki/api/v1/detected_fields"
u.RawQuery = v.Encode()
dfResp, err := cliQueryFrontend.Get(u.String())
require.NoError(t, err)
defer dfResp.Body.Close()

buf, err := io.ReadAll(dfResp.Body)
require.NoError(t, err)

var detectedFieldResponse DetectedFieldResponse
err = json.Unmarshal(buf, &detectedFieldResponse)
require.NoError(t, err)

require.Equal(t, 2, len(detectedFieldResponse.Fields))

var fooField, colorField DetectedField
for _, field := range detectedFieldResponse.Fields {
if field.Label == "foo" {
fooField = field
}

if field.Label == "color" {
colorField = field
}
}

require.Equal(t, "string", fooField.Type)
require.Equal(t, "string", colorField.Type)
require.Equal(t, uint64(1), fooField.Cardinality)
require.Equal(t, uint64(4), colorField.Cardinality)
})
})
}
Loading
Loading