auto_date_histogram aggregation #893

Merged · 15 commits · Oct 31, 2024
2 changes: 1 addition & 1 deletion quesma/model/README.md
@@ -12,7 +12,7 @@ More info: https://www.elastic.co/guide/en/elasticsearch/reference/current/searc
Metrics aggregation | Support | Bucket aggregation | Support | Pipeline aggregation | Support |
---------------------------|:----------------------:|------------------------------|:------------------:|------------------------|:------------------:|
Avg | :white_check_mark: | Adjacency matrix | :x: | Average bucket | :white_check_mark: |
Cardinality | :white_check_mark: | Auto-interval date histogram | :x: | Bucket script | :x: |
Cardinality | :white_check_mark: | Auto-interval date histogram | :wavy_dash: | Bucket script | :wavy_dash: |
Extended Stats | :white_check_mark:[^1] | Categorize text | :x: | Bucket count K-S test | :x: |
Avg | :white_check_mark: | Children | :x: | Bucket correlation | :x: |
Boxplot | :x: | Composite | :x: | Bucket selector | :x: |
59 changes: 59 additions & 0 deletions quesma/model/bucket_aggregations/auto_date_histogram.go
@@ -0,0 +1,59 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package bucket_aggregations

import (
	"context"
	"fmt"
	"quesma/logger"
	"quesma/model"
	"time"
)

// TODO: only bucketsNr=1 is supported for now. Implement other cases.
type AutoDateHistogram struct {
	ctx       context.Context
	field     model.ColumnRef // name of the field, e.g. timestamp
	bucketsNr int
	key       int64
}

// NewAutoDateHistogram creates a new AutoDateHistogram aggregation during parsing.
// Key is set later, during pancake transformation.
func NewAutoDateHistogram(ctx context.Context, field model.ColumnRef, bucketsNr int) *AutoDateHistogram {
	return &AutoDateHistogram{ctx: ctx, field: field, bucketsNr: bucketsNr}
}

func (query *AutoDateHistogram) AggregationType() model.AggregationType {
	return model.BucketAggregation
}

func (query *AutoDateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
	if len(rows) == 0 {
		logger.WarnWithCtx(query.ctx).Msgf("no rows returned for %s", query.String())
		return make(model.JsonMap, 0)
	}
	if len(rows) != 1 {
		logger.WarnWithCtx(query.ctx).Msgf("unexpected (!= 1) number of rows returned for %s: %d", query.String(), len(rows))
	}
	return model.JsonMap{
		"buckets": []model.JsonMap{{
			"key":           query.key,
			"key_as_string": time.UnixMilli(query.key).Format("2006-01-02T15:04:05.000-07:00"),
			"doc_count":     rows[0].LastColValue(),
		}},
		"interval": "100y", // seems to work for the bucketsNr=1 case; will have to be changed for other cases
	}
}

func (query *AutoDateHistogram) String() string {
	return fmt.Sprintf("auto_date_histogram(field: %v, bucketsNr: %d)", model.AsString(query.field), query.bucketsNr)
}

func (query *AutoDateHistogram) GetField() model.ColumnRef {
	return query.field
}

func (query *AutoDateHistogram) SetKey(key int64) {
	query.key = key
}
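
For reference, a minimal standalone sketch of the `key_as_string` formatting used above. The timestamp value is illustrative, and `.UTC()` is added here only to make the output deterministic (the production code formats in the server's local zone):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	key := int64(1730332800000) // epoch millis for 2024-10-31T00:00:00Z (illustrative)
	// Same layout string that TranslateSqlResponseToJson uses for "key_as_string".
	fmt.Println(time.UnixMilli(key).UTC().Format("2006-01-02T15:04:05.000-07:00"))
	// Output: 2024-10-31T00:00:00.000+00:00
}
```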
5 changes: 4 additions & 1 deletion quesma/model/expr.go
@@ -9,7 +9,10 @@ type Expr interface {
	Accept(v ExprVisitor) interface{}
}

var InvalidExpr = Expr(nil)
var (
	InvalidExpr = Expr(nil)
	TrueExpr    = NewLiteral(true)
)

// ColumnRef is a reference to a column in a table, we can enrich it with more information (e.g. type used) as we go
type ColumnRef struct {
18 changes: 13 additions & 5 deletions quesma/model/pipeline_aggregations/bucket_script.go
@@ -16,8 +16,8 @@ type BucketScript struct {
	script string
}

func NewBucketScript(ctx context.Context, script string) BucketScript {
	return BucketScript{script: script, PipelineAggregation: newPipelineAggregation(ctx, "_count")}
func NewBucketScript(ctx context.Context, path, script string) BucketScript {
	return BucketScript{script: script, PipelineAggregation: newPipelineAggregation(ctx, path)}
}

func (query BucketScript) AggregationType() model.AggregationType {
@@ -28,8 +28,16 @@ func (query BucketScript) TranslateSqlResponseToJson(rows []model.QueryResultRow
	const defaultValue = 0.
	switch {
	case query.script == "params.numerator != null && params.denominator != null && params.denominator != 0 ? params.numerator / params.denominator : 0":
		numerator := query.findFilterValue(rows, "numerator")
		denominator := query.findFilterValue(rows, "denominator")
		parent := query.GetPathToParent()
		if len(parent) != 1 {
			// TODO: research if this limitation can be removed, and do so if possible.
			logger.WarnWithCtx(query.ctx).Msgf("unexpected parent path in bucket_script: %s. Returning default.", query.String())
			return model.JsonMap{"value": defaultValue}
		}

		// ReplaceAll is a hack, but it gets the job done for the customer's case and won't break anything in any other case.
		numerator := query.findFilterValue(rows, strings.ReplaceAll(parent[0], "denominator", "numerator"))
		denominator := query.findFilterValue(rows, strings.ReplaceAll(parent[0], "numerator", "denominator"))
		if denominator == 0 {
			return model.JsonMap{"value": defaultValue}
		}
@@ -77,7 +85,7 @@ func (query BucketScript) findFilterValue(rows []model.QueryResultRow, filterNam
			}
			colName = strings.TrimSuffix(colName, "_col_0")
			if strings.HasSuffix(colName, "-"+filterName) {
				return float64(util.ExtractInt64(col.Value))
				return util.ExtractNumeric64(col.Value)
			}
		}
	}
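
For context, a sketch of the request shape this special case targets: the script string must match the case label above, and the numerator/denominator names are located via the parent path and the `-`+filterName column-name suffix. Aggregation names and paths below are illustrative, not taken from the PR:

```go
// Hypothetical Elasticsearch request fragment, embedded as a Go raw string.
const bucketScriptExample = `{
	"bucket_script": {
		"buckets_path": {
			"numerator":   "filter-numerator>_count",
			"denominator": "filter-denominator>_count"
		},
		"script": "params.numerator != null && params.denominator != null && params.denominator != 0 ? params.numerator / params.denominator : 0"
	}
}`
```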
38 changes: 38 additions & 0 deletions quesma/model/where_visitor.go
@@ -0,0 +1,38 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package model

import (
	"math"
)

// FindTimestampLowerBound returns y if there is an "x>=y" or "x>y" in the WHERE clause, but only as a top-level expression.
// (By that I mean: a>=0 is fine, a>=0 AND expr2 [AND ...] is also fine (ANDs between all of them), but a>=0 OR expr2 is not.
// a>=0 AND (expr2 OR expr3) is fine as well, as at the top level there is only an AND.)
// We achieve that by only descending into AND operators.
// If there are multiple such expressions, we return the smallest bound.
//
// TODO: add an upper bound here too. It's not needed when bucket_nr=1 in auto_date_histogram, which is this function's only use case so far.
func FindTimestampLowerBound(field ColumnRef, whereClause Expr) (timestamp int64, found bool) {
	timestamp = math.MaxInt64
	visitor := NewBaseVisitor()
	visitor.OverrideVisitInfix = func(visitor *BaseExprVisitor, e InfixExpr) interface{} {
		// Note the parentheses around the Op checks: && binds tighter than ||,
		// so without them any ">" comparison would match, regardless of the field on the left.
		if columnRef, ok := e.Left.(ColumnRef); ok && columnRef == field && (e.Op == ">=" || e.Op == ">") {
			if fun, ok := e.Right.(FunctionExpr); ok && fun.Name == "fromUnixTimestamp64Milli" && len(fun.Args) == 1 {
				if rhs, ok := fun.Args[0].(LiteralExpr); ok {
					if rhsInt64, ok := rhs.Value.(int64); ok {
						timestamp = min(timestamp, rhsInt64)
						found = true
					}
				}
			}
		} else if e.Op == "AND" {
			e.Left.Accept(visitor)
			e.Right.Accept(visitor)
		}
		return nil
	}

	whereClause.Accept(visitor)
	return
}
14 changes: 10 additions & 4 deletions quesma/queryparser/filters_aggregation.go
@@ -4,6 +4,7 @@ package queryparser

import (
	"quesma/logger"
	"quesma/model"
	"quesma/model/bucket_aggregations"
)

@@ -32,13 +33,18 @@ func (cw *ClickhouseQueryTranslator) parseFilters(queryMap QueryMap) (success bo
	}

	filters := make([]bucket_aggregations.Filter, 0, len(nestedMap))
	for name, filter := range nestedMap {
		filterMap, ok := filter.(QueryMap)
	for name, filterRaw := range nestedMap {
		filterMap, ok := filterRaw.(QueryMap)
		if !ok {
			logger.WarnWithCtx(cw.Ctx).Msgf("filter is not a map, but %T, value: %v. Skipping.", filter, filter)
			logger.WarnWithCtx(cw.Ctx).Msgf("filter is not a map, but %T, value: %v. Skipping.", filterRaw, filterRaw)
			continue
		}
		filters = append(filters, bucket_aggregations.NewFilter(name, cw.parseQueryMap(filterMap)))
		filter := cw.parseQueryMap(filterMap)
		if filter.WhereClause == nil {
			filter.WhereClause = model.TrueExpr
			filter.CanParse = true
		}
		filters = append(filters, bucket_aggregations.NewFilter(name, filter))
	}
	return true, bucket_aggregations.NewFilters(cw.Ctx, filters)
}
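
To illustrate the new `TrueExpr` fallback: a sketch of a `filters` aggregation where one filter is empty. Previously the empty `{}` left `WhereClause` as nil; now it translates to `WHERE true`. Filter names are illustrative:

```go
// Hypothetical request fragment: "all" is the empty filter that now maps to TrueExpr.
const filtersExample = `{
	"filters": {
		"filters": {
			"all":    {},
			"errors": { "term": { "severity": "error" } }
		}
	}
}`
```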
3 changes: 3 additions & 0 deletions quesma/queryparser/pancake_aggregation_parser.go
@@ -153,6 +153,9 @@ func (cw *ClickhouseQueryTranslator) pancakeParseAggregation(aggregationName str
	if filterRaw, ok := queryMap["filter"]; ok {
		if filter, ok := filterRaw.(QueryMap); ok {
			whereClause := cw.parseQueryMap(filter).WhereClause
			if whereClause == nil { // empty filter <=> true
				whereClause = model.TrueExpr
			}
			aggregation.queryType = bucket_aggregations.NewFilterAgg(cw.Ctx, whereClause)
		} else {
			logger.WarnWithCtx(cw.Ctx).Msgf("filter is not a map, but %T, value: %v. Skipping", filterRaw, filterRaw)
32 changes: 23 additions & 9 deletions quesma/queryparser/pancake_aggregation_parser_buckets.go
@@ -120,6 +120,11 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
		delete(queryMap, "date_histogram")
		return success, nil
	}
	if autoDateHistogram := cw.parseAutoDateHistogram(queryMap["auto_date_histogram"]); autoDateHistogram != nil {
		aggregation.queryType = autoDateHistogram
		delete(queryMap, "auto_date_histogram")
		return
	}
	for _, termsType := range []string{"terms", "significant_terms"} {
		termsRaw, ok := queryMap[termsType]
		if !ok {
@@ -136,15 +141,8 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
			aggregation.filterOutEmptyKeyBucket = true
		}

		size := 10
		if sizeRaw, ok := terms["size"]; ok {
			if sizeParsed, ok := sizeRaw.(float64); ok {
				size = int(sizeParsed)
			} else {
				logger.WarnWithCtx(cw.Ctx).Msgf("size is not an float64, but %T, value: %v. Using default", sizeRaw, sizeRaw)
			}
		}

		const defaultSize = 10
		size := cw.parseSize(terms, defaultSize)
		orderBy := cw.parseOrder(terms, queryMap, []model.Expr{fieldExpression})
		aggregation.queryType = bucket_aggregations.NewTerms(cw.Ctx, termsType == "significant_terms", orderBy[0]) // TODO probably full, not [0]
		aggregation.selectedColumns = append(aggregation.selectedColumns, fieldExpression)
@@ -346,6 +344,22 @@ func (cw *ClickhouseQueryTranslator) parseRandomSampler(randomSamplerRaw any) bu
	)
}

func (cw *ClickhouseQueryTranslator) parseAutoDateHistogram(paramsRaw any) *bucket_aggregations.AutoDateHistogram {
	params, ok := paramsRaw.(QueryMap)
	if !ok {
		return nil
	}

	fieldRaw := cw.parseFieldField(params, "auto_date_histogram")
	var field model.ColumnRef
	if field, ok = fieldRaw.(model.ColumnRef); !ok {
		logger.WarnWithCtx(cw.Ctx).Msgf("field is not a column reference, but %T, value: %v. Skipping auto_date_histogram", fieldRaw, fieldRaw)
		return nil
	}
	bucketsNr := cw.parseIntField(params, "buckets", 10)
	return bucket_aggregations.NewAutoDateHistogram(cw.Ctx, field, bucketsNr)
}
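
A sketch of the input `parseAutoDateHistogram` handles: `buckets` is optional and defaults to 10, though only the bucketsNr=1 case is fully supported so far. The field name is illustrative:

```go
// Hypothetical request fragment parsed by parseAutoDateHistogram.
const autoDateHistogramExample = `{
	"auto_date_histogram": {
		"field":   "timestamp",
		"buckets": 1
	}
}`
```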

func (cw *ClickhouseQueryTranslator) parseOrder(terms, queryMap QueryMap, fieldExpressions []model.Expr) []model.OrderByExpr {
	defaultDirection := model.DescOrder
	defaultOrderBy := model.NewOrderByExpr(model.NewCountFunc(), defaultDirection)
2 changes: 1 addition & 1 deletion quesma/queryparser/pancake_sql_query_generation_test.go
@@ -61,7 +61,7 @@ func TestPancakeQueryGeneration(t *testing.T) {
		}

		if test.TestName == "multiple buckets_path(file:clients/clover,nr:1)" {
			t.Skip("Unskip after merge of auto_date_histogram")
			t.Skip("This needs fixing ASAP, easy to fix")
		}

		fmt.Println("i:", i, "test:", test.TestName)
18 changes: 18 additions & 0 deletions quesma/queryparser/pancake_transformer.go
@@ -374,6 +374,23 @@ func (a *pancakeTransformer) createTopHitAndTopMetricsPancakes(pancake *pancakeM
	return
}

// Auto date histogram is a date histogram that automatically creates buckets based on the time range.
// To do that, we need to parse the WHERE clause, which happens in this method.
func (a *pancakeTransformer) transformAutoDateHistogram(layers []*pancakeModelLayer, whereClause model.Expr) {
	for _, layer := range layers {
		if layer.nextBucketAggregation != nil {
			if autoDateHistogram, ok := layer.nextBucketAggregation.queryType.(*bucket_aggregations.AutoDateHistogram); ok {
				if tsLowerBound, found := model.FindTimestampLowerBound(autoDateHistogram.GetField(), whereClause); found {
					autoDateHistogram.SetKey(tsLowerBound)
				} else {
					logger.WarnWithCtx(a.ctx).Msgf("could not find timestamp lower bound (field: %v, where clause: %v)",
						autoDateHistogram.GetField(), whereClause)
				}
			}
		}
	}
}

func (a *pancakeTransformer) aggregationTreeToPancakes(topLevel pancakeAggregationTree) (pancakeResults []*pancakeModel, err error) {
	if len(topLevel.children) == 0 {
		return nil, fmt.Errorf("no top level aggregations found")
@@ -398,6 +415,7 @@ func (a *pancakeTransformer) aggregationTreeToPancakes(topLevel pancakeAggregati
	}

	a.connectPipelineAggregations(layers)
	a.transformAutoDateHistogram(layers, topLevel.whereClause)

	newPancake := pancakeModel{
		layers: layers,
4 changes: 2 additions & 2 deletions quesma/queryparser/pipeline_aggregations.go
@@ -180,7 +180,7 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) (
		return
	}
	if script, ok := scriptRaw.(string); ok {
		return pipeline_aggregations.NewBucketScript(cw.Ctx, script), true
		return pipeline_aggregations.NewBucketScript(cw.Ctx, bucketsPath, script), true
	}

	script, ok := scriptRaw.(QueryMap)
@@ -204,7 +204,7 @@ }
}

	// okay, we've checked everything, it's indeed a simple count
	return pipeline_aggregations.NewBucketScript(cw.Ctx, ""), true
	return pipeline_aggregations.NewBucketScript(cw.Ctx, bucketsPath, ""), true
}

func (cw *ClickhouseQueryTranslator) parseBucketsPath(shouldBeQueryMap any, aggregationName string) (bucketsPathStr string, success bool) {