Skip to content

Commit

Permalink
store: fix missing flush when handling pushed down queries (thanos-io…
Browse files Browse the repository at this point in the history
…#6612)

In the case that we have pushed down queries and internal labels that
are overriden by external labels we are not flushing the sorted response.

Signed-off-by: Michael Hoffmann <[email protected]>
Signed-off-by: prabhashj07 <[email protected]>
  • Loading branch information
MichaHoffmann authored and prabhashj07 committed Aug 18, 2023
1 parent 2b4f2a7 commit 246a034
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 7 deletions.
4 changes: 2 additions & 2 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto
}

func (p *PrometheusStore) queryPrometheus(
s storepb.Store_SeriesServer,
s flushableServer,
r *storepb.SeriesRequest,
extLsetToRemove map[string]struct{},
) error {
Expand Down Expand Up @@ -325,7 +325,7 @@ func (p *PrometheusStore) queryPrometheus(
}
}

return nil
return s.Flush()
}

func (p *PrometheusStore) handleSampledPrometheusResponse(
Expand Down
40 changes: 37 additions & 3 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {

baseT := timestamp.FromTime(time.Now()) / 1000 * 1000

// region is an external label; by adding it as an internal label too we also trigger
// the resorting code paths
a := p.Appender()
_, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+100, 1)
_, err = a.Append(0, labels.FromStrings("a", "b", "region", "local"), baseT+100, 1)
testutil.Ok(t, err)
_, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+200, 2)
_, err = a.Append(0, labels.FromStrings("a", "b", "region", "local"), baseT+200, 2)
testutil.Ok(t, err)
_, err = a.Append(0, labels.FromStrings("a", "b", "region", "eu-west"), baseT+300, 3)
_, err = a.Append(0, labels.FromStrings("a", "b", "region", "local"), baseT+300, 3)
testutil.Ok(t, err)
testutil.Ok(t, a.Commit())

Expand Down Expand Up @@ -154,6 +156,38 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
testutil.Equals(t, []string(nil), srv.Warnings)
testutil.Equals(t, "rpc error: code = InvalidArgument desc = no matchers specified (excluding external labels)", err.Error())
}
// Querying with pushdown.
{
srv := newStoreSeriesServer(ctx)
testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{
MinTime: baseT + 101,
MaxTime: baseT + 300,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
},
QueryHints: &storepb.QueryHints{Func: &storepb.Func{Name: "min_over_time"}, Range: &storepb.Range{Millis: 300}},
}, srv))

testutil.Equals(t, 1, len(srv.SeriesSet))

testutil.Equals(t, []labelpb.ZLabel{
{Name: "a", Value: "b"},
{Name: "region", Value: "eu-west"},
{Name: "__thanos_pushed_down", Value: "true"},
}, srv.SeriesSet[0].Labels)
testutil.Equals(t, []string(nil), srv.Warnings)
testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks))

c := srv.SeriesSet[0].Chunks[0]
testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type)

chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data)
testutil.Ok(t, err)

samples := expandChunk(chk.Iterator(nil))
testutil.Equals(t, []sample{{baseT + 300, 1}}, samples)

}
}

type sample struct {
Expand Down
10 changes: 8 additions & 2 deletions pkg/testutil/e2eutil/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"math"
"math/rand"
"net/http"
Expand Down Expand Up @@ -157,11 +158,16 @@ func newPrometheus(binPath, prefix string) (*Prometheus, error) {
return nil, err
}

// Just touch an empty config file. We don't need to actually scrape anything.
_, err = os.Create(filepath.Join(db.Dir(), "prometheus.yml"))
f, err := os.Create(filepath.Join(db.Dir(), "prometheus.yml"))
if err != nil {
return nil, err
}
defer f.Close()

// Some well-known external labels so that we can test label resorting
if _, err = io.WriteString(f, "global:\n external_labels:\n region: eu-west"); err != nil {
return nil, err
}

return &Prometheus{
dir: db.Dir(),
Expand Down

0 comments on commit 246a034

Please sign in to comment.