Skip to content

Commit

Permalink
Enabling negative offset and @ modifier (thanos-io#4786)
Browse files Browse the repository at this point in the history
* added exp features to engineFactory

Signed-off-by: metonymic-smokey <[email protected]>

* added exp flags to engine opts

Signed-off-by: metonymic-smokey <[email protected]>

* added unit tests for neg offset and @ modifier

Signed-off-by: metonymic-smokey <[email protected]>

* draft: added feature flags to query builder

Signed-off-by: metonymic-smokey <[email protected]>

* fixed func name

Signed-off-by: metonymic-smokey <[email protected]>

* e2e: add --enable-feature to E2E tests

Signed-off-by: Giedrius Statkevičius <[email protected]>

* e2e: convert ts into a function

Convert timestamp into a function which ensures that it is always the
current time when the test retries a instant query.

Signed-off-by: Giedrius Statkevičius <[email protected]>

Co-authored-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
metonymic-smokey and GiedriusS authored Oct 26, 2021
1 parent 47e0d32 commit 02ce1ad
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 30 deletions.
8 changes: 5 additions & 3 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ func runQuery(
NoStepSubqueryIntervalFn: func(int64) int64 {
return defaultEvaluationInterval.Milliseconds()
},
EnableNegativeOffset: enableNegativeOffset,
EnableAtModifier: enableAtModifier,
}
)

Expand All @@ -463,6 +465,7 @@ func runQuery(
endpoints.Close()
})
}

// Run File Service Discovery and update the store set when the files are modified.
if fileSD != nil {
var fileSDUpdates chan []*targetgroup.Group
Expand All @@ -477,9 +480,6 @@ func runQuery(
cancelRun()
})

engineOpts.EnableAtModifier = enableAtModifier
engineOpts.EnableNegativeOffset = enableNegativeOffset

ctxUpdate, cancelUpdate := context.WithCancel(context.Background())
g.Add(func() error {
for {
Expand Down Expand Up @@ -717,6 +717,8 @@ func engineFactory(
ActiveQueryTracker: eo.ActiveQueryTracker,
LookbackDelta: lookbackDelta,
NoStepSubqueryIntervalFn: eo.NoStepSubqueryIntervalFn,
EnableAtModifier: eo.EnableAtModifier,
EnableNegativeOffset: eo.EnableNegativeOffset,
})
}
return func(maxSourceResolutionMillis int64) *promql.Engine {
Expand Down
10 changes: 10 additions & 0 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ type QuerierBuilder struct {
metadataAddresses []string
targetAddresses []string
exemplarAddresses []string
enableFeatures []string

tracingConfig string
}
Expand All @@ -162,6 +163,11 @@ func NewQuerierBuilder(e e2e.Environment, name string, storeAddresses ...string)
}
}

func (q *QuerierBuilder) WithEnabledFeatures(enableFeatures []string) *QuerierBuilder {
q.enableFeatures = enableFeatures
return q
}

func (q *QuerierBuilder) WithImage(image string) *QuerierBuilder {
q.image = image
return q
Expand Down Expand Up @@ -286,6 +292,10 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
args = append(args, "--exemplar="+addr)
}

for _, feature := range q.enableFeatures {
args = append(args, "--enable-feature="+feature)
}

