Skip to content

Commit

Permalink
Merge branch 'main' into date-histogram-eb
Browse files Browse the repository at this point in the history
  • Loading branch information
trzysiek committed Oct 31, 2024
2 parents c8aea3d + 61339a9 commit 28570a5
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 53 deletions.
2 changes: 1 addition & 1 deletion quesma/model/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ More info: https://www.elastic.co/guide/en/elasticsearch/reference/current/searc
Metrics aggregation | Support | Bucket aggregation | Support | Pipeline aggregation | Support |
---------------------------|:----------------------:|------------------------------|:------------------:|------------------------|:------------------:|
Avg | :white_check_mark: | Adjacency matrix | :x: | Average bucket | :white_check_mark: |
Cardinality | :white_check_mark: | Auto-interval date histogram | :x: | Bucket script | :x: |
Cardinality | :white_check_mark: | Auto-interval date histogram | :wavy_dash: | Bucket script | :wavy_dash: |
Extended Stats | :white_check_mark:[^1] | Categorize text | :x: | Bucket count K-S test | :x: |
Avg | :white_check_mark: | Children | :x: | Bucket correlation | :x: |
Boxplot | :x: | Composite | :x: | Bucket selector | :x: |
Expand Down
59 changes: 59 additions & 0 deletions quesma/model/bucket_aggregations/auto_date_histogram.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package bucket_aggregations

import (
"context"
"fmt"
"quesma/logger"
"quesma/model"
"time"
)

// TODO: only bucketsNr=1 is supported for now. Implement other cases.
type AutoDateHistogram struct {
ctx context.Context
field model.ColumnRef // name of the field, e.g. timestamp
bucketsNr int
key int64
}

// NewAutoDateHistogram creates a new AutoDateHistogram aggregation, during parsing.
// Key is set later, during pancake transformation.
func NewAutoDateHistogram(ctx context.Context, field model.ColumnRef, bucketsNr int) *AutoDateHistogram {
return &AutoDateHistogram{ctx: ctx, field: field, bucketsNr: bucketsNr}
}

func (query *AutoDateHistogram) AggregationType() model.AggregationType {
return model.BucketAggregation
}

func (query *AutoDateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
if len(rows) == 0 {
logger.WarnWithCtx(query.ctx).Msgf("no rows returned for %s", query.String())
return make(model.JsonMap, 0)
}
if len(rows) != 1 {
logger.WarnWithCtx(query.ctx).Msgf("unexpected (!= 1) number of rows returned for %s: %d.", query.String(), len(rows))
}
return model.JsonMap{
"buckets": []model.JsonMap{{
"key": query.key,
"key_as_string": time.UnixMilli(query.key).Format("2006-01-02T15:04:05.000-07:00"),
"doc_count": rows[0].LastColValue(),
}},
"interval": "100y", // seems working for bucketsNr=1 case. Will have to be changed for other cases.
}
}

func (query *AutoDateHistogram) String() string {
return fmt.Sprintf("auto_date_histogram(field: %v, bucketsNr: %d)", model.AsString(query.field), query.bucketsNr)
}

func (query *AutoDateHistogram) GetField() model.ColumnRef {
return query.field
}

func (query *AutoDateHistogram) SetKey(key int64) {
query.key = key
}
5 changes: 4 additions & 1 deletion quesma/model/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ type Expr interface {
Accept(v ExprVisitor) interface{}
}

var InvalidExpr = Expr(nil)
var (
InvalidExpr = Expr(nil)
TrueExpr = NewLiteral(true)
)

