From baf7756aad1fbd1eb065ebf33f6c6c4bc2c03036 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Mon, 14 Aug 2023 14:33:49 +0200 Subject: [PATCH] store: fix missing flush when handling pushed down queries In the case that we have pushed down queries and internal labels that are overriden by external labels we are not flushing the sorted response. Signed-off-by: Michael Hoffmann --- pkg/store/prometheus.go | 4 +-- pkg/store/prometheus_test.go | 40 +++++++++++++++++++++++++++--- pkg/testutil/e2eutil/prometheus.go | 10 ++++++-- 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 18e90ccc21..244ae5592d 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -258,7 +258,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto } func (p *PrometheusStore) queryPrometheus( - s storepb.Store_SeriesServer, + s flushableServer, r *storepb.SeriesRequest, extLsetToRemove map[string]struct{}, ) error { @@ -325,7 +325,7 @@ func (p *PrometheusStore) queryPrometheus( } } - return nil + return s.Flush() } func (p *PrometheusStore) handleSampledPrometheusResponse( diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index e0d6052b65..44fb4e2c20 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -49,12 +49,14 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { baseT := timestamp.FromTime(time.Now()) / 1000 * 1000 + // region is an external label; by adding it as an internal label too we also trigger + // the resorting code paths a := p.Appender() - _, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+100, 1) + _, err = a.Append(0, labels.FromStrings("a", "b", "region", "local"), baseT+100, 1) testutil.Ok(t, err) - _, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+200, 2) + _, err = a.Append(0, labels.FromStrings("a", "b", "region", "local"), baseT+200, 2) testutil.Ok(t, err) - _, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+300, 3) + _, err = a.Append(0, labels.FromStrings("a", "b", "region", "local"), baseT+300, 3) testutil.Ok(t, err) testutil.Ok(t, a.Commit()) @@ -154,6 +156,38 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { testutil.Equals(t, []string(nil), srv.Warnings) testutil.Equals(t, "rpc error: code = InvalidArgument desc = no matchers specified (excluding external labels)", err.Error()) } + // Querying with pushdown. + { + srv := newStoreSeriesServer(ctx) + testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{ + MinTime: baseT + 101, + MaxTime: baseT + 300, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, + }, + QueryHints: &storepb.QueryHints{Func: &storepb.Func{Name: "min_over_time"}, Range: &storepb.Range{Millis: 300}}, + }, srv)) + + testutil.Equals(t, 1, len(srv.SeriesSet)) + + testutil.Equals(t, []labelpb.ZLabel{ + {Name: "a", Value: "b"}, + {Name: "region", Value: "eu-west"}, + {Name: "__thanos_pushed_down", Value: "true"}, + }, srv.SeriesSet[0].Labels) + testutil.Equals(t, []string(nil), srv.Warnings) + testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks)) + + c := srv.SeriesSet[0].Chunks[0] + testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type) + + chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data) + testutil.Ok(t, err) + + samples := expandChunk(chk.Iterator(nil)) + testutil.Equals(t, []sample{{baseT + 300, 1}}, samples) + + } } type sample struct { diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index f7c0ad9813..bf7e900a9b 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "fmt" + "io" "math" "math/rand" "net/http" @@ -157,11 +158,16 @@ func newPrometheus(binPath, prefix string) (*Prometheus, error) { return nil, err } - // Just touch an empty config file. We don't need to actually scrape anything. - _, err = os.Create(filepath.Join(db.Dir(), "prometheus.yml")) + f, err := os.Create(filepath.Join(db.Dir(), "prometheus.yml")) if err != nil { return nil, err } + defer f.Close() + + // Some well-known external labels so that we can test label resorting + if _, err = io.WriteString(f, "global:\n external_labels:\n region: eu-west"); err != nil { + return nil, err + } return &Prometheus{ dir: db.Dir(),