Skip to content

Commit

Permalink
done, need merge of some other PRs
Browse files Browse the repository at this point in the history
  • Loading branch information
trzysiek committed Oct 15, 2024
1 parent d936dda commit 7c32520
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 9 deletions.
54 changes: 54 additions & 0 deletions quesma/model/bucket_aggregations/auto_date_histogram.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package bucket_aggregations

import (
"context"
"fmt"
"quesma/logger"
"quesma/model"
"time"
)

// AutoDateHistogram implements the auto_date_histogram bucket aggregation.
// Unlike date_histogram, the interval is not given by the user: only the wanted
// number of buckets is. (Currently we always answer with a single bucket — see
// TranslateSqlResponseToJson.)
type AutoDateHistogram struct {
	ctx       context.Context
	field     model.Expr // name of the field, e.g. timestamp
	bucketsNr int        // requested number of buckets ("buckets" request parameter)
	key       int64      // start of the returned bucket in unix millis; set later via SetKey
}

// NewAutoDateHistogram returns an auto_date_histogram aggregation over the given
// field, which should produce roughly bucketsNr buckets.
func NewAutoDateHistogram(ctx context.Context, field model.Expr, bucketsNr int) *AutoDateHistogram {
	histogram := AutoDateHistogram{
		ctx:       ctx,
		field:     field,
		bucketsNr: bucketsNr,
	}
	return &histogram
}

// AggregationType classifies auto_date_histogram as a bucket aggregation.
func (query *AutoDateHistogram) AggregationType() model.AggregationType {
	return model.BucketAggregation
}

// TranslateSqlResponseToJson renders the aggregation's part of the JSON response.
// We currently always return a single bucket starting at query.key (the lower bound
// of the query's timestamp range, in unix millis) and holding all matched documents.
func (query *AutoDateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
	if len(rows) == 0 {
		logger.WarnWithCtx(query.ctx).Msgf("no rows returned for %s", query.String())
		return make(model.JsonMap, 0)
	}
	return model.JsonMap{
		"buckets": []model.JsonMap{{
			"key":           query.key,
			"key_as_string": time.UnixMilli(query.key).Format("2006-01-02T15:04:05.000-07:00"),
			"doc_count":     rows[0].LastColValue(),
		}},
		// One bucket covers the whole range, so we report a huge interval.
		"interval": "100y",
	}
}

// String returns a readable representation of this aggregation, used mainly in logs.
func (query *AutoDateHistogram) String() string {
	return fmt.Sprintf("auto_date_histogram(field: %v, bucketsNr: %d)", model.AsString(query.field), query.bucketsNr)
}

// GetField returns the field this histogram buckets on (e.g. the timestamp column).
func (query *AutoDateHistogram) GetField() model.Expr {
	return query.field
}

// SetKey sets the start of the returned bucket, in unix milliseconds.
// It is filled in after parsing, by the pancake transformer, from the query's WHERE clause.
func (query *AutoDateHistogram) SetKey(key int64) {
	query.key = key
}
22 changes: 22 additions & 0 deletions quesma/model/where_visitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package model

// FindTimestampLowerBound searches expr for a single "field >= value" condition,
// descending only into AND conjunctions. It returns (that condition, true) when
// exactly one such condition exists, and (zero InfixExpr, false) otherwise —
// with zero or multiple ">=" candidates we can't tell which one is the bound.
// TODO: it's not 100% full/proper implementation, but works in the client case
func FindTimestampLowerBound(expr Expr) (InfixExpr, bool) {
	candidates := make([]InfixExpr, 0)
	visitor := NewBaseVisitor()
	visitor.OverrideVisitInfix = func(visitor *BaseExprVisitor, e InfixExpr) interface{} {
		switch e.Op {
		case ">=":
			candidates = append(candidates, e)
		case "AND":
			e.Left.Accept(visitor)
			e.Right.Accept(visitor)
		}
		return nil
	}

	expr.Accept(visitor)
	if len(candidates) == 1 {
		return candidates[0], true
	}
	return InfixExpr{}, false
}
1 change: 1 addition & 0 deletions quesma/queryparser/aggregation_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ func allAggregationTests() []testdata.AggregationTestCase {
add(kibana_visualize.PipelineAggregationTests, "kibana-visualize/pipeline_agg_req")
add(clients.KunkkaTests, "clients/kunkka")
add(clients.OpheliaTests, "clients/ophelia")
add(clients.CloverTests, "clients/clover")

return allTests
}
Expand Down
27 changes: 18 additions & 9 deletions quesma/queryparser/pancake_aggregation_parser_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
delete(queryMap, "date_histogram")
return success, nil
}
if autoDateHistogram := cw.parseAutoDateHistogram(queryMap["auto_date_histogram"]); autoDateHistogram != nil {
aggregation.queryType = autoDateHistogram
delete(queryMap, "auto_date_histogram")
return
}
for _, termsType := range []string{"terms", "significant_terms"} {
termsRaw, ok := queryMap[termsType]
if !ok {
Expand All @@ -136,15 +141,8 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
aggregation.filterOutEmptyKeyBucket = true
}

size := 10
if sizeRaw, ok := terms["size"]; ok {
if sizeParsed, ok := sizeRaw.(float64); ok {
size = int(sizeParsed)
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("size is not an float64, but %T, value: %v. Using default", sizeRaw, sizeRaw)
}
}

const defaultSize = 10
size := cw.parseSize(terms, defaultSize)
orderBy := cw.parseOrder(terms, queryMap, []model.Expr{fieldExpression})
aggregation.queryType = bucket_aggregations.NewTerms(cw.Ctx, termsType == "significant_terms", orderBy[0]) // TODO probably full, not [0]
aggregation.selectedColumns = append(aggregation.selectedColumns, fieldExpression)
Expand Down Expand Up @@ -346,6 +344,17 @@ func (cw *ClickhouseQueryTranslator) parseRandomSampler(randomSamplerRaw any) bu
)
}