// ColumnRef is a reference to a column in a table, we can enrich it with more information (e.g. type used) as we go
type ColumnRef struct {
Expand Down
18 changes: 13 additions & 5 deletions quesma/model/pipeline_aggregations/bucket_script.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type BucketScript struct {
script string
}

func NewBucketScript(ctx context.Context, script string) BucketScript {
return BucketScript{script: script, PipelineAggregation: newPipelineAggregation(ctx, "_count")}
func NewBucketScript(ctx context.Context, path, script string) BucketScript {
return BucketScript{script: script, PipelineAggregation: newPipelineAggregation(ctx, path)}
}

func (query BucketScript) AggregationType() model.AggregationType {
Expand All @@ -28,8 +28,16 @@ func (query BucketScript) TranslateSqlResponseToJson(rows []model.QueryResultRow
const defaultValue = 0.
switch {
case query.script == "params.numerator != null && params.denominator != null && params.denominator != 0 ? params.numerator / params.denominator : 0":
numerator := query.findFilterValue(rows, "numerator")
denominator := query.findFilterValue(rows, "denominator")
parent := query.GetPathToParent()
if len(parent) != 1 {
// TODO: research if this limitation can be removed, and do so if possible.
logger.WarnWithCtx(query.ctx).Msgf("unexpected parent path in bucket_script: %s. Returning default.", query.String())
return model.JsonMap{"value": defaultValue}
}

// replaceAll - hack but get the job done for the customer's case, and won't break anything in any other case.
numerator := query.findFilterValue(rows, strings.ReplaceAll(parent[0], "denominator", "numerator"))
denominator := query.findFilterValue(rows, strings.ReplaceAll(parent[0], "numerator", "denominator"))
if denominator == 0 {
return model.JsonMap{"value": defaultValue}
}
Expand Down Expand Up @@ -77,7 +85,7 @@ func (query BucketScript) findFilterValue(rows []model.QueryResultRow, filterNam
}
colName = strings.TrimSuffix(colName, "_col_0")
if strings.HasSuffix(colName, "-"+filterName) {
return float64(util.ExtractInt64(col.Value))
return util.ExtractNumeric64(col.Value)
}
}
}
Expand Down
38 changes: 38 additions & 0 deletions quesma/model/where_visitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package model

import (
"math"
)

// FindLowerBounds returns y if there is "x>=y" or "x>y" in the WHERE clause, but only as a single top-level expression.
// (I mean by that a>=0 is fine, a>=0 AND expr2 [AND ...]] is also fine (AND between all), but a>=0 OR e2 is not fine.
// a>=0 AND (expr2 OR expr3) is also fine, as on top level it's only an AND.
// We achieve that by only descending for AND operators.
// If there are multiple such expressions, we return the smallest one.
//
// TODO: add upper bound here too, when bucket_nr=1 in auto_date_histogram (only use case of this function), it's not needed.
func FindTimestampLowerBound(field ColumnRef, whereClause Expr) (timestamp int64, found bool) {
timestamp = math.MaxInt64
visitor := NewBaseVisitor()
visitor.OverrideVisitInfix = func(visitor *BaseExprVisitor, e InfixExpr) interface{} {
if columnRef, ok := e.Left.(ColumnRef); ok && columnRef == field && e.Op == ">=" || e.Op == ">" {
if fun, ok := e.Right.(FunctionExpr); ok && fun.Name == "fromUnixTimestamp64Milli" && len(fun.Args) == 1 {
if rhs, ok := fun.Args[0].(LiteralExpr); ok {
if rhsInt64, ok := rhs.Value.(int64); ok {
timestamp = min(timestamp, rhsInt64)
found = true
}
}
}
} else if e.Op == "AND" {
e.Left.Accept(visitor)
e.Right.Accept(visitor)
}
return nil
}

whereClause.Accept(visitor)
return
}
14 changes: 10 additions & 4 deletions quesma/queryparser/filters_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package queryparser

import (
"quesma/logger"
"quesma/model"
"quesma/model/bucket_aggregations"
)

Expand Down Expand Up @@ -32,13 +33,18 @@ func (cw *ClickhouseQueryTranslator) parseFilters(queryMap QueryMap) (success bo
}

filters := make([]bucket_aggregations.Filter, 0, len(nestedMap))
for name, filter := range nestedMap {
filterMap, ok := filter.(QueryMap)
for name, filterRaw := range nestedMap {
filterMap, ok := filterRaw.(QueryMap)
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("filter is not a map, but %T, value: %v. Skipping.", filter, filter)
logger.WarnWithCtx(cw.Ctx).Msgf("filter is not a map, but %T, value: %v. Skipping.", filterRaw, filterRaw)
continue
}
filters = append(filters, bucket_aggregations.NewFilter(name, cw.parseQueryMap(filterMap)))
filter := cw.parseQueryMap(filterMap)
if filter.WhereClause == nil {
filter.WhereClause = model.TrueExpr
filter.CanParse = true
}
filters = append(filters, bucket_aggregations.NewFilter(name, filter))
}
return true, bucket_aggregations.NewFilters(cw.Ctx, filters)
}
3 changes: 3 additions & 0 deletions quesma/queryparser/pancake_aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ func (cw *ClickhouseQueryTranslator) pancakeParseAggregation(aggregationName str
if filterRaw, ok := queryMap["filter"]; ok {
if filter, ok := filterRaw.(QueryMap); ok {
whereClause := cw.parseQueryMap(filter).WhereClause
if whereClause == nil { // empty filter <=> true
whereClause = model.TrueExpr
}
aggregation.queryType = bucket_aggregations.NewFilterAgg(cw.Ctx, whereClause)
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("filter is not a map, but %T, value: %v. Skipping", filterRaw, filterRaw)
Expand Down
32 changes: 23 additions & 9 deletions quesma/queryparser/pancake_aggregation_parser_buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
delete(queryMap, "date_histogram")
return success, nil
}
if autoDateHistogram := cw.parseAutoDateHistogram(queryMap["auto_date_histogram"]); autoDateHistogram != nil {
aggregation.queryType = autoDateHistogram
delete(queryMap, "auto_date_histogram")
return
}
for _, termsType := range []string{"terms", "significant_terms"} {
termsRaw, ok := queryMap[termsType]
if !ok {
Expand All @@ -141,15 +146,8 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
aggregation.filterOutEmptyKeyBucket = true
}

size := 10
if sizeRaw, ok := terms["size"]; ok {
if sizeParsed, ok := sizeRaw.(float64); ok {
size = int(sizeParsed)
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("size is not an float64, but %T, value: %v. Using default", sizeRaw, sizeRaw)
}
}

const defaultSize = 10
size := cw.parseSize(terms, defaultSize)
orderBy := cw.parseOrder(terms, queryMap, []model.Expr{fieldExpression})
aggregation.queryType = bucket_aggregations.NewTerms(cw.Ctx, termsType == "significant_terms", orderBy[0]) // TODO probably full, not [0]
aggregation.selectedColumns = append(aggregation.selectedColumns, fieldExpression)
Expand Down Expand Up @@ -351,6 +349,22 @@ func (cw *ClickhouseQueryTranslator) parseRandomSampler(randomSamplerRaw any) bu
)
}

func (cw *ClickhouseQueryTranslator) parseAutoDateHistogram(paramsRaw any) *bucket_aggregations.AutoDateHistogram {
params, ok := paramsRaw.(QueryMap)
if !ok {
return nil
}

fieldRaw := cw.parseFieldField(params, "auto_date_histogram")
var field model.ColumnRef
if field, ok = fieldRaw.(model.ColumnRef); !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("field is not a string, but %T, value: %v. Skipping auto_date_histogram", fieldRaw, fieldRaw)
return nil
}
bucketsNr := cw.parseIntField(params, "buckets", 10)
return bucket_aggregations.NewAutoDateHistogram(cw.Ctx, field, bucketsNr)
}

func (cw *ClickhouseQueryTranslator) parseOrder(terms, queryMap QueryMap, fieldExpressions []model.Expr) []model.OrderByExpr {
defaultDirection := model.DescOrder
defaultOrderBy := model.NewOrderByExpr(model.NewCountFunc(), defaultDirection)
Expand Down
36 changes: 27 additions & 9 deletions quesma/queryparser/pancake_json_rendering.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ func (p *pancakeJSONRenderer) layerToJSON(remainingLayers []*pancakeModelLayer,
default:
metricRows = p.selectMetricRows(metric.InternalNamePrefix(), rows)
}
result[metric.name] = metric.queryType.TranslateSqlResponseToJson(metricRows)
if metric.name != PancakeTotalCountMetricName {
result[metric.name] = metric.queryType.TranslateSqlResponseToJson(metricRows)
}
// TODO: maybe add metadata also here? probably not needed
}

Expand All @@ -253,6 +255,9 @@ func (p *pancakeJSONRenderer) layerToJSON(remainingLayers []*pancakeModelLayer,
if err != nil {
return nil, err
}
if layer.nextBucketAggregation.metadata != nil {
json["meta"] = layer.nextBucketAggregation.metadata
}
result[layer.nextBucketAggregation.name] = json
return result, nil
}
Expand Down Expand Up @@ -345,18 +350,30 @@ func (p *pancakeJSONRenderer) layerToJSON(remainingLayers []*pancakeModelLayer,
return nil, fmt.Errorf("no key in bucket json, layer: %s", layer.nextBucketAggregation.name)
}
}
var (
columnNameWithKey = layer.nextBucketAggregation.InternalNameForKey(0) // TODO: need all ids, multi_terms will probably not work now
found bool
subAggrKey any
currentBucketSubAggrRows []model.QueryResultRow
)
if subAggrIdx < len(subAggrRows) {
subAggrKey, found = p.valueForColumn(subAggrRows[subAggrIdx], columnNameWithKey)
}

columnNameWithKey := layer.nextBucketAggregation.InternalNameForKey(0) // TODO: need all ids, multi_terms will probably not work now
subAggrKey, found := p.valueForColumn(subAggrRows[subAggrIdx], columnNameWithKey)
if found && subAggrKey == key {
subAggr, err := p.layerToJSON(remainingLayers[1:], subAggrRows[subAggrIdx])
if err != nil {
return nil, err
}
bucketArr[i] = util.MergeMaps(p.ctx, bucket, subAggr)
currentBucketSubAggrRows = subAggrRows[subAggrIdx]
subAggrIdx++
} else {
bucketArr[i] = bucket
currentBucketSubAggrRows = []model.QueryResultRow{}
}

subAggr, err := p.layerToJSON(remainingLayers[1:], currentBucketSubAggrRows)
if err != nil {
return nil, err
}
bucketArr[i] = util.MergeMaps(p.ctx, bucket, subAggr)
if _, exists = bucketArr[i][bucket_aggregations.OriginalKeyName]; exists {
delete(bucketArr[i], bucket_aggregations.OriginalKeyName)
}
}
}
Expand All @@ -367,6 +384,7 @@ func (p *pancakeJSONRenderer) layerToJSON(remainingLayers []*pancakeModelLayer,
}
result[layer.nextBucketAggregation.name] = buckets
}

