Skip to content

Commit

Permalink
fix(query engine): Include lines with ts equal to end timestamp of th…
Browse files Browse the repository at this point in the history
…e query range when executing range aggregations (#13448)

**Background**

When performing range vector aggregations, such as `count_over_time({env="dev"}[1h])`, the query range is divided into multiple steps at which the aggregation operation (e.g. counting the log lines) is evaluated.
Each step starts at `current step - step interval` and ends at `current step`, as depicted in the following chart. The select range for the logs is extended by the `step interval` into the past, in order to select logs for calculating the first step.

![screenshot_20240711_092352](https://github.com/grafana/loki/assets/281260/9ca6eaf5-148e-4743-aefa-6ff7071d64ad)

However, the select range for logs is `start` inclusive and `end` exclusive (written as `[start, end)`), but the evaluation of the steps for the range aggregation is `start` exclusive and `end` inclusive (written as `(start, end]`).

This leads to the problem that the very first timestamp at the beginning of the select range and the very last timestamp at the end of the select range are not included in the range aggregation. The "missing" last timestamp is not a problem, because a) in an instant query it is not supposed to be included anyway because of the `[start, end)` inclusivity of the query range and b) in a range query the last point of the previous step will be part of the next step evaluation.

**Issue**

The missing first timestamp, however, gets problematic when executing an instant query and the log timestamps are exactly at the start of the query range. This can happen when the query is split in the query frontend into multiple smaller time ranges, e.g. `1h`, `30m`, ...
Since the sub queries are executed independently on the queriers, all logs that have a timestamp exactly a multiple of the split interval, e.g. 00:00, 01:00, 02:00, ... for a 1h interval, are dismissed and therefore missing in the query result over the full time range of the original query.

**Fix**

In order to avoid the missing logs that have a timestamp a multiple of the split interval in instant queries, we need to adjust the query range for logs to also include the `end` timestamp (written as `[start, end]`). This is done by adding a "leap nanosecond" to the `end` timestamp of the log select range. This ensures that the included `end` timestamp of the step evaluation is also included in the log selection.

---

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Jul 11, 2024
1 parent 38cabf1 commit e0ca67d
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 33 deletions.
28 changes: 6 additions & 22 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestEngine_LogsRateUnwrap(t *testing.T) {
}
}