// parseAutoDateHistogram parses the auto_date_histogram part of a request.
// Returns nil when paramsRaw is not a QueryMap, i.e. there's no such aggregation here.
func (cw *ClickhouseQueryTranslator) parseAutoDateHistogram(paramsRaw any) *bucket_aggregations.AutoDateHistogram {
	if params, ok := paramsRaw.(QueryMap); ok {
		const defaultBucketsNr = 10
		field := cw.parseFieldField(params, "auto_date_histogram")
		bucketsNr := cw.parseIntField(params, "buckets", defaultBucketsNr)
		return bucket_aggregations.NewAutoDateHistogram(cw.Ctx, field, bucketsNr)
	}
	return nil
}

func (cw *ClickhouseQueryTranslator) parseOrder(terms, queryMap QueryMap, fieldExpressions []model.Expr) []model.OrderByExpr {
defaultDirection := model.DescOrder
defaultOrderBy := model.NewOrderByExpr(model.NewCountFunc(), defaultDirection)
Expand Down
32 changes: 32 additions & 0 deletions quesma/queryparser/pancake_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"reflect"
"sort"
"strings"
"time"
)

// 2. Translate aggregation tree into pancake model.
Expand Down Expand Up @@ -374,6 +375,36 @@ func (a *pancakeTransformer) createTopHitAndTopMetricsPancakes(pancake *pancakeM
return
}

// transformAutoDateHistogram finds every auto_date_histogram aggregation among the
// layers and initializes its key (the returned bucket's start) from the WHERE clause:
// we look for the single "timestamp >= lower_bound" condition and use that bound.
// On any mismatch we log (where useful) and skip, leaving the key at its zero value.
func (a *pancakeTransformer) transformAutoDateHistogram(layers []*pancakeModelLayer, whereClause model.Expr) {
	for _, layer := range layers {
		if layer.nextBucketAggregation == nil {
			continue
		}
		autoDateHistogram, ok := layer.nextBucketAggregation.queryType.(*bucket_aggregations.AutoDateHistogram)
		if !ok {
			continue
		}
		timestampLowerBound, ok := model.FindTimestampLowerBound(whereClause)
		if !ok {
			logger.WarnWithCtx(a.ctx).Msgf("could not find timestamp lower bound for auto_date_histogram %v", autoDateHistogram)
			continue
		}
		if autoDateHistogram.GetField() != timestampLowerBound.Left {
			logger.WarnWithCtx(a.ctx).Msgf("auto_date_histogram field %s does not match timestamp lower bound %s", autoDateHistogram.GetField(), timestampLowerBound.Left)
			continue
		}
		if key, ok := a.parseTimestampLowerBound(timestampLowerBound.Right); ok {
			autoDateHistogram.SetKey(key.UnixMilli())
		}
	}
}

// parseTimestampLowerBound extracts and parses the timestamp literal from an expression
// of the shape someFunc('2006-01-02T15:04:05.000Z'), e.g. parseDateTime64BestEffort(...).
// Returns false when the expression has a different shape, or the literal doesn't parse.
func (a *pancakeTransformer) parseTimestampLowerBound(expr model.Expr) (time.Time, bool) {
	fun, ok := expr.(model.FunctionExpr)
	if !ok || len(fun.Args) != 1 {
		return time.Time{}, false
	}
	literal, ok := fun.Args[0].(model.LiteralExpr)
	if !ok {
		return time.Time{}, false
	}
	// Checked assertion: previously this was an unchecked `b.(string)` on a possibly-nil
	// value, which panicked whenever the bound wasn't a 1-arg function of a string literal.
	// The literal's Value keeps its surrounding quotes, hence len >= 2 and the [1:len-1] trim.
	quoted, ok := literal.Value.(string)
	if !ok || len(quoted) < 2 {
		return time.Time{}, false
	}
	parsed, err := time.Parse("2006-01-02T15:04:05.000Z", quoted[1:len(quoted)-1])
	if err != nil {
		return time.Time{}, false
	}
	return parsed, true
}

