diff --git a/CHANGELOG.md b/CHANGELOG.md index 3391ad72e6..beee2663cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,10 +27,10 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re - [#3922](https://github.com/thanos-io/thanos/pull/3922) Fix panic in http logging middleware. - [#3960](https://github.com/thanos-io/thanos/pull/3960) fix deduplication of equal alerts with different labels - [#3937](https://github.com/thanos-io/thanos/pull/3937) Store: Fix race condition in chunk pool. +- [#4017](https://github.com/thanos-io/thanos/pull/4017) Query Frontend: fix downsampling iterator returning duplicate samples. ### Changed - [#3929](https://github.com/thanos-io/thanos/pull/3929) Store: Adds the name of the instantiated memcached client to log info - - [#3948](https://github.com/thanos-io/thanos/pull/3948) Receiver: Adjust `http_request_duration_seconds` buckets for low latency requests. - [#3856](https://github.com/thanos-io/thanos/pull/3856) Mixin: _breaking :warning:_ Introduce flexible multi-cluster/namespace mode for alerts and dashboards. Removes jobPrefix config option. Removes `namespace` by default. - [#3937](https://github.com/thanos-io/thanos/pull/3937) Store: Reduce memory usage for range queries. diff --git a/go.sum b/go.sum index 48be869cd1..ffc352be04 100644 --- a/go.sum +++ b/go.sum @@ -1251,19 +1251,15 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da h1:NimzV1aGyq29m5ukMK0AMWEhFaL/lrEOaephfuoiARg= github.com/yuin/gopher-lua v0.0.0-20200816102855-ee81675732da/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA= gitlab.com/nyarla/go-crypt v0.0.0-20160106005555-d9a5dc2b789b/go.mod h1:T3BPAOm2cqquPa0MKWeNkmOM5RQsRhkrwMWonFMN7fE= -go.elastic.co/apm v1.5.0 h1:arba7i+CVc36Jptww3R1ttW+O10ydvnBtidyd85DLpg= go.elastic.co/apm v1.5.0/go.mod h1:OdB9sPtM6Vt7oz3VXt7+KR96i9li74qrxBGHTQygFvk= go.elastic.co/apm v1.11.0 h1:uJyt6nCW9880sZhfl1tB//Jy/5TadNoAd8edRUtgb3w= go.elastic.co/apm v1.11.0/go.mod h1:qoOSi09pnzJDh5fKnfY7bPmQgl8yl2tULdOu03xhui0= -go.elastic.co/apm/module/apmhttp v1.5.0 h1:sxntP97oENyWWi+6GAwXUo05oEpkwbiarZLqrzLRA4o= go.elastic.co/apm/module/apmhttp v1.5.0/go.mod h1:1FbmNuyD3ddauwzgVwFB0fqY6KbZt3JkV187tGCYYhY= go.elastic.co/apm/module/apmhttp v1.11.0 h1:k/MjK0y2aLOXumoM8jcWXqxvIFlMS4U8Bn9cMUPdVX0= go.elastic.co/apm/module/apmhttp v1.11.0/go.mod h1:5JFMIxdeS4vJy+D1PPPjINuX6hZ3AHalZXoOgyqZAkk= -go.elastic.co/apm/module/apmot v1.5.0 h1:rPyHRI6Ooqjwny67au6e2eIxLZshqd7bJfAUpdgOw/4= go.elastic.co/apm/module/apmot v1.5.0/go.mod h1:d2KYwhJParTpyw2WnTNy8geNlHKKFX+4oK3YLlsesWE= go.elastic.co/apm/module/apmot v1.11.0 h1:Qmol6ztDJgvGK/B2cRdcPRNw4qE7kRv1d0vo9ptZfIo= go.elastic.co/apm/module/apmot v1.11.0/go.mod h1:Qnbt3w1DvUd/5QugAF1AJ3mR4AG86EcJFBnAGW77EmU= -go.elastic.co/fastjson v1.0.0 h1:ooXV/ABvf+tBul26jcVViPT3sBir0PvXgibYB1IQQzg= go.elastic.co/fastjson v1.0.0/go.mod h1:PmeUOMMtLHQr9ZS9J9owrAVg0FkaZDRZJEFTTGHtchs= go.elastic.co/fastjson v1.1.0 h1:3MrGBWWVIxe/xvsbpghtkFoPciPhOCmjsR/HfwEeQR4= go.elastic.co/fastjson v1.1.0/go.mod h1:boNGISWMjQsUPy/t6yqt2/1Wx4YNPSe+mZjlyw9vKKI= diff --git a/pkg/queryfrontend/downsampled.go b/pkg/queryfrontend/downsampled.go index a51cfd92ab..7af80f83a8 100644 --- a/pkg/queryfrontend/downsampled.go +++ b/pkg/queryfrontend/downsampled.go @@ -91,13 +91,19 @@ forLoop: return response, nil } +// minResponseTime returns earliest timestamp in r.Data.Result. +// -1 is returned if r contains no data points. +// Each SampleStream within r.Data.Result must be sorted by timestamp. func minResponseTime(r queryrange.Response) int64 { var res = r.(*queryrange.PrometheusResponse).Data.Result - if len(res) == 0 { + if len(res) == 0 || len(res[0].Samples) == 0 { return -1 } - if len(res[0].Samples) == 0 { - return -1 + var minTs = res[0].Samples[0].TimestampMs + for _, sampleStream := range res[1:] { + if ts := sampleStream.Samples[0].TimestampMs; ts < minTs { + minTs = ts + } } - return res[0].Samples[0].TimestampMs + return minTs } diff --git a/pkg/queryfrontend/downsampled_test.go b/pkg/queryfrontend/downsampled_test.go new file mode 100644 index 0000000000..45dff102d9 --- /dev/null +++ b/pkg/queryfrontend/downsampled_test.go @@ -0,0 +1,124 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package queryfrontend + +import ( + "testing" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestDownsampled_MinResponseTime(t *testing.T) { + for _, tc := range []struct { + desc string + sampleStreams []queryrange.SampleStream + expected int64 + }{ + { + desc: "empty []sampleStream", + sampleStreams: []queryrange.SampleStream{}, + expected: -1, + }, + { + desc: "one SampleStream with zero samples", + sampleStreams: []queryrange.SampleStream{ + { + Samples: []cortexpb.Sample{}, + }, + }, + expected: -1, + }, + { + desc: "one SampleStream with one sample at zero time", + sampleStreams: []queryrange.SampleStream{ + { + Samples: []cortexpb.Sample{ + {TimestampMs: 0}, + }, + }, + }, + expected: 0, + }, + { + desc: "one SampleStream with one sample", + sampleStreams: []queryrange.SampleStream{ + { + Samples: []cortexpb.Sample{ + {TimestampMs: 1}, + }, + }, + }, + expected: 1, + }, + { + desc: "two SampleStreams, first is the earliest", + sampleStreams: []queryrange.SampleStream{ + { + Samples: []cortexpb.Sample{ + {TimestampMs: 1}, + }, + }, + { + Samples: []cortexpb.Sample{ + {TimestampMs: 2}, + }, + }, + }, + expected: 1, + }, + { + desc: "three SampleStreams, second is earliest", + sampleStreams: []queryrange.SampleStream{ + { + Samples: []cortexpb.Sample{ + {TimestampMs: 2}, + {TimestampMs: 3}, + }, + }, + { + Samples: []cortexpb.Sample{ + {TimestampMs: 1}, + }, + }, + { + Samples: []cortexpb.Sample{ + {TimestampMs: 2}, + }, + }, + }, + expected: 1, + }, + { + desc: "three SampleStreams, last is earliest", + sampleStreams: []queryrange.SampleStream{ + { + Samples: []cortexpb.Sample{ + {TimestampMs: 2}, + {TimestampMs: 3}, + }, + }, + { + Samples: []cortexpb.Sample{ + {TimestampMs: 2}, + }, + }, + { + Samples: []cortexpb.Sample{ + {TimestampMs: 1}, + }, + }, + }, + expected: 1, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + pr := queryrange.NewEmptyPrometheusResponse() + pr.Data.Result = tc.sampleStreams + res := minResponseTime(pr) + testutil.Equals(t, tc.expected, res) + }) + } +}