From 5ad544b6cd0e79f629c804a617ddcdf9786b8981 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Mon, 29 May 2023 17:03:45 +0200 Subject: [PATCH] all: implement subquery expressions for instant queries Signed-off-by: Michael Hoffmann --- engine/bench_test.go | 9 +- engine/engine.go | 62 ++++++--- engine/engine_test.go | 104 ++++++++++++++- engine/user_defined_test.go | 6 +- execution/aggregate/hashaggregate.go | 9 +- execution/aggregate/khashaggregate.go | 3 +- execution/execution.go | 87 +++++++------ execution/scan/subquery.go | 142 +++++++++++++++++++++ execution/step_invariant/step_invariant.go | 3 +- logicalplan/distribute.go | 11 +- logicalplan/distribute_test.go | 5 +- logicalplan/merge_selects.go | 3 +- logicalplan/merge_selects_test.go | 3 +- logicalplan/passthrough.go | 3 +- logicalplan/passthrough_test.go | 5 +- logicalplan/plan.go | 71 ++++++++--- logicalplan/plan_test.go | 5 +- logicalplan/propagate_selectors.go | 3 +- logicalplan/sort_matchers.go | 3 +- logicalplan/trim_sorts.go | 3 +- logicalplan/trim_sorts_test.go | 3 +- logicalplan/user_defined.go | 1 - query/options.go | 51 ++++++-- 23 files changed, 478 insertions(+), 117 deletions(-) create mode 100644 execution/scan/subquery.go diff --git a/engine/bench_test.go b/engine/bench_test.go index c01a078a..9d4f8e04 100644 --- a/engine/bench_test.go +++ b/engine/bench_test.go @@ -471,6 +471,10 @@ func BenchmarkInstantQuery(b *testing.B) { name: "sort_desc", query: `sort_desc(http_requests_total)`, }, + { + name: "subquery sum_over_time", + query: `sum_over_time(count(http_requests_total)[1h:10s])`, + }, } for _, tc := range cases { @@ -497,7 +501,10 @@ func BenchmarkInstantQuery(b *testing.B) { } }) b.Run("new_engine", func(b *testing.B) { - ng := engine.New(engine.Opts{EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second}}) + ng := engine.New(engine.Opts{ + EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second}, + EnableSubqueries: true, + }) b.ResetTimer() b.ReportAllocs() diff --git a/engine/engine.go b/engine/engine.go index fc5435b9..0108898d 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -33,6 +33,7 @@ import ( "github.com/thanos-io/promql-engine/extlabels" "github.com/thanos-io/promql-engine/logicalplan" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) type QueryType int @@ -47,6 +48,7 @@ const ( subsystem string = "engine" InstantQuery QueryType = 1 RangeQuery QueryType = 2 + stepsBatch = 10 ) type Opts struct { @@ -72,6 +74,10 @@ type Opts struct { // This will default to false. EnableXFunctions bool + // EnableSubqueries enables the engine to handle subqueries without falling back to prometheus. + // This will default to false. + EnableSubqueries bool + // FallbackEngine Engine v1.QueryEngine @@ -219,6 +225,10 @@ func New(opts Opts) *compatibilityEngine { metrics: metrics, extLookbackDelta: opts.ExtLookbackDelta, enableAnalysis: opts.EnableAnalysis, + enableSubqueries: opts.EnableSubqueries, + noStepSubqueryIntervalFn: func(d time.Duration) time.Duration { + return time.Duration(opts.NoStepSubqueryIntervalFn(d.Milliseconds()) * 1000000) + }, } } @@ -234,8 +244,10 @@ type compatibilityEngine struct { timeout time.Duration metrics *engineMetrics - extLookbackDelta time.Duration - enableAnalysis bool + extLookbackDelta time.Duration + enableAnalysis bool + enableSubqueries bool + noStepSubqueryIntervalFn func(time.Duration) time.Duration } func (e *compatibilityEngine) SetQueryLogger(l promql.QueryLogger) { @@ -260,15 +272,21 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que // the presentation layer and not when computing the results. resultSort := newResultSort(expr) - lplan := logicalplan.New(expr, &logicalplan.Opts{ - Start: ts, - End: ts, - Step: 1, - LookbackDelta: opts.LookbackDelta(), - }) - lplan = lplan.Optimize(e.logicalOptimizers) - - exec, err := execution.New(ctx, lplan.Expr(), q, ts, ts, 0, opts.LookbackDelta(), e.extLookbackDelta, e.enableAnalysis) + qOpts := &query.Options{ + Context: ctx, + Start: ts, + End: ts, + Step: 0, + StepsBatch: stepsBatch, + LookbackDelta: opts.LookbackDelta(), + ExtLookbackDelta: e.extLookbackDelta, + EnableAnalysis: e.enableAnalysis, + EnableSubqueries: e.enableSubqueries, + NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn, + } + + lplan := logicalplan.New(expr, qOpts).Optimize(e.logicalOptimizers) + exec, err := execution.New(lplan.Expr(), q, qOpts) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewInstantQuery(ctx, q, opts, qs, ts) @@ -311,15 +329,21 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta) } - lplan := logicalplan.New(expr, &logicalplan.Opts{ - Start: start, - End: end, - Step: step, - LookbackDelta: opts.LookbackDelta(), - }) - lplan = lplan.Optimize(e.logicalOptimizers) + qOpts := &query.Options{ + Context: ctx, + Start: start, + End: end, + Step: step, + StepsBatch: stepsBatch, + LookbackDelta: opts.LookbackDelta(), + ExtLookbackDelta: e.extLookbackDelta, + EnableAnalysis: e.enableAnalysis, + EnableSubqueries: false, // not yet implemented for range queries. + NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn, + } - exec, err := execution.New(ctx, lplan.Expr(), q, start, end, step, opts.LookbackDelta(), e.extLookbackDelta, e.enableAnalysis) + lplan := logicalplan.New(expr, qOpts).Optimize(e.logicalOptimizers) + exec, err := execution.New(lplan.Expr(), q, qOpts) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step) diff --git a/engine/engine_test.go b/engine/engine_test.go index 2d5552dc..28aa9da3 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -46,6 +46,7 @@ func TestMain(m *testing.M) { func TestPromqlAcceptance(t *testing.T) { engine := engine.New(engine.Opts{ + EnableSubqueries: true, EngineOpts: promql.EngineOpts{ EnableAtModifier: true, EnableNegativeOffset: true, @@ -2775,10 +2776,11 @@ func TestInstantQuery(t *testing.T) { // Negative offset and at modifier are enabled by default // since Prometheus v2.33.0, so we also enable them. opts := promql.EngineOpts{ - Timeout: 1 * time.Hour, - MaxSamples: 1e10, - EnableNegativeOffset: true, - EnableAtModifier: true, + Timeout: 1 * time.Hour, + MaxSamples: 1e10, + EnableNegativeOffset: true, + EnableAtModifier: true, + NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 { return 30 * time.Second.Milliseconds() }, } cases := []struct { @@ -2788,6 +2790,99 @@ func TestInstantQuery(t *testing.T) { queryTime time.Time sortByLabels bool // if true, the series in the result between the old and new engine should be sorted before compared }{ + { + name: "sum_over_time with subquery", + load: `load 10s + http_requests_total{pod="nginx-1", series="1"} 1+1x40 + http_requests_total{pod="nginx-2", series="2"} 2+2x50 + http_requests_total{pod="nginx-4", series="3"} 5+2x50 + http_requests_total{pod="nginx-5", series="1"} 8+4x50 + http_requests_total{pod="nginx-6", series="2"} 2+3x50`, + queryTime: time.Unix(600, 0), + query: "sum_over_time(sum by (series) (http_requests_total)[5m:1m])", + sortByLabels: true, + }, + { + name: "sum_over_time with subquery with default step", + load: `load 10s + http_requests_total{pod="nginx-1", series="1"} 1+1x40 + http_requests_total{pod="nginx-2", series="2"} 2+2x50 + http_requests_total{pod="nginx-4", series="3"} 5+2x50 + http_requests_total{pod="nginx-5", series="1"} 8+4x50 + http_requests_total{pod="nginx-6", series="2"} 2+3x50`, + queryTime: time.Unix(600, 0), + query: "sum_over_time(sum by (series) (http_requests_total)[5m:])", + sortByLabels: true, + }, + { + name: "sum_over_time with subquery with resolution that doesnt divide step length", + load: `load 10s + http_requests_total{pod="nginx-1", series="1"} 1+1x40 + http_requests_total{pod="nginx-2", series="2"} 2+2x50 + http_requests_total{pod="nginx-4", series="3"} 5+2x50 + http_requests_total{pod="nginx-5", series="1"} 8+4x50 + http_requests_total{pod="nginx-6", series="2"} 2+3x50`, + queryTime: time.Unix(600, 0), + query: "sum_over_time(sum by (series) (http_requests_total)[5m:22s])", + sortByLabels: true, + }, + { + name: "sum_over_time with subquery with offset", + load: `load 10s + http_requests_total{pod="nginx-1", series="1"} 1+1x40 + http_requests_total{pod="nginx-2", series="2"} 2+2x50 + http_requests_total{pod="nginx-4", series="3"} 5+2x50 + http_requests_total{pod="nginx-5", series="1"} 8+4x50 + http_requests_total{pod="nginx-6", series="2"} 2+3x50`, + queryTime: time.Unix(600, 0), + query: "sum_over_time(sum by (series) (http_requests_total)[5m:1m] offset 1m)", + sortByLabels: true, + }, + { + name: "sum_over_time with subquery with inner offset", + load: `load 10s + http_requests_total{pod="nginx-1", series="1"} 1+1x40 + http_requests_total{pod="nginx-2", series="2"} 2+2x50 + http_requests_total{pod="nginx-4", series="3"} 5+2x50 + http_requests_total{pod="nginx-5", series="1"} 8+4x50 + http_requests_total{pod="nginx-6", series="2"} 2+3x50`, + queryTime: time.Unix(600, 0), + query: "sum_over_time(sum by (series) (http_requests_total offset 1m)[5m:1m])", + sortByLabels: true, + }, + { + name: "sum_over_time with subquery with inner @ modifier", + load: `load 10s + http_requests_total{pod="nginx-1", series="1"} 1+1x40 + http_requests_total{pod="nginx-2", series="2"} 2+2x50 + http_requests_total{pod="nginx-4", series="3"} 5+2x50 + http_requests_total{pod="nginx-5", series="1"} 8+4x50 + http_requests_total{pod="nginx-6", series="2"} 2+3x50`, + queryTime: time.Unix(600, 0), + query: "sum_over_time(sum by (series) (http_requests_total @ 10)[5m:1m])", + sortByLabels: true, + }, + { + name: "sum_over_time with nested subqueries with inner @ modifier", + load: `load 10s + http_requests_total{pod="nginx-1", series="1"} 1+1x40 + http_requests_total{pod="nginx-2", series="2"} 2+2x50 + http_requests_total{pod="nginx-4", series="3"} 5+2x50 + http_requests_total{pod="nginx-5", series="1"} 8+4x50 + http_requests_total{pod="nginx-6", series="2"} 2+3x50`, + queryTime: time.Unix(600, 0), + query: "sum_over_time(rate(sum by (series) (http_requests_total @ 10)[5m:1m] @0)[10m:1m])", + sortByLabels: true, + }, + { + name: "sum_over_time with subquery should drop name label", + load: `load 10s + http_requests_total{pod="nginx-1", series="1"} 1+1x40 + http_requests_total{pod="nginx-2", series="1"} 2+2x50`, + queryTime: time.Unix(0, 0), + query: `sum_over_time(http_requests_total{series="1"} offset 7s[1h:1m] @ 119.800)`, + sortByLabels: true, + }, { name: "duplicate label set", load: `load 5m @@ -3638,6 +3733,7 @@ func TestInstantQuery(t *testing.T) { EngineOpts: opts, DisableFallback: disableFallback, LogicalOptimizers: optimizers, + EnableSubqueries: true, }) ctx := context.Background() diff --git a/engine/user_defined_test.go b/engine/user_defined_test.go index b086eac9..34809304 100644 --- a/engine/user_defined_test.go +++ b/engine/user_defined_test.go @@ -59,7 +59,7 @@ load 30s type injectVectorSelector struct{} -func (i injectVectorSelector) Optimize(expr parser.Expr, _ *logicalplan.Opts) parser.Expr { +func (i injectVectorSelector) Optimize(expr parser.Expr, _ *query.Options) parser.Expr { logicalplan.TraverseBottomUp(nil, &expr, func(_, current *parser.Expr) bool { switch (*current).(type) { case *parser.VectorSelector: @@ -76,9 +76,9 @@ type logicalVectorSelector struct { *parser.VectorSelector } -func (c logicalVectorSelector) MakeExecutionOperator(stepsBatch int, vectors *model.VectorPool, selectors *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { +func (c logicalVectorSelector) MakeExecutionOperator(vectors *model.VectorPool, selectors *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { return &vectorSelectorOperator{ - stepsBatch: stepsBatch, + stepsBatch: opts.StepsBatch, vectors: vectors, mint: opts.Start.UnixMilli(), diff --git a/execution/aggregate/hashaggregate.go b/execution/aggregate/hashaggregate.go index 9d207dd7..9fa9826b 100644 --- a/execution/aggregate/hashaggregate.go +++ b/execution/aggregate/hashaggregate.go @@ -17,6 +17,7 @@ import ( "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/worker" ) @@ -48,7 +49,7 @@ func NewHashAggregate( aggregation parser.ItemType, by bool, labels []string, - stepsBatch int, + opts *query.Options, ) (model.VectorOperator, error) { newAccumulator, err := makeAccumulatorFunc(aggregation) if err != nil { @@ -61,15 +62,15 @@ func NewHashAggregate( a := &aggregate{ next: next, paramOp: paramOp, - params: make([]float64, stepsBatch), + params: make([]float64, opts.StepsBatch), vectorPool: points, by: by, aggregation: aggregation, labels: labels, - stepsBatch: stepsBatch, + stepsBatch: opts.StepsBatch, newAccumulator: newAccumulator, } - a.workers = worker.NewGroup(stepsBatch, a.workerTask) + a.workers = worker.NewGroup(opts.StepsBatch, a.workerTask) a.OperatorTelemetry = &model.TrackedTelemetry{} return a, nil diff --git a/execution/aggregate/khashaggregate.go b/execution/aggregate/khashaggregate.go index d1bada91..157ad5d1 100644 --- a/execution/aggregate/khashaggregate.go +++ b/execution/aggregate/khashaggregate.go @@ -48,7 +48,6 @@ func NewKHashAggregate( aggregation parser.ItemType, by bool, labels []string, - stepsBatch int, opts *query.Options, ) (model.VectorOperator, error) { var compare func(float64, float64) bool @@ -74,7 +73,7 @@ func NewKHashAggregate( labels: labels, paramOp: paramOp, compare: compare, - params: make([]float64, stepsBatch), + params: make([]float64, opts.StepsBatch), } a.OperatorTelemetry = &model.NoopTelemetry{} if opts.EnableAnalysis { diff --git a/execution/execution.go b/execution/execution.go index 7176737f..0e1b51bd 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -17,8 +17,6 @@ package execution import ( - "context" - "runtime" "sort" "time" @@ -45,27 +43,14 @@ import ( "github.com/thanos-io/promql-engine/query" ) -const stepsBatch = 10 - // New creates new physical query execution for a given query expression which represents logical plan. // TODO(bwplotka): Add definition (could be parameters for each execution operator) we can optimize - it would represent physical plan. -func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, mint, maxt time.Time, step, lookbackDelta, extLookbackDelta time.Duration, enableAnalysis bool) (model.VectorOperator, error) { - opts := &query.Options{ - Context: ctx, - Start: mint, - End: maxt, - Step: step, - LookbackDelta: lookbackDelta, - StepsBatch: stepsBatch, - ExtLookbackDelta: extLookbackDelta, - EnableAnalysis: enableAnalysis, - } +func New(expr parser.Expr, queryable storage.Queryable, opts *query.Options) (model.VectorOperator, error) { selectorPool := engstore.NewSelectorPool(queryable) hints := storage.SelectHints{ - Start: mint.UnixMilli(), - End: maxt.UnixMilli(), - // TODO(fpetkovski): Adjust the step for sub-queries once they are supported. - Step: step.Milliseconds(), + Start: opts.Start.UnixMilli(), + End: opts.End.UnixMilli(), + Step: opts.Step.Milliseconds(), } return newOperator(expr, selectorPool, opts, hints) @@ -74,7 +59,7 @@ func New(ctx context.Context, expr parser.Expr, queryable storage.Queryable, min func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { switch e := expr.(type) { case *parser.NumberLiteral: - return scan.NewNumberLiteralSelector(model.NewVectorPool(stepsBatch), opts, e.Val), nil + return scan.NewNumberLiteralSelector(model.NewVectorPool(opts.StepsBatch), opts, e.Val), nil case *parser.VectorSelector: start, end := getTimeRangesForVectorSelector(e, opts, 0) @@ -94,8 +79,16 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O hints.Func = e.Func.Name hints.Grouping = nil hints.By = false + + // TODO(saswatamcode): Range vector result might need new operator + // before it can be non-nested. https://github.com/thanos-io/promql-engine/issues/39 for i := range e.Args { switch t := e.Args[i].(type) { + case *parser.SubqueryExpr: + if !opts.EnableSubqueries { + return nil, parse.ErrNotImplemented + } + return newSubqueryFunction(e, t, storage, opts, hints) case *parser.MatrixSelector: return newRangeVectorFunction(e, t, storage, opts, hints) } @@ -121,9 +114,9 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O } if e.Op == parser.TOPK || e.Op == parser.BOTTOMK { - next, err = aggregate.NewKHashAggregate(model.NewVectorPool(stepsBatch), next, paramOp, e.Op, !e.Without, e.Grouping, stepsBatch, opts) + next, err = aggregate.NewKHashAggregate(model.NewVectorPool(opts.StepsBatch), next, paramOp, e.Op, !e.Without, e.Grouping, opts) } else { - next, err = aggregate.NewHashAggregate(model.NewVectorPool(stepsBatch), next, paramOp, e.Op, !e.Without, e.Grouping, stepsBatch) + next, err = aggregate.NewHashAggregate(model.NewVectorPool(opts.StepsBatch), next, paramOp, e.Op, !e.Without, e.Grouping, opts) } if err != nil { @@ -151,7 +144,7 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O case parser.ADD: return next, nil case parser.SUB: - return unary.NewUnaryNegation(next, stepsBatch) + return unary.NewUnaryNegation(next, opts.StepsBatch) default: // This shouldn't happen as Op was validated when parsing already // https://github.com/prometheus/prometheus/blob/v2.38.0/promql/parser/parse.go#L573. @@ -161,13 +154,13 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O case *parser.StepInvariantExpr: switch t := e.Expr.(type) { case *parser.NumberLiteral: - return scan.NewNumberLiteralSelector(model.NewVectorPool(stepsBatch), opts, t.Val), nil + return scan.NewNumberLiteralSelector(model.NewVectorPool(opts.StepsBatch), opts, t.Val), nil } next, err := newOperator(e.Expr, storage, opts.WithEndTime(opts.Start), hints) if err != nil { return nil, err } - return step_invariant.NewStepInvariantOperator(model.NewVectorPoolWithSize(stepsBatch, 1), next, e.Expr, opts, stepsBatch) + return step_invariant.NewStepInvariantOperator(model.NewVectorPoolWithSize(opts.StepsBatch, 1), next, e.Expr, opts) case logicalplan.Deduplicate: // The Deduplicate operator will deduplicate samples using a last-sample-wins strategy. @@ -186,8 +179,8 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O } operators[i] = operator } - coalesce := exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...) - dedup := exchange.NewDedupOperator(model.NewVectorPool(stepsBatch), coalesce) + coalesce := exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, operators...) + dedup := exchange.NewDedupOperator(model.NewVectorPool(opts.StepsBatch), coalesce) return exchange.NewConcurrent(dedup, 2), nil case logicalplan.RemoteExecution: @@ -202,12 +195,12 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O // We need to set the lookback for the selector to 0 since the remote query already applies one lookback. selectorOpts := *opts selectorOpts.LookbackDelta = 0 - remoteExec := remote.NewExecution(qry, model.NewVectorPool(stepsBatch), e.QueryRangeStart, &selectorOpts) + remoteExec := remote.NewExecution(qry, model.NewVectorPool(opts.StepsBatch), e.QueryRangeStart, &selectorOpts) return exchange.NewConcurrent(remoteExec, 2), nil case logicalplan.Noop: return noop.NewOperator(), nil case logicalplan.UserDefinedExpr: - return e.MakeExecutionOperator(stepsBatch, model.NewVectorPool(stepsBatch), storage, opts, hints) + return e.MakeExecutionOperator(model.NewVectorPool(opts.StepsBatch), storage, opts, hints) default: return nil, errors.Wrapf(parse.ErrNotSupportedExpr, "got: %s", e) } @@ -250,14 +243,36 @@ func newRangeVectorFunction(e *parser.Call, t *parser.MatrixSelector, storage *e operators := make([]model.VectorOperator, 0, numShards) for i := 0; i < numShards; i++ { - operator, err := scan.NewMatrixSelector(model.NewVectorPool(stepsBatch), filter, e, opts, t.Range, vs.Offset, i, numShards) + operator, err := scan.NewMatrixSelector(model.NewVectorPool(opts.StepsBatch), filter, e, opts, t.Range, vs.Offset, i, numShards) if err != nil { return nil, err } operators = append(operators, exchange.NewConcurrent(operator, 2)) } - return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...), nil + return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, operators...), nil +} + +func newSubqueryFunction(e *parser.Call, t *parser.SubqueryExpr, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { + // TODO: We dont implement ext functions + if parse.IsExtFunction(e.Func.Name) { + return nil, parse.ErrNotImplemented + } + // TODO: only instant queries for now. + if !opts.IsInstantQuery() { + return nil, parse.ErrNotImplemented + } + nOpts := query.NestedOptionsForSubquery(opts, t) + + hints.Start = nOpts.Start.UnixMilli() + hints.End = nOpts.End.UnixMilli() + hints.Step = nOpts.Step.Milliseconds() + + inner, err := newOperator(t.Expr, storage, nOpts, hints) + if err != nil { + return nil, err + } + return scan.NewSubqueryOperator(model.NewVectorPool(opts.StepsBatch), inner, opts, e, t) } func newInstantVectorFunction(e *parser.Call, storage *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { @@ -274,7 +289,7 @@ func newInstantVectorFunction(e *parser.Call, storage *engstore.SelectorPool, op nextOperators = append(nextOperators, next) } - return function.NewFunctionOperator(e, nextOperators, stepsBatch, opts) + return function.NewFunctionOperator(e, nextOperators, opts.StepsBatch, opts) } func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Options, offset time.Duration) (model.VectorOperator, error) { @@ -286,11 +301,11 @@ func newShardedVectorSelector(selector engstore.SeriesSelector, opts *query.Opti for i := 0; i < numShards; i++ { operator := exchange.NewConcurrent( scan.NewVectorSelector( - model.NewVectorPool(stepsBatch), selector, opts, offset, i, numShards), 2) + model.NewVectorPool(opts.StepsBatch), selector, opts, offset, i, numShards), 2) operators = append(operators, operator) } - return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), opts, operators...), nil + return exchange.NewCoalesce(model.NewVectorPool(opts.StepsBatch), opts, operators...), nil } func newVectorBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { @@ -302,7 +317,7 @@ func newVectorBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.Select if err != nil { return nil, err } - return binary.NewVectorOperator(model.NewVectorPool(stepsBatch), leftOperator, rightOperator, e.VectorMatching, e.Op, e.ReturnBool, opts) + return binary.NewVectorOperator(model.NewVectorPool(opts.StepsBatch), leftOperator, rightOperator, e.VectorMatching, e.Op, e.ReturnBool, opts) } func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) { @@ -323,7 +338,7 @@ func newScalarBinaryOperator(e *parser.BinaryExpr, selectorPool *engstore.Select scalarSide = binary.ScalarSideLeft } - return binary.NewScalar(model.NewVectorPoolWithSize(stepsBatch, 1), lhs, rhs, e.Op, scalarSide, e.ReturnBool, opts) + return binary.NewScalar(model.NewVectorPoolWithSize(opts.StepsBatch, 1), lhs, rhs, e.Op, scalarSide, e.ReturnBool, opts) } // Copy from https://github.com/prometheus/prometheus/blob/v2.39.1/promql/engine.go#L791. diff --git a/execution/scan/subquery.go b/execution/scan/subquery.go new file mode 100644 index 00000000..984591e3 --- /dev/null +++ b/execution/scan/subquery.go @@ -0,0 +1,142 @@ +// Copyright (c) The Thanos Community Authors. +// Licensed under the Apache License 2.0. + +package scan + +import ( + "context" + "fmt" + "sync" + + "github.com/prometheus/prometheus/model/labels" + + "github.com/thanos-io/promql-engine/execution/model" + "github.com/thanos-io/promql-engine/execution/parse" + "github.com/thanos-io/promql-engine/extlabels" + "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" +) + +// TODO: only instant subqueries for now. +type subqueryOperator struct { + next model.VectorOperator + pool *model.VectorPool + call functionCall + mint int64 + maxt int64 + currentStep int64 + + funcExpr *parser.Call + subQuery *parser.SubqueryExpr + + onceSeries sync.Once + series []labels.Labels + acc [][]sample +} + +func NewSubqueryOperator(pool *model.VectorPool, next model.VectorOperator, opts *query.Options, funcExpr *parser.Call, subQuery *parser.SubqueryExpr) (model.VectorOperator, error) { + call, ok := rangeVectorFuncs[funcExpr.Func.Name] + if !ok { + return nil, parse.UnknownFunctionError(funcExpr.Func) + } + return &subqueryOperator{ + next: next, + call: call, + pool: pool, + funcExpr: funcExpr, + subQuery: subQuery, + mint: opts.Start.UnixMilli(), + maxt: opts.End.UnixMilli(), + currentStep: opts.Start.UnixMilli(), + }, nil +} + +func (o *subqueryOperator) Explain() (me string, next []model.VectorOperator) { + return fmt.Sprintf("[*subqueryOperator] %v()", o.funcExpr.Func.Name), []model.VectorOperator{o.next} +} + +func (o *subqueryOperator) GetPool() *model.VectorPool { return o.pool } + +func (o *subqueryOperator) Next(ctx context.Context) ([]model.StepVector, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + if o.currentStep > o.maxt { + return nil, nil + } + if err := o.initSeries(ctx); err != nil { + return nil, err + } + +ACC: + for { + vectors, err := o.next.Next(ctx) + if err != nil { + return nil, err + } + if len(vectors) == 0 { + break ACC + } + for _, vector := range vectors { + for j, s := range vector.Samples { + o.acc[vector.SampleIDs[j]] = append(o.acc[vector.SampleIDs[j]], sample{T: vector.T, F: s}) + } + o.next.GetPool().PutStepVector(vector) + } + o.next.GetPool().PutVectors(vectors) + } + + res := o.pool.GetVectorBatch() + sv := o.pool.GetStepVector(o.currentStep) + for sampleId, rangeSamples := range o.acc { + f, h, ok := o.call(functionArgs{ + Samples: rangeSamples, + StepTime: o.currentStep, + SelectRange: o.subQuery.Range.Milliseconds(), + Offset: o.subQuery.Offset.Milliseconds(), + }) + if ok { + if h != nil { + sv.AppendHistogram(o.pool, uint64(sampleId), h) + } else { + sv.AppendSample(o.pool, uint64(sampleId), f) + } + } + } + res = append(res, sv) + + o.currentStep++ + return res, nil +} + +func (o *subqueryOperator) Series(ctx context.Context) ([]labels.Labels, error) { + if err := o.initSeries(ctx); err != nil { + return nil, err + } + return o.series, nil +} + +func (o *subqueryOperator) initSeries(ctx context.Context) error { + var err error + o.onceSeries.Do(func() { + var series []labels.Labels + series, err = o.next.Series(ctx) + if err != nil { + return + } + + o.series = make([]labels.Labels, len(series)) + o.acc = make([][]sample, len(series)) + var b labels.ScratchBuilder + for i, s := range series { + lbls := s + if o.funcExpr.Func.Name != "last_over_time" { + lbls, _ = extlabels.DropMetricName(s, b) + } + o.series[i] = lbls + } + }) + return err +} diff --git a/execution/step_invariant/step_invariant.go b/execution/step_invariant/step_invariant.go index 33a2566f..3db7708f 100644 --- a/execution/step_invariant/step_invariant.go +++ b/execution/step_invariant/step_invariant.go @@ -52,7 +52,6 @@ func NewStepInvariantOperator( next model.VectorOperator, expr parser.Expr, opts *query.Options, - stepsBatch int, ) (model.VectorOperator, error) { interval := opts.Step.Milliseconds() // We set interval to be at least 1. @@ -66,7 +65,7 @@ func NewStepInvariantOperator( mint: opts.Start.UnixMilli(), maxt: opts.End.UnixMilli(), step: interval, - stepsBatch: stepsBatch, + stepsBatch: opts.StepsBatch, cacheResult: true, } // We do not duplicate results for range selectors since result is a matrix diff --git a/logicalplan/distribute.go b/logicalplan/distribute.go index bc3a3b36..aefc4694 100644 --- a/logicalplan/distribute.go +++ b/logicalplan/distribute.go @@ -14,6 +14,7 @@ import ( "github.com/thanos-io/promql-engine/api" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) type timeRange struct { @@ -138,7 +139,7 @@ type DistributedExecutionOptimizer struct { Endpoints api.RemoteEndpoints } -func (m DistributedExecutionOptimizer) Optimize(plan parser.Expr, opts *Opts) parser.Expr { +func (m DistributedExecutionOptimizer) Optimize(plan parser.Expr, opts *query.Options) parser.Expr { engines := m.Endpoints.Engines() sort.Slice(engines, func(i, j int) bool { return engines[i].MinT() < engines[j].MinT() @@ -227,7 +228,7 @@ func newRemoteAggregation(rootAggregation *parser.AggregateExpr, engines []api.R // distributeQuery takes a PromQL expression in the form of *parser.Expr and a set of remote engines. // For each engine which matches the time range of the query, it creates a RemoteExecution scoped to the range of the engine. // All remote executions are wrapped in a Deduplicate logical node to make sure that results from overlapping engines are deduplicated. -func (m DistributedExecutionOptimizer) distributeQuery(expr *parser.Expr, engines []api.RemoteEngine, opts *Opts, allowedStartOffset time.Duration) parser.Expr { +func (m DistributedExecutionOptimizer) distributeQuery(expr *parser.Expr, engines []api.RemoteEngine, opts *query.Options, allowedStartOffset time.Duration) parser.Expr { if isAbsent(*expr) { return m.distributeAbsent(*expr, engines, opts) } @@ -271,7 +272,7 @@ func (m DistributedExecutionOptimizer) distributeQuery(expr *parser.Expr, engine } } -func (m DistributedExecutionOptimizer) distributeAbsent(expr parser.Expr, engines []api.RemoteEngine, opts *Opts) parser.Expr { +func (m DistributedExecutionOptimizer) distributeAbsent(expr parser.Expr, engines []api.RemoteEngine, opts *query.Options) parser.Expr { queries := make(RemoteExecutions, 0, len(engines)) for i := range engines { queries = append(queries, RemoteExecution{ @@ -302,7 +303,7 @@ func isAbsent(expr parser.Expr) bool { return call.Func.Name == "absent" || call.Func.Name == "absent_over_time" } -func getStartTimeForEngine(e api.RemoteEngine, opts *Opts, offset time.Duration, globalMinT int64) (time.Time, bool) { +func getStartTimeForEngine(e api.RemoteEngine, opts *query.Options, offset time.Duration, globalMinT int64) (time.Time, bool) { if e.MinT() > opts.End.UnixMilli() { return time.Time{}, false } @@ -336,7 +337,7 @@ func getStartTimeForEngine(e api.RemoteEngine, opts *Opts, offset time.Duration, // engine min time and the query step size. // The purpose of this alignment is to make sure that the steps for the remote query // have the same timestamps as the ones for the central query. -func calculateStepAlignedStart(opts *Opts, engineMinTime time.Time) time.Time { +func calculateStepAlignedStart(opts *query.Options, engineMinTime time.Time) time.Time { originalSteps := numSteps(opts.Start, opts.End, opts.Step) remoteQuerySteps := numSteps(engineMinTime, opts.End, opts.Step) diff --git a/logicalplan/distribute_test.go b/logicalplan/distribute_test.go index 85014f13..83308598 100644 --- a/logicalplan/distribute_test.go +++ b/logicalplan/distribute_test.go @@ -14,6 +14,7 @@ import ( "github.com/thanos-io/promql-engine/api" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) func TestDistributedExecution(t *testing.T) { @@ -203,7 +204,7 @@ remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60))))`, expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan := New(expr, &Opts{Start: time.Unix(0, 0), End: time.Unix(0, 0)}) + plan := New(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}) optimizedPlan := plan.Optimize(optimizers) expectedPlan := cleanUp(replacements, tcase.expected) testutil.Equals(t, expectedPlan, optimizedPlan.Expr().String()) @@ -306,7 +307,7 @@ dedup( expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan := New(expr, &Opts{Start: queryStart, End: queryEnd, Step: queryStep}) + plan := New(expr, &query.Options{Start: queryStart, End: queryEnd, Step: queryStep}) optimizedPlan := plan.Optimize(optimizers) expectedPlan := cleanUp(replacements, tcase.expected) testutil.Equals(t, expectedPlan, optimizedPlan.Expr().String()) diff --git a/logicalplan/merge_selects.go b/logicalplan/merge_selects.go index d7f0cbf4..832e0839 100644 --- a/logicalplan/merge_selects.go +++ b/logicalplan/merge_selects.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) // MergeSelectsOptimizer optimizes a binary expression where @@ -20,7 +21,7 @@ import ( // and apply an additional filter for {c="d"}. type MergeSelectsOptimizer struct{} -func (m MergeSelectsOptimizer) Optimize(expr parser.Expr, _ *Opts) parser.Expr { +func (m MergeSelectsOptimizer) Optimize(expr parser.Expr, _ *query.Options) parser.Expr { heap := make(matcherHeap) extractSelectors(heap, expr) replaceMatchers(heap, &expr) diff --git a/logicalplan/merge_selects_test.go b/logicalplan/merge_selects_test.go index 1c0acaf0..99e42feb 100644 --- a/logicalplan/merge_selects_test.go +++ b/logicalplan/merge_selects_test.go @@ -9,6 +9,7 @@ import ( "github.com/efficientgo/core/testutil" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) func TestMergeSelects(t *testing.T) { @@ -39,7 +40,7 @@ func TestMergeSelects(t *testing.T) { expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan := New(expr, &Opts{}) + plan := New(expr, &query.Options{}) optimizedPlan := plan.Optimize(optimizers) testutil.Equals(t, tcase.expected, optimizedPlan.Expr().String()) }) diff --git a/logicalplan/passthrough.go b/logicalplan/passthrough.go index fcd2d55f..0b4bc324 100644 --- a/logicalplan/passthrough.go +++ b/logicalplan/passthrough.go @@ -6,6 +6,7 @@ package logicalplan import ( "github.com/thanos-io/promql-engine/api" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) // PassthroughOptimizer optimizes queries which can be simply passed @@ -14,7 +15,7 @@ type PassthroughOptimizer struct { Endpoints api.RemoteEndpoints } -func (m PassthroughOptimizer) Optimize(plan parser.Expr, opts *Opts) parser.Expr { +func (m PassthroughOptimizer) Optimize(plan parser.Expr, opts *query.Options) parser.Expr { engines := m.Endpoints.Engines() if len(engines) == 1 { return RemoteExecution{ diff --git a/logicalplan/passthrough_test.go b/logicalplan/passthrough_test.go index b98e1aeb..6495ea36 100644 --- a/logicalplan/passthrough_test.go +++ b/logicalplan/passthrough_test.go @@ -13,6 +13,7 @@ import ( "github.com/thanos-io/promql-engine/api" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) func TestPassthrough(t *testing.T) { @@ -25,7 +26,7 @@ func TestPassthrough(t *testing.T) { } optimizers := []Optimizer{PassthroughOptimizer{Endpoints: api.NewStaticEndpoints(engines)}} - plan := New(expr, &Opts{Start: time.Unix(0, 0), End: time.Unix(0, 0)}) + plan := New(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}) optimizedPlan := plan.Optimize(optimizers) testutil.Equals(t, "remote(time())", optimizedPlan.Expr().String()) @@ -38,7 +39,7 @@ func TestPassthrough(t *testing.T) { } optimizers := []Optimizer{PassthroughOptimizer{Endpoints: api.NewStaticEndpoints(engines)}} - plan := New(expr, &Opts{Start: time.Unix(0, 0), End: time.Unix(0, 0)}) + plan := New(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}) optimizedPlan := plan.Optimize(optimizers) testutil.Equals(t, "time()", optimizedPlan.Expr().String()) diff --git a/logicalplan/plan.go b/logicalplan/plan.go index ad324ed9..7ca18351 100644 --- a/logicalplan/plan.go +++ b/logicalplan/plan.go @@ -5,12 +5,15 @@ package logicalplan import ( "fmt" + "math" "time" + "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/promql" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) var ( @@ -23,34 +26,24 @@ var DefaultOptimizers = []Optimizer{ MergeSelectsOptimizer{}, } -type Opts struct { - Start time.Time - End time.Time - Step time.Duration - LookbackDelta time.Duration -} - -func (o Opts) IsInstantQuery() bool { - return o.Start == o.End -} - type Plan interface { Optimize([]Optimizer) Plan Expr() parser.Expr } type Optimizer interface { - Optimize(expr parser.Expr, opts *Opts) parser.Expr + Optimize(expr parser.Expr, opts *query.Options) parser.Expr } type plan struct { expr parser.Expr - opts *Opts + opts *query.Options } -func New(expr parser.Expr, opts *Opts) Plan { +func New(expr parser.Expr, opts *query.Options) Plan { expr = preprocessExpr(expr, opts.Start, opts.End) setOffsetForAtModifier(opts.Start.UnixMilli(), expr) + setOffsetForInnerSubqueries(expr, opts) return &plan{ expr: expr, @@ -256,10 +249,13 @@ func setOffsetForAtModifier(evalTime int64, expr parser.Expr) { if ts == nil { return originalOffset } - // TODO: support subquery. + subqOffset, _, subqTs := subqueryTimes(path) + if subqTs != nil { + subqOffset += time.Duration(evalTime-*subqTs) * time.Millisecond + } offsetForTs := time.Duration(evalTime-*ts) * time.Millisecond - offsetDiff := offsetForTs + offsetDiff := offsetForTs - subqOffset return originalOffset + offsetDiff } @@ -278,3 +274,46 @@ func setOffsetForAtModifier(evalTime int64, expr parser.Expr) { return nil }) } + +// https://github.com/prometheus/prometheus/blob/dfae954dc1137568f33564e8cffda321f2867925/promql/engine.go#L754 +// subqueryTimes returns the sum of offsets and ranges of all subqueries in the path. +// If the @ modifier is used, then the offset and range is w.r.t. that timestamp +// (i.e. the sum is reset when we have @ modifier). +// The returned *int64 is the closest timestamp that was seen. nil for no @ modifier. +func subqueryTimes(path []parser.Node) (time.Duration, time.Duration, *int64) { + var ( + subqOffset, subqRange time.Duration + ts int64 = math.MaxInt64 + ) + for _, node := range path { + if n, ok := node.(*parser.SubqueryExpr); ok { + subqOffset += n.OriginalOffset + subqRange += n.Range + if n.Timestamp != nil { + // The @ modifier on subquery invalidates all the offset and + // range till now. Hence resetting it here. + subqOffset = n.OriginalOffset + subqRange = n.Range + ts = *n.Timestamp + } + } + } + var tsp *int64 + if ts != math.MaxInt64 { + tsp = &ts + } + return subqOffset, subqRange, tsp +} + +func setOffsetForInnerSubqueries(expr parser.Expr, opts *query.Options) { + parser.Inspect(expr, func(node parser.Node, path []parser.Node) error { + switch n := node.(type) { + case *parser.SubqueryExpr: + nOpts := query.NestedOptionsForSubquery(opts, n) + setOffsetForAtModifier(nOpts.Start.UnixMilli(), n.Expr) + setOffsetForInnerSubqueries(n.Expr, nOpts) + return errors.New("stop iteration") + } + return nil + }) +} diff --git a/logicalplan/plan_test.go b/logicalplan/plan_test.go index 65b372ed..43b22795 100644 --- a/logicalplan/plan_test.go +++ b/logicalplan/plan_test.go @@ -12,6 +12,7 @@ import ( "github.com/efficientgo/core/testutil" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) var spaces = regexp.MustCompile(`\s+`) @@ -86,7 +87,7 @@ func TestDefaultOptimizers(t *testing.T) { expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan := New(expr, &Opts{Start: time.Unix(0, 0), End: time.Unix(0, 0)}) + plan := New(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}) optimizedPlan := plan.Optimize(DefaultOptimizers) expectedPlan := strings.Trim(spaces.ReplaceAllString(tcase.expected, " "), " ") testutil.Equals(t, expectedPlan, optimizedPlan.Expr().String()) @@ -128,7 +129,7 @@ func TestMatcherPropagation(t *testing.T) { expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan := New(expr, &Opts{Start: time.Unix(0, 0), End: time.Unix(0, 0)}) + plan := New(expr, &query.Options{Start: time.Unix(0, 0), End: time.Unix(0, 0)}) optimizedPlan := plan.Optimize(optimizers) expectedPlan := strings.Trim(spaces.ReplaceAllString(tcase.expected, " "), " ") testutil.Equals(t, expectedPlan, optimizedPlan.Expr().String()) diff --git a/logicalplan/propagate_selectors.go b/logicalplan/propagate_selectors.go index 819794f5..45b5ccaf 100644 --- a/logicalplan/propagate_selectors.go +++ b/logicalplan/propagate_selectors.go @@ -9,13 +9,14 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) // PropagateMatchersOptimizer implements matcher propagation between // two vector selectors in a binary expression. type PropagateMatchersOptimizer struct{} -func (m PropagateMatchersOptimizer) Optimize(expr parser.Expr, _ *Opts) parser.Expr { +func (m PropagateMatchersOptimizer) Optimize(expr parser.Expr, _ *query.Options) parser.Expr { traverse(&expr, func(expr *parser.Expr) { binOp, ok := (*expr).(*parser.BinaryExpr) if !ok { diff --git a/logicalplan/sort_matchers.go b/logicalplan/sort_matchers.go index 946ad01e..2a156048 100644 --- a/logicalplan/sort_matchers.go +++ b/logicalplan/sort_matchers.go @@ -7,6 +7,7 @@ import ( "sort" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) // SortMatchers sorts all matchers in a selector so that @@ -14,7 +15,7 @@ import ( // can rely on this property. type SortMatchers struct{} -func (m SortMatchers) Optimize(expr parser.Expr, _ *Opts) parser.Expr { +func (m SortMatchers) Optimize(expr parser.Expr, _ *query.Options) parser.Expr { traverse(&expr, func(node *parser.Expr) { e, ok := (*node).(*parser.VectorSelector) if !ok { diff --git a/logicalplan/trim_sorts.go b/logicalplan/trim_sorts.go index 73646510..95cd72cb 100644 --- a/logicalplan/trim_sorts.go +++ b/logicalplan/trim_sorts.go @@ -5,6 +5,7 @@ package logicalplan import ( "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) // TrimSortFunctions trims sort functions. It can do that because for nested sort functions @@ -14,7 +15,7 @@ import ( type TrimSortFunctions struct { } -func (TrimSortFunctions) Optimize(expr parser.Expr, _ *Opts) parser.Expr { +func (TrimSortFunctions) Optimize(expr parser.Expr, _ *query.Options) parser.Expr { TraverseBottomUp(nil, &expr, func(parent, current *parser.Expr) bool { if current == nil || parent == nil { return true diff --git a/logicalplan/trim_sorts_test.go b/logicalplan/trim_sorts_test.go index b97f467f..910d7b2b 100644 --- a/logicalplan/trim_sorts_test.go +++ b/logicalplan/trim_sorts_test.go @@ -9,6 +9,7 @@ import ( "github.com/efficientgo/core/testutil" "github.com/thanos-io/promql-engine/parser" + "github.com/thanos-io/promql-engine/query" ) func TestTrimSorts(t *testing.T) { @@ -51,7 +52,7 @@ func TestTrimSorts(t *testing.T) { expr, err := parser.ParseExpr(tcase.expr) testutil.Ok(t, err) - plan := New(expr, &Opts{}) + plan := New(expr, &query.Options{}) optimizedPlan := plan.Optimize(optimizers) testutil.Equals(t, tcase.expected, optimizedPlan.Expr().String()) }) diff --git a/logicalplan/user_defined.go b/logicalplan/user_defined.go index 259b4ea6..80030956 100644 --- a/logicalplan/user_defined.go +++ b/logicalplan/user_defined.go @@ -13,7 +13,6 @@ import ( type UserDefinedExpr interface { parser.Expr MakeExecutionOperator( - stepsBatch int, vectors *model.VectorPool, selectors *engstore.SelectorPool, opts *query.Options, diff --git a/query/options.go b/query/options.go index fed4fa26..719716d8 100644 --- a/query/options.go +++ b/query/options.go @@ -6,18 +6,21 @@ package query import ( "context" "time" + + "github.com/thanos-io/promql-engine/parser" ) type Options struct { - Context context.Context - Start time.Time - End time.Time - Step time.Duration - LookbackDelta time.Duration - ExtLookbackDelta time.Duration - - StepsBatch int64 - EnableAnalysis bool + Context context.Context + Start time.Time + End time.Time + Step time.Duration + StepsBatch int + LookbackDelta time.Duration + ExtLookbackDelta time.Duration + NoStepSubqueryIntervalFn func(time.Duration) time.Duration + EnableSubqueries bool + EnableAnalysis bool } func (o *Options) NumSteps() int { @@ -27,14 +30,40 @@ func (o *Options) NumSteps() int { } totalSteps := (o.End.UnixMilli()-o.Start.UnixMilli())/o.Step.Milliseconds() + 1 - if o.StepsBatch < totalSteps { - return int(o.StepsBatch) + if int64(o.StepsBatch) < totalSteps { + return o.StepsBatch } return int(totalSteps) } +func (o *Options) IsInstantQuery() bool { + return o.NumSteps() == 1 +} + func (o *Options) WithEndTime(end time.Time) *Options { result := *o result.End = end return &result } + +func NestedOptionsForSubquery(opts *Options, t *parser.SubqueryExpr) *Options { + nOpts := &Options{ + Context: opts.Context, + End: opts.End.Add(-t.Offset), + LookbackDelta: opts.LookbackDelta, + StepsBatch: opts.StepsBatch, + ExtLookbackDelta: opts.ExtLookbackDelta, + NoStepSubqueryIntervalFn: opts.NoStepSubqueryIntervalFn, + EnableSubqueries: opts.EnableSubqueries, + } + if t.Step != 0 { + nOpts.Step = t.Step + } else { + nOpts.Step = opts.NoStepSubqueryIntervalFn(t.Range) + } + nOpts.Start = time.UnixMilli(nOpts.Step.Milliseconds() * (opts.Start.Add(-t.Offset-t.Range).UnixMilli() / nOpts.Step.Milliseconds())) + if nOpts.Start.Before(opts.Start.Add(-t.Offset - t.Range)) { + nOpts.Start = nOpts.Start.Add(nOpts.Step) + } + return nOpts +}