func (a *pancakeTransformer) aggregationTreeToPancakes(topLevel pancakeAggregationTree) (pancakeResults []*pancakeModel, err error) {
if len(topLevel.children) == 0 {
return nil, fmt.Errorf("no top level aggregations found")
Expand All @@ -398,6 +429,7 @@ func (a *pancakeTransformer) aggregationTreeToPancakes(topLevel pancakeAggregati
}

a.connectPipelineAggregations(layers)
a.transformAutoDateHistogram(layers, topLevel.whereClause)

newPancake := pancakeModel{
layers: layers,
Expand Down
138 changes: 138 additions & 0 deletions quesma/testdata/clients/clover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package clients

import (
"quesma/model"
"quesma/testdata"
)

// CloverTests is a set of aggregation test cases, currently covering the
// auto_date_histogram bucket aggregation (here combined with a bucket_script
// pipeline sub-aggregation).
var CloverTests = []testdata.AggregationTestCase{
	{ // [0]
		TestName: "simplest auto_date_histogram",
		QueryRequestJson: `
{
"aggs": {
"timeseries": {
"aggs": {
"469ef7fe-5927-42d1-918b-37c738c600f0": {
"bucket_script": {
"buckets_path": {
"count": "_count"
},
"gap_policy": "skip",
"script": {
"lang": "expression",
"source": "count * 1"
}
}
}
},
"auto_date_histogram": {
"buckets": 1,
"field": "timestamp"
},
"meta": {
"dataViewId": "d3d7af60-4c81-11e8-b3d7-01146121b73d",
"indexPatternString": "kibana_sample_data_flights",
"intervalString": "54000000ms",
"normalized": true,
"panelId": "1a1d745d-0c21-4103-a2ae-df41d4fbd366",
"seriesId": "866fb08f-b9a4-43eb-a400-38ebb6c13aed",
"timeField": "timestamp"
}
}
},
"query": {
"bool": {
"filter": [],
"must": [
{
"range": {
"timestamp": {
"format": "strict_date_optional_time",
"gte": "2024-10-10T17:33:47.125Z",
"lte": "2024-10-11T08:33:47.125Z"
}
}
}
],
"must_not": [],
"should": []
}
},
"runtime_mappings": {
"hour_of_day": {
"script": {
"source": "emit(doc['timestamp'].value.getHour());"
},
"type": "long"
}
},
"size": 0,
"timeout": "30000ms",
"track_total_hits": true
}`,
		ExpectedResponse: `
{
"completion_time_in_millis": 1728635627258,
"expiration_time_in_millis": 1728635687254,
"id": "FlhaTzBhMkpQU3lLMmlzNHhBeU9FMHcbaUp3ZGNYdDNSaGF3STVFZ2xWY3RuQTo2MzU4",
"is_partial": false,
"is_running": false,
"response": {
"_shards": {
"failed": 0,
"skipped": 0,
"successful": 1,
"total": 1
},
"aggregations": {
"timeseries": {
"buckets": [
{
"469ef7fe-5927-42d1-918b-37c738c600f0": {
"value": 202.0
},
"doc_count": 202,
"key": 1728518400000,
"key_as_string": "2024-10-10T00:00:00.000Z"
}
],
"interval": "7d",
"meta": {
"dataViewId": "d3d7af60-4c81-11e8-b3d7-01146121b73d",
"indexPatternString": "kibana_sample_data_flights",
"intervalString": "54000000ms",
"normalized": true,
"panelId": "1a1d745d-0c21-4103-a2ae-df41d4fbd366",
"seriesId": "866fb08f-b9a4-43eb-a400-38ebb6c13aed",
"timeField": "timestamp"
}
}
},
"hits": {
"hits": [],
"max_score": null,
"total": {
"relation": "eq",
"value": 202
}
},
"timed_out": false,
"took": 4
},
"start_time_in_millis": 1728635627254
}`,
		ExpectedPancakeResults: []model.QueryResultRow{
			{Cols: []model.QueryResultCol{
				model.NewQueryResultCol("aggr__timeseries__count", int64(202)),
			}},
		},
		ExpectedPancakeSQL: `
SELECT count(*) AS "aggr__timeseries__count"
FROM __quesma_table_name
WHERE ("timestamp">=parseDateTime64BestEffort('2024-10-10T17:33:47.125Z') AND
"timestamp"<=parseDateTime64BestEffort('2024-10-11T08:33:47.125Z'))`,
	},
}

0 comments on commit 7c32520

Please sign in to comment.