diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 274276a02c8f..b409422c1c8f 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -167,7 +167,7 @@ func TestEngine_LogsRateUnwrap(t *testing.T) { } } -func TestEngine_LogsInstantQuery(t *testing.T) { +func TestEngine_InstantQuery(t *testing.T) { t.Parallel() for _, test := range []struct { qs string @@ -182,26 +182,6 @@ func TestEngine_LogsInstantQuery(t *testing.T) { expected interface{} }{ - { - `{app="foo"}`, time.Unix(30, 0), logproto.FORWARD, 10, - [][]logproto.Stream{ - {newStream(testSize, identity, `{app="foo"}`)}, - }, - []SelectLogParams{ - {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 10, Selector: `{app="foo"}`}}, - }, - logqlmodel.Streams([]logproto.Stream{newStream(10, identity, `{app="foo"}`)}), - }, - { - `{app="bar"} |= "foo" |~ ".+bar"`, time.Unix(30, 0), logproto.BACKWARD, 30, - [][]logproto.Stream{ - {newStream(testSize, identity, `{app="bar"}`)}, - }, - []SelectLogParams{ - {&logproto.QueryRequest{Direction: logproto.BACKWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 30, Selector: `{app="bar"}|="foo"|~".+bar"`}}, - }, - logqlmodel.Streams([]logproto.Stream{newStream(30, identity, `{app="bar"}`)}), - }, { `rate({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), logproto.BACKWARD, 10, [][]logproto.Series{ @@ -975,7 +955,6 @@ func TestEngine_LogsInstantQuery(t *testing.T) { } { test := test t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) { - t.Parallel() eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits, log.NewNopLogger()) @@ -2755,6 +2734,11 @@ func (q *querierRecorder) SelectSamples(_ context.Context, p SelectSampleParams) } func paramsID(p interface{}) string { + switch params := p.(type) { + case SelectLogParams: + case SelectSampleParams: + return fmt.Sprintf("%d", params.Plan.Hash()) + } b, err := json.Marshal(p) if err != nil { panic(err) diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index cdf05829c200..1216efedd79d 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -331,9 +331,12 @@ func (ev *DefaultEvaluator) NewStepEvaluator( nextEvFactory = SampleEvaluatorFunc(func(ctx context.Context, _ SampleEvaluatorFactory, _ syntax.SampleExpr, _ Params) (StepEvaluator, error) { it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{ &logproto.SampleQueryRequest{ - Start: q.Start().Add(-rangExpr.Left.Interval).Add(-rangExpr.Left.Offset), - End: q.End().Add(-rangExpr.Left.Offset), - Selector: e.String(), // intentionally send the vector for reducing labels. + // extend startTs backwards by step + Start: q.Start().Add(-rangExpr.Left.Interval).Add(-rangExpr.Left.Offset), + // add leap nanosecond to endTs to include lines exactly at endTs. range iterators work on start exclusive, end inclusive ranges + End: q.End().Add(-rangExpr.Left.Offset).Add(time.Nanosecond), + // intentionally send the vector for reducing labels. + Selector: e.String(), Shards: q.Shards(), Plan: &plan.QueryPlan{ AST: expr, @@ -351,9 +354,12 @@ func (ev *DefaultEvaluator) NewStepEvaluator( case *syntax.RangeAggregationExpr: it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{ &logproto.SampleQueryRequest{ - Start: q.Start().Add(-e.Left.Interval).Add(-e.Left.Offset), - End: q.End().Add(-e.Left.Offset), - Selector: expr.String(), + // extend startTs backwards by step + Start: q.Start().Add(-e.Left.Interval).Add(-e.Left.Offset), + // add leap nanosecond to endTs to include lines exactly at endTs. range iterators work on start exclusive, end inclusive ranges + End: q.End().Add(-e.Left.Offset).Add(time.Nanosecond), + // intentionally send the vector for reducing labels. + Selector: e.String(), Shards: q.Shards(), Plan: &plan.QueryPlan{ AST: expr, diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/bounds.go b/pkg/storage/stores/shipper/indexshipper/tsdb/bounds.go index f3a9cd692171..1b4423a47658 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/bounds.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/bounds.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/common/model" ) +// TODO(chaudum): Replace with new v1.Interval struct type Bounded interface { Bounds() (model.Time, model.Time) } @@ -34,9 +35,13 @@ func newBounds(mint, maxt model.Time) bounds { return bounds{mint: mint, maxt: m func (b bounds) Bounds() (model.Time, model.Time) { return b.mint, b.maxt } -func Overlap(a, b Bounded) bool { - aFrom, aThrough := a.Bounds() - bFrom, bThrough := b.Bounds() +// Overlap checks whether the given chunk or index bounds +// overlap with the bounds of a query range. +// chunk/index bounds are defined as [from, through] +// query bounds are defined as [from, through) +func Overlap(chk, qry Bounded) bool { + chkFrom, chkThrough := chk.Bounds() + qryFrom, qryThrough := qry.Bounds() - return aFrom < bThrough && aThrough > bFrom + return chkFrom < qryThrough && chkThrough >= qryFrom } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/bounds_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/bounds_test.go index c9222f3efc98..de6825f0fcfa 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/bounds_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/bounds_test.go @@ -28,6 +28,12 @@ func TestOverlap(t *testing.T) { // ensure [start,end) inclusivity works as expected a: newBounds(1, 5), b: newBounds(5, 6), + overlap: true, + }, + { + // ensure [start,end) inclusivity works as expected + a: newBounds(5, 6), + b: newBounds(1, 5), overlap: false, }, } { diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go index 0c6044843026..23bab83e4170 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go @@ -115,7 +115,7 @@ func (i *MultiIndex) forMatchingIndices(ctx context.Context, from, through model queryBounds := newBounds(from, through) return i.iter.For(ctx, i.maxParallel, func(ctx context.Context, idx Index) error { - if Overlap(queryBounds, idx) { + if Overlap(idx, queryBounds) { if i.filterer != nil { // TODO(owen-d): Find a nicer way