return result, nil
}

Expand Down
2 changes: 1 addition & 1 deletion quesma/queryparser/pancake_sql_query_generation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestPancakeQueryGeneration(t *testing.T) {
}

if test.TestName == "multiple buckets_path(file:clients/clover,nr:1)" {
t.Skip("Unskip after merge of auto_date_histogram")
t.Skip("This needs fixing ASAP, easy to fix")
}

fmt.Println("i:", i, "test:", test.TestName)
Expand Down
18 changes: 18 additions & 0 deletions quesma/queryparser/pancake_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,23 @@ func (a *pancakeTransformer) createTopHitAndTopMetricsPancakes(pancake *pancakeM
return
}

// Auto date histogram is a date histogram, that automatically creates buckets based on time range.
// To do that we need parse WHERE clause which happens in this method.
func (a *pancakeTransformer) transformAutoDateHistogram(layers []*pancakeModelLayer, whereClause model.Expr) {
for _, layer := range layers {
if layer.nextBucketAggregation != nil {
if autoDateHistogram, ok := layer.nextBucketAggregation.queryType.(*bucket_aggregations.AutoDateHistogram); ok {
if tsLowerBound, found := model.FindTimestampLowerBound(autoDateHistogram.GetField(), whereClause); found {
autoDateHistogram.SetKey(tsLowerBound)
} else {
logger.WarnWithCtx(a.ctx).Msgf("could not find timestamp lower bound (field: %v, where clause: %v)",
autoDateHistogram.GetField(), whereClause)
}
}
}
}
}

