From 7c3252023d687340f5c99f826b48c70061d09b0f Mon Sep 17 00:00:00 2001
From: Krzysztof Kiewicz
Date: Tue, 15 Oct 2024 15:31:38 +0200
Subject: [PATCH] Add auto_date_histogram bucket aggregation (needs other PRs merged first)

---
 .../auto_date_histogram.go                    |  54 +++++++
 quesma/model/where_visitor.go                 |  22 +++
 quesma/queryparser/aggregation_parser_test.go |   1 +
 .../pancake_aggregation_parser_buckets.go     |  27 ++--
 quesma/queryparser/pancake_transformer.go     |  32 ++++
 quesma/testdata/clients/clover.go             | 138 ++++++++++++++++++
 6 files changed, 265 insertions(+), 9 deletions(-)
 create mode 100644 quesma/model/bucket_aggregations/auto_date_histogram.go
 create mode 100644 quesma/model/where_visitor.go
 create mode 100644 quesma/testdata/clients/clover.go

diff --git a/quesma/model/bucket_aggregations/auto_date_histogram.go b/quesma/model/bucket_aggregations/auto_date_histogram.go
new file mode 100644
index 000000000..5269c9adb
--- /dev/null
+++ b/quesma/model/bucket_aggregations/auto_date_histogram.go
@@ -0,0 +1,54 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+package bucket_aggregations
+
+import (
+	"context"
+	"fmt"
+	"quesma/logger"
+	"quesma/model"
+	"time"
+)
+
+type AutoDateHistogram struct {
+	ctx       context.Context
+	field     model.Expr // name of the field, e.g. timestamp
+	bucketsNr int
+	key       int64
+}
+
+func NewAutoDateHistogram(ctx context.Context, field model.Expr, bucketsNr int) *AutoDateHistogram {
+	return &AutoDateHistogram{ctx: ctx, field: field, bucketsNr: bucketsNr}
+}
+
+func (query *AutoDateHistogram) AggregationType() model.AggregationType {
+	return model.BucketAggregation
+}
+
+func (query *AutoDateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
+	// We return a single bucket spanning the whole time range; its key is set by the pancake transformer from the query's lower time bound.
+	if len(rows) == 0 {
+		logger.WarnWithCtx(query.ctx).Msgf("no rows returned for %s", query.String())
+		return make(model.JsonMap, 0)
+	}
+	return model.JsonMap{
+		"buckets": []model.JsonMap{{
+			"key":           query.key,
+			"key_as_string": time.UnixMilli(query.key).Format("2006-01-02T15:04:05.000-07:00"),
+			"doc_count":     rows[0].LastColValue(),
+		}},
+		"interval": "100y",
+	}
+}
+
+func (query *AutoDateHistogram) String() string {
+	return fmt.Sprintf("auto_date_histogram(field: %v, bucketsNr: %d)", model.AsString(query.field), query.bucketsNr)
+}
+
+func (query *AutoDateHistogram) GetField() model.Expr {
+	return query.field
+}
+
+func (query *AutoDateHistogram) SetKey(key int64) {
+	query.key = key
+}
diff --git a/quesma/model/where_visitor.go b/quesma/model/where_visitor.go
new file mode 100644
index 000000000..40de24322
--- /dev/null
+++ b/quesma/model/where_visitor.go
@@ -0,0 +1,22 @@
+package model
+
+// FindTimestampLowerBound returns the single ">=" condition found in expr, if there is exactly one. TODO: not a full/proper implementation, but it covers the client use case.
+func FindTimestampLowerBound(expr Expr) (InfixExpr, bool) {
+	candidates := make([]InfixExpr, 0)
+	visitor := NewBaseVisitor()
+	visitor.OverrideVisitInfix = func(visitor *BaseExprVisitor, e InfixExpr) interface{} {
+		if e.Op == ">=" {
+			candidates = append(candidates, e)
+		} else if e.Op == "AND" {
+			e.Left.Accept(visitor)
+			e.Right.Accept(visitor)
+		}
+		return nil
+	}
+
+	expr.Accept(visitor)
+	if len(candidates) == 1 {
+		return candidates[0], true
+	}
+	return InfixExpr{}, false
+}
diff --git a/quesma/queryparser/aggregation_parser_test.go b/quesma/queryparser/aggregation_parser_test.go
index 2e40ed546..4d8d17ac9 100644
--- a/quesma/queryparser/aggregation_parser_test.go
+++ b/quesma/queryparser/aggregation_parser_test.go
@@ -666,6 +666,7 @@ func allAggregationTests() []testdata.AggregationTestCase {
 	add(kibana_visualize.PipelineAggregationTests, "kibana-visualize/pipeline_agg_req")
 	add(clients.KunkkaTests, "clients/kunkka")
 	add(clients.OpheliaTests, "clients/ophelia")
+	add(clients.CloverTests, "clients/clover")
 
 	return allTests
 }
diff --git a/quesma/queryparser/pancake_aggregation_parser_buckets.go b/quesma/queryparser/pancake_aggregation_parser_buckets.go
index 9830b499c..ef019d144 100644
--- a/quesma/queryparser/pancake_aggregation_parser_buckets.go
+++ b/quesma/queryparser/pancake_aggregation_parser_buckets.go
@@ -120,6 +120,11 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
 		delete(queryMap, "date_histogram")
 		return success, nil
 	}
+	if autoDateHistogram := cw.parseAutoDateHistogram(queryMap["auto_date_histogram"]); autoDateHistogram != nil {
+		aggregation.queryType = autoDateHistogram
+		delete(queryMap, "auto_date_histogram")
+		return success, nil
+	}
 	for _, termsType := range []string{"terms", "significant_terms"} {
 		termsRaw, ok := queryMap[termsType]
 		if !ok {
@@ -136,15 +141,8 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
 			aggregation.filterOutEmptyKeyBucket = true
 		}
 
-		size := 10
-		if sizeRaw, ok := terms["size"]; ok {
-			if sizeParsed, ok := sizeRaw.(float64); ok {
-				size = int(sizeParsed)
-			} else {
-				logger.WarnWithCtx(cw.Ctx).Msgf("size is not an float64, but %T, value: %v. Using default", sizeRaw, sizeRaw)
-			}
-		}
-
+		const defaultSize = 10
+		size := cw.parseSize(terms, defaultSize)
 		orderBy := cw.parseOrder(terms, queryMap, []model.Expr{fieldExpression})
 		aggregation.queryType = bucket_aggregations.NewTerms(cw.Ctx, termsType == "significant_terms", orderBy[0]) // TODO probably full, not [0]
 		aggregation.selectedColumns = append(aggregation.selectedColumns, fieldExpression)
@@ -346,6 +344,17 @@ func (cw *ClickhouseQueryTranslator) parseRandomSampler(randomSamplerRaw any) bu
 	)
 }
 
+func (cw *ClickhouseQueryTranslator) parseAutoDateHistogram(paramsRaw any) *bucket_aggregations.AutoDateHistogram {
+	params, ok := paramsRaw.(QueryMap)
+	if !ok {
+		return nil
+	}
+
+	field := cw.parseFieldField(params, "auto_date_histogram")
+	bucketsNr := cw.parseIntField(params, "buckets", 10)
+	return bucket_aggregations.NewAutoDateHistogram(cw.Ctx, field, bucketsNr)
+}
+
 func (cw *ClickhouseQueryTranslator) parseOrder(terms, queryMap QueryMap, fieldExpressions []model.Expr) []model.OrderByExpr {
 	defaultDirection := model.DescOrder
 	defaultOrderBy := model.NewOrderByExpr(model.NewCountFunc(), defaultDirection)
diff --git a/quesma/queryparser/pancake_transformer.go b/quesma/queryparser/pancake_transformer.go
index 63b7a62d8..8c200547c 100644
--- a/quesma/queryparser/pancake_transformer.go
+++ b/quesma/queryparser/pancake_transformer.go
@@ -12,6 +12,7 @@ import (
 	"reflect"
 	"sort"
 	"strings"
+	"time"
 )
 
 // 2. Translate aggregation tree into pancake model.
@@ -374,6 +375,36 @@ func (a *pancakeTransformer) createTopHitAndTopMetricsPancakes(pancake *pancakeM
 	return
 }
 
+func (a *pancakeTransformer) transformAutoDateHistogram(layers []*pancakeModelLayer, whereClause model.Expr) {
+	for _, layer := range layers {
+		if layer.nextBucketAggregation != nil {
+			if autoDateHistogram, ok := layer.nextBucketAggregation.queryType.(*bucket_aggregations.AutoDateHistogram); ok {
+				timestampLowerBound, ok := model.FindTimestampLowerBound(whereClause)
+				if !ok {
+					logger.WarnWithCtx(a.ctx).Msgf("could not find timestamp lower bound for auto_date_histogram %v", autoDateHistogram)
+					continue
+				}
+				if autoDateHistogram.GetField() != timestampLowerBound.Left {
+					logger.WarnWithCtx(a.ctx).Msgf("auto_date_histogram field %s does not match timestamp lower bound %s", autoDateHistogram.GetField(), timestampLowerBound.Left)
+					continue
+				}
+				var lowerBoundRaw string
+				if fun, ok := timestampLowerBound.Right.(model.FunctionExpr); ok && len(fun.Args) == 1 {
+					if literal, ok := fun.Args[0].(model.LiteralExpr); ok {
+						lowerBoundRaw, _ = literal.Value.(string)
+					}
+				}
+				lowerBound, err := time.Parse("2006-01-02T15:04:05.000Z", strings.Trim(lowerBoundRaw, "'"))
+				if err != nil {
+					logger.WarnWithCtx(a.ctx).Msgf("could not parse timestamp lower bound %q: %v", lowerBoundRaw, err)
+					continue
+				}
+				autoDateHistogram.SetKey(lowerBound.UnixMilli())
+			}
+		}
+	}
+}
+
 func (a *pancakeTransformer) aggregationTreeToPancakes(topLevel pancakeAggregationTree) (pancakeResults []*pancakeModel, err error) {
 	if len(topLevel.children) == 0 {
 		return nil, fmt.Errorf("no top level aggregations found")
@@ -398,6 +429,7 @@ func (a *pancakeTransformer) aggregationTreeToPancakes(topLevel pancakeAggregati
 		}
 
 		a.connectPipelineAggregations(layers)
+		a.transformAutoDateHistogram(layers, topLevel.whereClause)
 
 		newPancake := pancakeModel{
 			layers:      layers,
diff --git a/quesma/testdata/clients/clover.go b/quesma/testdata/clients/clover.go
new file mode 100644
index 000000000..834b52a07
--- /dev/null
+++ b/quesma/testdata/clients/clover.go
@@ -0,0 +1,138 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+package clients
+
+import (
+	"quesma/model"
+	"quesma/testdata"
+)
+
+var CloverTests = []testdata.AggregationTestCase{
+	{ // [0]
+		TestName: "simplest auto_date_histogram",
+		QueryRequestJson: `
+		{
+			"aggs": {
+				"timeseries": {
+					"aggs": {
+						"469ef7fe-5927-42d1-918b-37c738c600f0": {
+							"bucket_script": {
+								"buckets_path": {
+									"count": "_count"
+								},
+								"gap_policy": "skip",
+								"script": {
+									"lang": "expression",
+									"source": "count * 1"
+								}
+							}
+						}
+					},
+					"auto_date_histogram": {
+						"buckets": 1,
+						"field": "timestamp"
+					},
+					"meta": {
+						"dataViewId": "d3d7af60-4c81-11e8-b3d7-01146121b73d",
+						"indexPatternString": "kibana_sample_data_flights",
+						"intervalString": "54000000ms",
+						"normalized": true,
+						"panelId": "1a1d745d-0c21-4103-a2ae-df41d4fbd366",
+						"seriesId": "866fb08f-b9a4-43eb-a400-38ebb6c13aed",
+						"timeField": "timestamp"
+					}
+				}
+			},
+			"query": {
+				"bool": {
+					"filter": [],
+					"must": [
+						{
+							"range": {
+								"timestamp": {
+									"format": "strict_date_optional_time",
+									"gte": "2024-10-10T17:33:47.125Z",
+									"lte": "2024-10-11T08:33:47.125Z"
+								}
+							}
+						}
+					],
+					"must_not": [],
+					"should": []
+				}
+			},
+			"runtime_mappings": {
+				"hour_of_day": {
+					"script": {
+						"source": "emit(doc['timestamp'].value.getHour());"
+					},
+					"type": "long"
+				}
+			},
+			"size": 0,
+			"timeout": "30000ms",
+			"track_total_hits": true
+		}`,
+		ExpectedResponse: `
+		{
+			"completion_time_in_millis": 1728635627258,
+			"expiration_time_in_millis": 1728635687254,
+			"id": "FlhaTzBhMkpQU3lLMmlzNHhBeU9FMHcbaUp3ZGNYdDNSaGF3STVFZ2xWY3RuQTo2MzU4",
+			"is_partial": false,
+			"is_running": false,
+			"response": {
+				"_shards": {
+					"failed": 0,
+					"skipped": 0,
+					"successful": 1,
+					"total": 1
+				},
+				"aggregations": {
+					"timeseries": {
+						"buckets": [
+							{
+								"469ef7fe-5927-42d1-918b-37c738c600f0": {
+									"value": 202.0
+								},
+								"doc_count": 202,
+								"key": 1728518400000,
+								"key_as_string": "2024-10-10T00:00:00.000Z"
+							}
+						],
+						"interval": "7d",
+						"meta": {
+							"dataViewId": "d3d7af60-4c81-11e8-b3d7-01146121b73d",
+							"indexPatternString": "kibana_sample_data_flights",
+							"intervalString": "54000000ms",
+							"normalized": true,
+							"panelId": "1a1d745d-0c21-4103-a2ae-df41d4fbd366",
+							"seriesId": "866fb08f-b9a4-43eb-a400-38ebb6c13aed",
+							"timeField": "timestamp"
+						}
+					}
+				},
+				"hits": {
+					"hits": [],
+					"max_score": null,
+					"total": {
+						"relation": "eq",
+						"value": 202
+					}
+				},
+				"timed_out": false,
+				"took": 4
+			},
+			"start_time_in_millis": 1728635627254
+		}`,
+		ExpectedPancakeResults: []model.QueryResultRow{
+			{Cols: []model.QueryResultCol{
+				model.NewQueryResultCol("aggr__timeseries__count", int64(202)),
+			}},
+		},
+		ExpectedPancakeSQL: `
+			SELECT count(*) AS "aggr__timeseries__count"
+			FROM __quesma_table_name
+			WHERE ("timestamp">=parseDateTime64BestEffort('2024-10-10T17:33:47.125Z') AND
+			  "timestamp"<=parseDateTime64BestEffort('2024-10-11T08:33:47.125Z'))`,
+	},
+}
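
For context, a minimal sketch of how the new model.FindTimestampLowerBound helper behaves on the WHERE clause that the Clover test's ExpectedPancakeSQL encodes. This is an illustration only, not part of the patch; the model.NewInfixExpr / NewColumnRef / NewFunction / NewLiteral constructor names are assumed from the existing expression model and may differ slightly.

	package main

	import (
		"fmt"

		"quesma/model"
	)

	func main() {
		// NOTE: constructor names below are assumed from quesma's expression model.
		// Builds: "timestamp">=parseDateTime64BestEffort('2024-10-10T17:33:47.125Z')
		//         AND "timestamp"<=parseDateTime64BestEffort('2024-10-11T08:33:47.125Z')
		whereClause := model.NewInfixExpr(
			model.NewInfixExpr(
				model.NewColumnRef("timestamp"),
				">=",
				model.NewFunction("parseDateTime64BestEffort", model.NewLiteral("'2024-10-10T17:33:47.125Z'")),
			),
			"AND",
			model.NewInfixExpr(
				model.NewColumnRef("timestamp"),
				"<=",
				model.NewFunction("parseDateTime64BestEffort", model.NewLiteral("'2024-10-11T08:33:47.125Z'")),
			),
		)

		// Exactly one ">=" condition exists, so ok is true and lowerBound is the
		// "timestamp" >= parseDateTime64BestEffort(...) infix expression; the pancake
		// transformer parses its right-hand side and uses it as the single bucket's key.
		if lowerBound, ok := model.FindTimestampLowerBound(whereClause); ok {
			fmt.Println(model.AsString(lowerBound))
		}
	}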