if len(q.fileSDStoreAddresses) > 0 {
queryFileSDDir := filepath.Join(q.sharedDir, "data", "querier", q.name)
container := filepath.Join(ContainerSharedDir, "data", "querier", q.name)
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestQueryFrontend(t *testing.T) {

// Ensure we can get the result from Querier first so that it
// doesn't need to retry when we send queries to the frontend later.
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
Expand All @@ -77,7 +77,7 @@ func TestQueryFrontend(t *testing.T) {
queryTimes := vals[0]

t.Run("query frontend works for instant query", func(t *testing.T) {
queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, queryFrontend.Endpoint("http"), queryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
Expand Down Expand Up @@ -436,7 +436,7 @@ func TestQueryFrontendMemcachedCache(t *testing.T) {

// Ensure we can get the result from Querier first so that it
// doesn't need to retry when we send queries to the frontend later.
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
Expand Down
16 changes: 8 additions & 8 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestQuery(t *testing.T) {

testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(5), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics()))

queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
Expand Down Expand Up @@ -160,7 +160,7 @@ func TestQuery(t *testing.T) {
})

// With deduplication.
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: true,
}, []model.Metric{
{
Expand Down Expand Up @@ -412,7 +412,7 @@ config:
// We should have single TCP connection, since all APIs are against the same server.
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics()))

queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
Expand Down Expand Up @@ -571,7 +571,7 @@ func mustURLParse(t *testing.T, addr string) *url.URL {
return u
}

func instantQuery(t *testing.T, ctx context.Context, addr, q string, opts promclient.QueryOptions, expectedSeriesLen int) model.Vector {
func instantQuery(t *testing.T, ctx context.Context, addr, q string, ts func() time.Time, opts promclient.QueryOptions, expectedSeriesLen int) model.Vector {
t.Helper()

fmt.Println("queryAndAssert: Waiting for", expectedSeriesLen, "results for query", q)
Expand All @@ -580,7 +580,7 @@ func instantQuery(t *testing.T, ctx context.Context, addr, q string, opts promcl
logger := log.NewLogfmtLogger(os.Stdout)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
testutil.Ok(t, runutil.RetryWithLog(logger, 5*time.Second, ctx.Done(), func() error {
res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, mustURLParse(t, "http://"+addr), q, time.Now(), opts)
res, warnings, err := promclient.NewDefaultClient().QueryInstant(ctx, mustURLParse(t, "http://"+addr), q, ts(), opts)
if err != nil {
return err
}
Expand All @@ -599,10 +599,10 @@ func instantQuery(t *testing.T, ctx context.Context, addr, q string, opts promcl
return result
}

func queryAndAssertSeries(t *testing.T, ctx context.Context, addr, q string, opts promclient.QueryOptions, expected []model.Metric) {
func queryAndAssertSeries(t *testing.T, ctx context.Context, addr, q string, ts func() time.Time, opts promclient.QueryOptions, expected []model.Metric) {
t.Helper()

result := instantQuery(t, ctx, addr, q, opts, len(expected))
result := instantQuery(t, ctx, addr, q, ts, opts, len(expected))
for i, exp := range expected {
testutil.Equals(t, exp, result[i].Metric)
}
Expand All @@ -612,7 +612,7 @@ func queryAndAssert(t *testing.T, ctx context.Context, addr, q string, opts prom
t.Helper()

sortResults(expected)
result := instantQuery(t, ctx, addr, q, opts, len(expected))
result := instantQuery(t, ctx, addr, q, time.Now, opts, len(expected))
for _, r := range result {
r.Timestamp = 0 // Does not matter for us.
}
Expand Down
12 changes: 6 additions & 6 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func TestReceive(t *testing.T) {
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics()))

// We expect the data from each Prometheus instance to be replicated twice across our ingesting instances
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
Expand Down Expand Up @@ -378,7 +378,7 @@ func TestReceive(t *testing.T) {

testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics()))

queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
Expand Down Expand Up @@ -451,7 +451,7 @@ func TestReceive(t *testing.T) {

testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics()))

queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
Expand Down Expand Up @@ -524,7 +524,7 @@ func TestReceive(t *testing.T) {

testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics()))

queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestReceive(t *testing.T) {

testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics()))

queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
Expand Down Expand Up @@ -653,7 +653,7 @@ func TestReceive(t *testing.T) {
t.Cleanup(cancel)

testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics()))
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), queryUpWithoutInstance, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func TestRule(t *testing.T) {
})

t.Run("query alerts", func(t *testing.T) {
queryAndAssertSeries(t, ctx, q.Endpoint("http"), "ALERTS", promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), "ALERTS", time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
Expand Down
35 changes: 26 additions & 9 deletions test/e2e/store_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
// Ensure bucket UI.
ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(s1.Endpoint("http"), "loaded"))

q, err := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc")).Build()
q, err := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc")).WithEnabledFeatures([]string{"promql-negative-offset", "promql-at-modifier"}).Build()
testutil.Ok(t, err)
testutil.Ok(t, e2e.StartAndWaitReady(q))

Expand Down Expand Up @@ -132,8 +132,8 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(0), "thanos_bucket_store_block_load_failures_total"))

t.Run("query works", func(t *testing.T) {
queryAndAssertSeries(t, ctx, q.Endpoint("http"), "{a=\"1\"}",
promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), "{a=\"1\"} @ end()",
time.Now, promclient.QueryOptions{
Deduplicate: false,
},
[]model.Metric{
Expand All @@ -158,7 +158,7 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(2), "thanos_bucket_store_series_blocks_queried"))

queryAndAssertSeries(t, ctx, q.Endpoint("http"), "{a=\"1\"}",
promclient.QueryOptions{
time.Now, promclient.QueryOptions{
Deduplicate: true,
},
[]model.Metric{
Expand Down Expand Up @@ -188,7 +188,7 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))

// TODO(bwplotka): Entries are still in LRU cache.
queryAndAssertSeries(t, ctx, q.Endpoint("http"), "{a=\"1\"}",
promclient.QueryOptions{
time.Now, promclient.QueryOptions{
Deduplicate: false,
},
[]model.Metric{
Expand Down Expand Up @@ -217,7 +217,7 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(0), "thanos_bucket_store_block_load_failures_total"))

queryAndAssertSeries(t, ctx, q.Endpoint("http"), "{a=\"1\"}",
promclient.QueryOptions{
time.Now, promclient.QueryOptions{
Deduplicate: false,
},
[]model.Metric{
Expand Down Expand Up @@ -250,7 +250,7 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(0), "thanos_bucket_store_block_load_failures_total"))

queryAndAssertSeries(t, ctx, q.Endpoint("http"), "{a=\"1\"}",
promclient.QueryOptions{
time.Now, promclient.QueryOptions{
Deduplicate: false,
},
[]model.Metric{
Expand All @@ -265,6 +265,23 @@ metafile_content_ttl: 0s`, memcached.InternalEndpoint("memcached"))
testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(7+1), "thanos_bucket_store_series_blocks_queried"))
})

t.Run("negative offset should work", func(t *testing.T) {
queryAndAssertSeries(t, ctx, q.Endpoint("http"), "{a=\"1\"} offset -4h",
func() time.Time { return time.Now().Add(-4 * time.Hour) }, promclient.QueryOptions{
Deduplicate: false,
},
[]model.Metric{
{
"a": "1",
"b": "2",
"ext1": "value1",
"replica": "3",
},
},
)
testutil.Ok(t, s1.WaitSumMetrics(e2e.Equals(7+2), "thanos_bucket_store_series_blocks_queried"))
})

// TODO(khyati) Let's add some case for compaction-meta.json once the PR will be merged: https://github.com/thanos-io/thanos/pull/2136.
}

Expand Down Expand Up @@ -345,7 +362,7 @@ blocks_iter_ttl: 0s`, memcached.InternalEndpoint("memcached"))

t.Run("query with cache miss", func(t *testing.T) {
queryAndAssertSeries(t, ctx, q.Endpoint("http"), "{a=\"1\"}",
promclient.QueryOptions{
time.Now, promclient.QueryOptions{
Deduplicate: false,
},
[]model.Metric{
Expand All @@ -363,7 +380,7 @@ blocks_iter_ttl: 0s`, memcached.InternalEndpoint("memcached"))

t.Run("query with cache hit", func(t *testing.T) {
queryAndAssertSeries(t, ctx, q.Endpoint("http"), "{a=\"1\"}",
promclient.QueryOptions{
time.Now, promclient.QueryOptions{
Deduplicate: false,
},
[]model.Metric{
Expand Down

0 comments on commit 02ce1ad

Please sign in to comment.