func TestEngine_LogsInstantQuery(t *testing.T) {
func TestEngine_InstantQuery(t *testing.T) {
t.Parallel()
for _, test := range []struct {
qs string
Expand All @@ -182,26 +182,6 @@ func TestEngine_LogsInstantQuery(t *testing.T) {

expected interface{}
}{
{
`{app="foo"}`, time.Unix(30, 0), logproto.FORWARD, 10,
[][]logproto.Stream{
{newStream(testSize, identity, `{app="foo"}`)},
},
[]SelectLogParams{
{&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 10, Selector: `{app="foo"}`}},
},
logqlmodel.Streams([]logproto.Stream{newStream(10, identity, `{app="foo"}`)}),
},
{
`{app="bar"} |= "foo" |~ ".+bar"`, time.Unix(30, 0), logproto.BACKWARD, 30,
[][]logproto.Stream{
{newStream(testSize, identity, `{app="bar"}`)},
},
[]SelectLogParams{
{&logproto.QueryRequest{Direction: logproto.BACKWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 30, Selector: `{app="bar"}|="foo"|~".+bar"`}},
},
logqlmodel.Streams([]logproto.Stream{newStream(30, identity, `{app="bar"}`)}),
},
{
`rate({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), logproto.BACKWARD, 10,
[][]logproto.Series{
Expand Down Expand Up @@ -975,7 +955,6 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
} {
test := test
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
t.Parallel()

eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits, log.NewNopLogger())

Expand Down Expand Up @@ -2755,6 +2734,11 @@ func (q *querierRecorder) SelectSamples(_ context.Context, p SelectSampleParams)
}

func paramsID(p interface{}) string {
switch params := p.(type) {
case SelectLogParams:
case SelectSampleParams:
return fmt.Sprintf("%d", params.Plan.Hash())
}
b, err := json.Marshal(p)
if err != nil {
panic(err)
Expand Down
18 changes: 12 additions & 6 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,12 @@ func (ev *DefaultEvaluator) NewStepEvaluator(
nextEvFactory = SampleEvaluatorFunc(func(ctx context.Context, _ SampleEvaluatorFactory, _ syntax.SampleExpr, _ Params) (StepEvaluator, error) {
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-rangExpr.Left.Interval).Add(-rangExpr.Left.Offset),
End: q.End().Add(-rangExpr.Left.Offset),
Selector: e.String(), // intentionally send the vector for reducing labels.
// extend startTs backwards by step
Start: q.Start().Add(-rangExpr.Left.Interval).Add(-rangExpr.Left.Offset),
// add leap nanosecond to endTs to include lines exactly at endTs. range iterators work on start exclusive, end inclusive ranges
End: q.End().Add(-rangExpr.Left.Offset).Add(time.Nanosecond),
// intentionally send the vector for reducing labels.
Selector: e.String(),
Shards: q.Shards(),
Plan: &plan.QueryPlan{
AST: expr,
Expand All @@ -351,9 +354,12 @@ func (ev *DefaultEvaluator) NewStepEvaluator(
case *syntax.RangeAggregationExpr:
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-e.Left.Interval).Add(-e.Left.Offset),
End: q.End().Add(-e.Left.Offset),
Selector: expr.String(),
// extend startTs backwards by step
Start: q.Start().Add(-e.Left.Interval).Add(-e.Left.Offset),
// add leap nanosecond to endTs to include lines exactly at endTs. range iterators work on start exclusive, end inclusive ranges
End: q.End().Add(-e.Left.Offset).Add(time.Nanosecond),
// intentionally send the vector for reducing labels.
Selector: e.String(),
Shards: q.Shards(),
Plan: &plan.QueryPlan{
AST: expr,
Expand Down
13 changes: 9 additions & 4 deletions pkg/storage/stores/shipper/indexshipper/tsdb/bounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/prometheus/common/model"
)

// TODO(chaudum): Replace with new v1.Interval struct
type Bounded interface {
Bounds() (model.Time, model.Time)
}
Expand Down Expand Up @@ -34,9 +35,13 @@ func newBounds(mint, maxt model.Time) bounds { return bounds{mint: mint, maxt: m

func (b bounds) Bounds() (model.Time, model.Time) { return b.mint, b.maxt }

func Overlap(a, b Bounded) bool {
aFrom, aThrough := a.Bounds()
bFrom, bThrough := b.Bounds()
// Overlap checks whether the given chunk or index bounds
// overlap with the bounds of a query range.
// chunk/index bounds are defined as [from, through]
// query bounds are defined as [from, through)
func Overlap(chk, qry Bounded) bool {
chkFrom, chkThrough := chk.Bounds()
qryFrom, qryThrough := qry.Bounds()

return aFrom < bThrough && aThrough > bFrom
return chkFrom < qryThrough && chkThrough >= qryFrom
}
6 changes: 6 additions & 0 deletions pkg/storage/stores/shipper/indexshipper/tsdb/bounds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ func TestOverlap(t *testing.T) {
// ensure [start,end) inclusivity works as expected
a: newBounds(1, 5),
b: newBounds(5, 6),
overlap: true,
},
{
// ensure [start,end) inclusivity works as expected
a: newBounds(5, 6),
b: newBounds(1, 5),
overlap: false,
},
} {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (i *MultiIndex) forMatchingIndices(ctx context.Context, from, through model
queryBounds := newBounds(from, through)

return i.iter.For(ctx, i.maxParallel, func(ctx context.Context, idx Index) error {
if Overlap(queryBounds, idx) {
if Overlap(idx, queryBounds) {

if i.filterer != nil {
// TODO(owen-d): Find a nicer way
Expand Down

0 comments on commit e0ca67d

Please sign in to comment.