func (a *pancakeTransformer) aggregationTreeToPancakes(topLevel pancakeAggregationTree) (pancakeResults []*pancakeModel, err error) {
if len(topLevel.children) == 0 {
return nil, fmt.Errorf("no top level aggregations found")
Expand All @@ -398,6 +415,7 @@ func (a *pancakeTransformer) aggregationTreeToPancakes(topLevel pancakeAggregati
}

a.connectPipelineAggregations(layers)
a.transformAutoDateHistogram(layers, topLevel.whereClause)

newPancake := pancakeModel{
layers: layers,
Expand Down
4 changes: 2 additions & 2 deletions quesma/queryparser/pipeline_aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) (
return
}
if script, ok := scriptRaw.(string); ok {
return pipeline_aggregations.NewBucketScript(cw.Ctx, script), true
return pipeline_aggregations.NewBucketScript(cw.Ctx, bucketsPath, script), true
}

script, ok := scriptRaw.(QueryMap)
Expand All @@ -204,7 +204,7 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) (
}

// okay, we've checked everything, it's indeed a simple count
return pipeline_aggregations.NewBucketScript(cw.Ctx, ""), true
return pipeline_aggregations.NewBucketScript(cw.Ctx, bucketsPath, ""), true
}

func (cw *ClickhouseQueryTranslator) parseBucketsPath(shouldBeQueryMap any, aggregationName string) (bucketsPathStr string, success bool) {
Expand Down
Loading

0 comments on commit 28570a5

Please sign in to comment.