auto_date_histogram aggregation #893

Merged · 15 commits · Oct 31, 2024
2 changes: 1 addition & 1 deletion quesma/model/README.md
@@ -12,7 +12,7 @@ More info: https://www.elastic.co/guide/en/elasticsearch/reference/current/searc
Metrics aggregation | Support | Bucket aggregation | Support | Pipeline aggregation | Support |
---------------------------|:----------------------:|------------------------------|:------------------:|------------------------|:------------------:|
Avg | :white_check_mark: | Adjacency matrix | :x: | Average bucket | :white_check_mark: |
- Cardinality | :white_check_mark: | Auto-interval date histogram | :x: | Bucket script | :x: |
+ Cardinality | :white_check_mark: | Auto-interval date histogram | :wavy_dash: | Bucket script | :wavy_dash: |
Extended Stats | :white_check_mark:[^1] | Categorize text | :x: | Bucket count K-S test | :x: |
Avg | :white_check_mark: | Children | :x: | Bucket correlation | :x: |
Boxplot | :x: | Composite | :x: | Bucket selector | :x: |
56 changes: 56 additions & 0 deletions quesma/model/bucket_aggregations/auto_date_histogram.go
@@ -0,0 +1,56 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package bucket_aggregations

import (
"context"
"fmt"
"quesma/logger"
"quesma/model"
"time"
)

type AutoDateHistogram struct {
ctx context.Context
field model.Expr // name of the field, e.g. timestamp
bucketsNr int
key int64
}

func NewAutoDateHistogram(ctx context.Context, field model.Expr, bucketsNr int) *AutoDateHistogram {
return &AutoDateHistogram{ctx: ctx, field: field, bucketsNr: bucketsNr}
}

func (query *AutoDateHistogram) AggregationType() model.AggregationType {
return model.BucketAggregation
}

func (query *AutoDateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
if len(rows) == 0 {
logger.WarnWithCtx(query.ctx).Msgf("no rows returned for %s", query.String())
return make(model.JsonMap, 0)
}
if len(rows) != 1 {
logger.WarnWithCtx(query.ctx).Msgf("unexpected (!= 1) number of rows returned for %s: %d.", query.String(), len(rows))
}
return model.JsonMap{
"buckets": []model.JsonMap{{
"key": query.key,
"key_as_string": time.UnixMilli(query.key).Format("2006-01-02T15:04:05.000-07:00"),
"doc_count": rows[0].LastColValue(),
}},
"interval": "100y", // seems working for bucketsNr=1 case. Will have to be changed for other cases.
}
}

func (query *AutoDateHistogram) String() string {
return fmt.Sprintf("auto_date_histogram(field: %v, bucketsNr: %d)", model.AsString(query.field), query.bucketsNr)
}

func (query *AutoDateHistogram) GetField() model.Expr {
return query.field
}

func (query *AutoDateHistogram) SetKey(key int64) {
query.key = key
}
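
For context, a minimal sketch of how this type is meant to be driven (the constructor call, the key value, and the row helper below are illustrative assumptions, not part of this PR; model.NewColumnRef is assumed to be the usual column-reference constructor):

// Illustrative only.
agg := bucket_aggregations.NewAutoDateHistogram(context.Background(),
	model.NewColumnRef("timestamp"), 1) // bucketsNr=1 is the only case handled so far
agg.SetKey(1730332800000) // lower bound of the query's time range, in ms (2024-10-31T00:00:00Z)
rows := singleCountRow() // hypothetical helper: one row whose last column is the doc count
resp := agg.TranslateSqlResponseToJson(rows)
// resp: {"buckets": [{"key": 1730332800000,
//                     "key_as_string": "2024-10-31T00:00:00.000+00:00", // local-zone formatting
//                     "doc_count": <count>}],
//        "interval": "100y"}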
5 changes: 4 additions & 1 deletion quesma/model/expr.go
@@ -9,7 +9,10 @@ type Expr interface {
Accept(v ExprVisitor) interface{}
}

var InvalidExpr = Expr(nil)
var (
InvalidExpr = Expr(nil)
TrueExpr = NewLiteral(true)
)

// ColumnRef is a reference to a column in a table, we can enrich it with more information (e.g. type used) as we go
type ColumnRef struct {
18 changes: 13 additions & 5 deletions quesma/model/pipeline_aggregations/bucket_script.go
@@ -16,8 +16,8 @@ type BucketScript struct {
script string
}

- func NewBucketScript(ctx context.Context, script string) BucketScript {
- return BucketScript{script: script, PipelineAggregation: newPipelineAggregation(ctx, "_count")}
+ func NewBucketScript(ctx context.Context, path, script string) BucketScript {
+ return BucketScript{script: script, PipelineAggregation: newPipelineAggregation(ctx, path)}
}

func (query BucketScript) AggregationType() model.AggregationType {
@@ -28,8 +28,16 @@ func (query BucketScript) TranslateSqlResponseToJson(rows []model.QueryResultRow
const defaultValue = 0.
switch {
case query.script == "params.numerator != null && params.denominator != null && params.denominator != 0 ? params.numerator / params.denominator : 0":
- numerator := query.findFilterValue(rows, "numerator")
- denominator := query.findFilterValue(rows, "denominator")
+ parent := query.GetPathToParent()
+ if len(parent) != 1 {
+ // TODO: research if this limitation can be removed, and do so if possible.
+ logger.WarnWithCtx(query.ctx).Msgf("unexpected parent path in bucket_script: %s. Returning default.", query.String())
+ return model.JsonMap{"value": defaultValue}
+ }
+
+ // ReplaceAll is a hack, but it gets the job done for the customer's case and won't break anything in any other case.
+ numerator := query.findFilterValue(rows, strings.ReplaceAll(parent[0], "denominator", "numerator"))
+ denominator := query.findFilterValue(rows, strings.ReplaceAll(parent[0], "numerator", "denominator"))
if denominator == 0 {
return model.JsonMap{"value": defaultValue}
}
@@ -77,7 +85,7 @@ func (query BucketScript) findFilterValue(rows []model.QueryResultRow, filterNam
}
colName = strings.TrimSuffix(colName, "_col_0")
if strings.HasSuffix(colName, "-"+filterName) {
- return float64(util.ExtractInt64(col.Value))
+ return util.ExtractNumeric64(col.Value)
}
}
}
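A quick sketch of the ReplaceAll trick above; the path string is a made-up example. Note that "numerator" is not a substring of "denominator" (and vice versa), which is why the swap is safe in both directions:

package main

import (
	"fmt"
	"strings"
)

func main() {
	parent := "2-bucket>numerator" // hypothetical single parent path
	fmt.Println(strings.ReplaceAll(parent, "denominator", "numerator")) // "2-bucket>numerator" (unchanged)
	fmt.Println(strings.ReplaceAll(parent, "numerator", "denominator")) // "2-bucket>denominator"
	// findFilterValue then matches each name against column-name suffixes.
}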
30 changes: 30 additions & 0 deletions quesma/model/where_visitor.go
@@ -0,0 +1,30 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package model

// FindLowerBounds returns x if there is "x>=y" or "x>y" in the WHERE clause, but only as a top-level conjunct.
// (That is, a>=0 is fine, and a>=0 AND expr2 [AND ...] is also fine (ANDs between all), but a>=0 OR expr2 is not.
// a>=0 AND (expr2 OR expr3) is also fine, as the top level contains only ANDs.)
// We achieve that by descending only into AND operators.
//
// TODO: add upper bound here too, when bucket_nr=1 in auto_date_histogram (only use case of this function), it's not needed.
func FindLowerBounds(expr Expr) []InfixExpr {
if expr == nil {
return []InfixExpr{}
}

lowerBounds := make([]InfixExpr, 0)
visitor := NewBaseVisitor()
visitor.OverrideVisitInfix = func(visitor *BaseExprVisitor, e InfixExpr) interface{} {
if e.Op == ">=" || e.Op == ">" {
lowerBounds = append(lowerBounds, e)
} else if e.Op == "AND" {
e.Left.Accept(visitor)
e.Right.Accept(visitor)
}
return nil
}

expr.Accept(visitor)
return lowerBounds
}
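
For context, a usage sketch (NewInfixExpr, NewColumnRef, and NewLiteral are assumed to be the model package's expression constructors):

// Illustrative only: WHERE "timestamp" >= 0 AND "count" < 10
where := model.NewInfixExpr(
	model.NewInfixExpr(model.NewColumnRef("timestamp"), ">=", model.NewLiteral(0)),
	"AND",
	model.NewInfixExpr(model.NewColumnRef("count"), "<", model.NewLiteral(10)),
)
bounds := model.FindLowerBounds(where) // one element: `"timestamp" >= 0`
// The same bound nested under an OR would yield nothing, since only ANDs are descended into.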
14 changes: 10 additions & 4 deletions quesma/queryparser/filters_aggregation.go
@@ -4,6 +4,7 @@ package queryparser

import (
"quesma/logger"
"quesma/model"
"quesma/model/bucket_aggregations"
)

@@ -32,13 +33,18 @@ func (cw *ClickhouseQueryTranslator) parseFilters(queryMap QueryMap) (success bo
}

filters := make([]bucket_aggregations.Filter, 0, len(nestedMap))
- for name, filter := range nestedMap {
- filterMap, ok := filter.(QueryMap)
+ for name, filterRaw := range nestedMap {
+ filterMap, ok := filterRaw.(QueryMap)
  if !ok {
- logger.WarnWithCtx(cw.Ctx).Msgf("filter is not a map, but %T, value: %v. Skipping.", filter, filter)
+ logger.WarnWithCtx(cw.Ctx).Msgf("filter is not a map, but %T, value: %v. Skipping.", filterRaw, filterRaw)
  continue
  }
- filters = append(filters, bucket_aggregations.NewFilter(name, cw.parseQueryMap(filterMap)))
+ filter := cw.parseQueryMap(filterMap)
+ if filter.WhereClause == nil {
+ filter.WhereClause = model.TrueExpr
+ filter.CanParse = true
+ }
+ filters = append(filters, bucket_aggregations.NewFilter(name, filter))
}
return true, bucket_aggregations.NewFilters(cw.Ctx, filters)
}
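
The TrueExpr fallback above matters for requests containing an empty filter, sketched here (the request fragment is illustrative, not from this PR):

// Illustrative request fragment:
//   {"filters": {"filters": {"all": {}, "errors": {"term": {"level": "error"}}}}}
// The empty "all" filter parses to a nil WhereClause; the fallback turns it into
// WHERE true (model.TrueExpr) and marks it parsable, so the bucket counts every
// row instead of being treated as unparsable.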
3 changes: 3 additions & 0 deletions quesma/queryparser/pancake_aggregation_parser.go
@@ -153,6 +153,9 @@ func (cw *ClickhouseQueryTranslator) pancakeParseAggregation(aggregationName str
if filterRaw, ok := queryMap["filter"]; ok {
if filter, ok := filterRaw.(QueryMap); ok {
whereClause := cw.parseQueryMap(filter).WhereClause
if whereClause == nil { // empty filter <=> true
whereClause = model.TrueExpr
}
aggregation.queryType = bucket_aggregations.NewFilterAgg(cw.Ctx, whereClause)
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("filter is not a map, but %T, value: %v. Skipping", filterRaw, filterRaw)
27 changes: 18 additions & 9 deletions quesma/queryparser/pancake_aggregation_parser_buckets.go
@@ -120,6 +120,11 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
delete(queryMap, "date_histogram")
return success, nil
}
if autoDateHistogram := cw.parseAutoDateHistogram(queryMap["auto_date_histogram"]); autoDateHistogram != nil {
aggregation.queryType = autoDateHistogram
delete(queryMap, "auto_date_histogram")
return
}
for _, termsType := range []string{"terms", "significant_terms"} {
termsRaw, ok := queryMap[termsType]
if !ok {
Expand All @@ -136,15 +141,8 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
aggregation.filterOutEmptyKeyBucket = true
}

- size := 10
- if sizeRaw, ok := terms["size"]; ok {
- if sizeParsed, ok := sizeRaw.(float64); ok {
- size = int(sizeParsed)
- } else {
- logger.WarnWithCtx(cw.Ctx).Msgf("size is not an float64, but %T, value: %v. Using default", sizeRaw, sizeRaw)
- }
- }
-
+ const defaultSize = 10
+ size := cw.parseSize(terms, defaultSize)
orderBy := cw.parseOrder(terms, queryMap, []model.Expr{fieldExpression})
aggregation.queryType = bucket_aggregations.NewTerms(cw.Ctx, termsType == "significant_terms", orderBy[0]) // TODO probably full, not [0]
aggregation.selectedColumns = append(aggregation.selectedColumns, fieldExpression)
@@ -346,6 +344,17 @@ func (cw *ClickhouseQueryTranslator) parseRandomSampler(randomSamplerRaw any) bu
)
}

func (cw *ClickhouseQueryTranslator) parseAutoDateHistogram(paramsRaw any) *bucket_aggregations.AutoDateHistogram {
params, ok := paramsRaw.(QueryMap)
if !ok {
return nil
}

field := cw.parseFieldField(params, "auto_date_histogram")
bucketsNr := cw.parseIntField(params, "buckets", 10)
return bucket_aggregations.NewAutoDateHistogram(cw.Ctx, field, bucketsNr)
}

func (cw *ClickhouseQueryTranslator) parseOrder(terms, queryMap QueryMap, fieldExpressions []model.Expr) []model.OrderByExpr {
defaultDirection := model.DescOrder
defaultOrderBy := model.NewOrderByExpr(model.NewCountFunc(), defaultDirection)
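For reference, the request shape parseAutoDateHistogram handles, sketched (values are illustrative; JSON numbers arrive as float64, which parseIntField is assumed to coerce):

// Inside the queryparser package, with cw a *ClickhouseQueryTranslator:
params := QueryMap{"field": "@timestamp", "buckets": 1.0}
histogram := cw.parseAutoDateHistogram(params) // *AutoDateHistogram with bucketsNr=1
_ = histogram
// With "buckets" absent, bucketsNr defaults to 10; a non-map argument returns nil.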
2 changes: 1 addition & 1 deletion quesma/queryparser/pancake_sql_query_generation_test.go
@@ -61,7 +61,7 @@ func TestPancakeQueryGeneration(t *testing.T) {
}

if test.TestName == "multiple buckets_path(file:clients/clover,nr:1)" {
t.Skip("Unskip after merge of auto_date_histogram")
t.Skip("This needs fixing ASAP, easy to fix")
}

fmt.Println("i:", i, "test:", test.TestName)
47 changes: 47 additions & 0 deletions quesma/queryparser/pancake_transformer.go
@@ -374,6 +374,52 @@ func (a *pancakeTransformer) createTopHitAndTopMetricsPancakes(pancake *pancakeM
return
}

func (a *pancakeTransformer) transformAutoDateHistogram(layers []*pancakeModelLayer, whereClause model.Expr) {
for _, layer := range layers {
if layer.nextBucketAggregation != nil {
if autoDateHistogram, ok := layer.nextBucketAggregation.queryType.(*bucket_aggregations.AutoDateHistogram); ok {
lowerBoundsInWhere := model.FindLowerBounds(whereClause)
if len(lowerBoundsInWhere) == 0 {
logger.WarnWithCtx(a.ctx).Msgf("could not find timestamp lower bound for auto_date_histogram %v", autoDateHistogram)
continue
}

var (
timestampLowerBound model.InfixExpr
found bool
)
for _, lowerBound := range lowerBoundsInWhere {
if lowerBound.Left == autoDateHistogram.GetField() {
timestampLowerBound = lowerBound
found = true
break
}
}
if !found {
logger.WarnWithCtx(a.ctx).Msgf("auto_date_histogram field %s does not match timestamp lower bound %s", autoDateHistogram.GetField(), timestampLowerBound.Left)
continue
}

var timestamp int64
if fun, ok := timestampLowerBound.Right.(model.FunctionExpr); ok && len(fun.Args) == 1 {
if expr, ok := fun.Args[0].(model.LiteralExpr); ok {
if ts, ok := expr.Value.(int64); ok {
timestamp = ts
} else {
logger.WarnWithCtx(a.ctx).Msgf("timestamp lower bound is not a number, but %T, value: %v", expr.Value, expr.Value)
}
} else {
logger.WarnWithCtx(a.ctx).Msgf("timestamp lower bound is not a literal, but %T, value: %v", fun.Args[0], fun.Args[0])
}
} else {
logger.WarnWithCtx(a.ctx).Msgf("timestamp lower bound is not a function, but %T, value: %v", timestampLowerBound.Right, timestampLowerBound.Right)
}
autoDateHistogram.SetKey(timestamp)
}
}
}
}

func (a *pancakeTransformer) aggregationTreeToPancakes(topLevel pancakeAggregationTree) (pancakeResults []*pancakeModel, err error) {
if len(topLevel.children) == 0 {
return nil, fmt.Errorf("no top level aggregations found")
@@ -398,6 +444,7 @@ func (a *pancakeTransformer) aggregationTreeToPancakes(topLevel pancakeAggregati
}

a.connectPipelineAggregations(layers)
a.transformAutoDateHistogram(layers, topLevel.whereClause)

newPancake := pancakeModel{
layers: layers,
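A sketch of the WHERE shape transformAutoDateHistogram expects when extracting the bucket key (fromUnixTimestamp64Milli and model.NewFunction are assumptions about how Quesma renders and builds time bounds):

// Illustrative only: "timestamp" >= fromUnixTimestamp64Milli(1730332800000)
lowerBound := model.NewInfixExpr(
	model.NewColumnRef("timestamp"),
	">=",
	model.NewFunction("fromUnixTimestamp64Milli", model.NewLiteral(int64(1730332800000))),
)
_ = lowerBound
// The right-hand side is a FunctionExpr with a single int64 LiteralExpr; that value
// (ms since epoch) becomes the histogram key via SetKey. Any other shape only logs a warning.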
4 changes: 2 additions & 2 deletions quesma/queryparser/pipeline_aggregations.go
@@ -180,7 +180,7 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) (
return
}
if script, ok := scriptRaw.(string); ok {
- return pipeline_aggregations.NewBucketScript(cw.Ctx, script), true
+ return pipeline_aggregations.NewBucketScript(cw.Ctx, bucketsPath, script), true
}

script, ok := scriptRaw.(QueryMap)
Expand All @@ -204,7 +204,7 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) (
}

// okay, we've checked everything, it's indeed a simple count
- return pipeline_aggregations.NewBucketScript(cw.Ctx, ""), true
+ return pipeline_aggregations.NewBucketScript(cw.Ctx, bucketsPath, ""), true
}

func (cw *ClickhouseQueryTranslator) parseBucketsPath(shouldBeQueryMap any, aggregationName string) (bucketsPathStr string, success bool) {