
Commit

Merge branch 'main' into clover-filter-2nd-approach
trzysiek committed Oct 31, 2024
2 parents 65e115c + 7f5dd2b commit aa4d576
Showing 9 changed files with 509 additions and 43 deletions.
5 changes: 5 additions & 0 deletions docs/public/docs/limitations.md
@@ -14,9 +14,14 @@ Quesma has been tested with the following software versions:
| Docker | `24.0.7` |
| Elasticsearch/Kibana | `8.11.1` |
| ClickHouse | `24.1`, `23.12` |
| ClickHouse Cloud | `24.5` |
| OpenSearch/OpenSearch Dashboards | `2.12.0` |
| Hydrolix | `v4.8.12` |

### ClickHouse limitations
* When using a cluster deployment of ClickHouse, the tables automatically created by Quesma (during [Ingest](/ingest.md)) will use the `MergeTree` engine. If you wish to use the `ReplicatedMergeTree` engine instead, you will have to create the tables manually with the `ReplicatedMergeTree` engine before ingesting data to Quesma (see the sketch below).
* *Note: On ClickHouse Cloud, the tables automatically created by Quesma will use the `ReplicatedMergeTree` engine (the ClickHouse Cloud default).*
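
For reference, a minimal sketch of pre-creating such a table from Go, assuming the clickhouse-go v2 `database/sql` driver — the DSN, cluster name, replication path, table name, and columns below are hypothetical and must match your cluster setup and the data you plan to ingest:

```go
package main

import (
	"database/sql"
	"log"

	// Assumption: clickhouse-go v2, which registers itself as the "clickhouse" database/sql driver.
	_ "github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
	// Hypothetical DSN; adjust host, credentials and database for your cluster.
	db, err := sql.Open("clickhouse", "clickhouse://localhost:9000/default")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Hypothetical table, columns, cluster name and replication path;
	// the schema must match whatever is later ingested through Quesma.
	ddl := `
CREATE TABLE IF NOT EXISTS logs ON CLUSTER my_cluster
(
    "@timestamp" DateTime64(3),
    "message"    String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/logs', '{replica}')
ORDER BY ("@timestamp")`

	if _, err := db.Exec(ddl); err != nil {
		log.Fatal(err)
	}
}
```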

## Functional limitations
Currently supported:
- front-end support for Kibana and Open Search Dashboards, limited to Discover(LogExplorer) interface and Dashboard panels
155 changes: 124 additions & 31 deletions quesma/model/bucket_aggregations/date_histogram.go
@@ -27,6 +27,7 @@ const (
// map (it has the original key, doesn't know about the processed one)
// with date_histogram's map (it already has a "valid", processed key, after TranslateSqlResponseToJson)
OriginalKeyName = "__quesma_originalKey"
NoExtendedBound = int64(-1) // -1 and not e.g. 0, as 0 is a valid value
maxEmptyBucketsAdded = 1000
)

@@ -36,13 +37,15 @@ type DateHistogram struct {
interval string
timezone string
wantedTimezone *time.Location // key is in `timezone` time, and we need it to be UTC
extendedBoundsMin int64
extendedBoundsMax int64
minDocCount int
intervalType DateHistogramIntervalType
fieldDateTimeType clickhouse.DateTimeType
}

func NewDateHistogram(ctx context.Context, field model.Expr, interval, timezone string,
minDocCount int, intervalType DateHistogramIntervalType, fieldDateTimeType clickhouse.DateTimeType) *DateHistogram {
func NewDateHistogram(ctx context.Context, field model.Expr, interval, timezone string, minDocCount int,
extendedBoundsMin, extendedBoundsMax int64, intervalType DateHistogramIntervalType, fieldDateTimeType clickhouse.DateTimeType) *DateHistogram {

wantedTimezone, err := time.LoadLocation(timezone)
if err != nil {
@@ -51,7 +54,8 @@ func NewDateHistogram(ctx context.Context, field model.Expr, interval, timezone
}

return &DateHistogram{ctx: ctx, field: field, interval: interval, timezone: timezone, wantedTimezone: wantedTimezone,
minDocCount: minDocCount, intervalType: intervalType, fieldDateTimeType: fieldDateTimeType}
minDocCount: minDocCount, extendedBoundsMin: extendedBoundsMin, extendedBoundsMax: extendedBoundsMax,
intervalType: intervalType, fieldDateTimeType: fieldDateTimeType}
}

func (typ DateHistogramIntervalType) String(ctx context.Context) string {
@@ -83,7 +87,7 @@ func (query *DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultR
// Implement default when query.minDocCount == DefaultMinDocCount, we need to return
// all buckets between the first bucket that matches documents and the last one.

if query.minDocCount == 0 {
if query.minDocCount == 0 || query.extendedBoundsMin != NoExtendedBound || query.extendedBoundsMax != NoExtendedBound {
rows = query.NewRowsTransformer().Transform(query.ctx, rows)
}

@@ -208,25 +212,35 @@ func (query *DateHistogram) getKey(row model.QueryResultRow) int64 {
return row.Cols[len(row.Cols)-2].Value.(int64)
}

// originalKey is the key as it came from our SQL request (e.g. returned by query.getKey)
func (query *DateHistogram) calculateResponseKey(originalKey int64) int64 {
var key int64
func (query *DateHistogram) calculateResponseKeyInUTC(originalKey int64) int64 {
if query.intervalType == DateHistogramCalendarInterval {
key = originalKey
} else {
intervalInMilliseconds := query.intervalAsDuration().Milliseconds()
key = originalKey * intervalInMilliseconds
return originalKey
}
intervalInMilliseconds := query.intervalAsDuration().Milliseconds()
return originalKey * intervalInMilliseconds
}

ts := time.UnixMilli(key).UTC()
// originalKey is the key as it came from our SQL request (e.g. returned by query.getKey)
func (query *DateHistogram) calculateResponseKey(originalKey int64) int64 {
keyInUTC := query.calculateResponseKeyInUTC(originalKey)

ts := time.UnixMilli(keyInUTC)
intervalStartNotUTC := time.Date(ts.Year(), ts.Month(), ts.Day(), ts.Hour(), ts.Minute(), ts.Second(), ts.Nanosecond(), query.wantedTimezone)

_, timezoneOffsetInSeconds := intervalStartNotUTC.Zone()
return key - int64(timezoneOffsetInSeconds*1000) // seconds -> milliseconds
return keyInUTC - int64(timezoneOffsetInSeconds*1000) // seconds -> milliseconds
}

func (query *DateHistogram) fromUTCToWantedTimezone(tsUTC int64) int64 {
dateUTC := time.UnixMilli(tsUTC)
date := time.Date(dateUTC.Year(), dateUTC.Month(), dateUTC.Day(), dateUTC.Hour(), dateUTC.Minute(), dateUTC.Second(), dateUTC.Nanosecond(), query.wantedTimezone)

_, timezoneOffsetInSeconds := date.Zone()
return tsUTC + int64(timezoneOffsetInSeconds*1000) // seconds -> milliseconds
}
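
For clarity, a minimal standard-library sketch of the offset trick used by `calculateResponseKey` and `fromUTCToWantedTimezone` above: re-interpret the same wall-clock fields in the wanted timezone and read back the UTC offset at that instant. The timezone and timestamp here are examples only:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	loc, err := time.LoadLocation("Europe/Warsaw") // example timezone
	if err != nil {
		panic(err)
	}
	tsUTC := time.UnixMilli(1709816790000).UTC() // 2024-03-07T13:06:30Z, example key
	// Build the same wall-clock time in the target zone, then read its offset from UTC.
	inZone := time.Date(tsUTC.Year(), tsUTC.Month(), tsUTC.Day(), tsUTC.Hour(), tsUTC.Minute(),
		tsUTC.Second(), tsUTC.Nanosecond(), loc)
	_, offsetSeconds := inZone.Zone()
	fmt.Println(offsetSeconds) // 3600 for CET (winter time)
}
```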

func (query *DateHistogram) calculateKeyAsString(key int64) string {
return time.UnixMilli(key).UTC().Format("2006-01-02T15:04:05.000")
return time.UnixMilli(key).UTC().Format("2006-01-02T15:04:05.000") // TODO: check if this is necessary: Format("2006/01/02 15:04:05")
}

func (query *DateHistogram) OriginalKeyToKeyAsString(originalKey any) string {
@@ -239,35 +253,38 @@ func (query *DateHistogram) SetMinDocCountToZero() {
}

func (query *DateHistogram) NewRowsTransformer() model.QueryRowsTransformer {
differenceBetweenTwoNextKeys := int64(1)
if query.intervalType == DateHistogramCalendarInterval {
duration, err := kibana.ParseInterval(query.interval)
if err == nil {
differenceBetweenTwoNextKeys = duration.Milliseconds()
} else {
logger.ErrorWithCtx(query.ctx).Err(err)
differenceBetweenTwoNextKeys = 0
}
duration, err := kibana.ParseInterval(query.interval)
var differenceBetweenTwoNextKeys int64
if err == nil {
differenceBetweenTwoNextKeys = duration.Milliseconds()
} else {
// 0 is a fine value for differenceBetweenTwoNextKeys, as it means we don't add keys
logger.ErrorWithCtx(query.ctx).Err(err)
}
return &DateHistogramRowsTransformer{MinDocCount: query.minDocCount, differenceBetweenTwoNextKeys: differenceBetweenTwoNextKeys, EmptyValue: 0}
return &DateHistogramRowsTransformer{dateHistogram: query, MinDocCount: query.minDocCount,
differenceBetweenTwoNextKeys: differenceBetweenTwoNextKeys, EmptyValue: 0,
extendedBoundsMin: query.extendedBoundsMin, extendedBoundsMax: query.extendedBoundsMax}
}

// we're sure len(row.Cols) >= 2

type DateHistogramRowsTransformer struct {
MinDocCount int
dateHistogram *DateHistogram
differenceBetweenTwoNextKeys int64 // if 0, we don't add keys
extendedBoundsMin int64 // simply copied from DateHistogram
extendedBoundsMax int64 // simply copied from DateHistogram
MinDocCount int
EmptyValue any
}

// if MinDocCount == 0, and we have buckets e.g. [key, value1], [key+10, value2], we need to insert [key+1, 0], [key+2, 0]...
// Also if extendedBounds are present, we need to add all keys between them.
// CAUTION: a different kind of postprocessing is needed for MinDocCount > 1, but I haven't seen any query with that yet, so not implementing it now.
func (qt *DateHistogramRowsTransformer) Transform(ctx context.Context, rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
if qt.MinDocCount != 0 || qt.differenceBetweenTwoNextKeys == 0 || len(rowsFromDB) < 2 {
if qt.MinDocCount != 0 || qt.differenceBetweenTwoNextKeys == 0 {
// we only add empty rows, when
// a) MinDocCount == 0
// b) we have valid differenceBetweenTwoNextKeys (>0)
// c) we have > 1 rows, with < 2 rows we can't add anything in between
return rowsFromDB
}
if qt.MinDocCount < 0 {
@@ -277,7 +294,11 @@ func (qt *DateHistogramRowsTransformer) Transform(ctx context.Context, rowsFromD

emptyRowsAdded := 0
postprocessedRows := make([]model.QueryResultRow, 0, len(rowsFromDB))
postprocessedRows = append(postprocessedRows, rowsFromDB[0])
if len(rowsFromDB) > 0 {
postprocessedRows = append(postprocessedRows, rowsFromDB[0])
}

// add "mid" keys, so any needed key between [first_row_key, last_row_key]
for i := 1; i < len(rowsFromDB); i++ {
if len(rowsFromDB[i-1].Cols) < 2 || len(rowsFromDB[i].Cols) < 2 {
logger.ErrorWithCtx(ctx).Msgf(
@@ -286,17 +307,89 @@ func (qt *DateHistogramRowsTransformer) Transform(ctx context.Context, rowsFromD
i-1, rowsFromDB[i-1], i, rowsFromDB[i],
)
}
lastKey := qt.getKey(rowsFromDB[i-1])
currentKey := qt.getKey(rowsFromDB[i])
lastKey := qt.dateHistogram.calculateResponseKeyInUTC(qt.getKey(rowsFromDB[i-1]))
currentKey := qt.dateHistogram.calculateResponseKeyInUTC(qt.getKey(rowsFromDB[i]))

// ugly, but works, will do for now
doWeDivide := (currentKey/qt.getKey(rowsFromDB[i])) >= 100 || (float64(currentKey)/float64(qt.getKey(rowsFromDB[i]))) <= 0.01

for midKey := lastKey + qt.differenceBetweenTwoNextKeys; midKey < currentKey && emptyRowsAdded < maxEmptyBucketsAdded; midKey += qt.differenceBetweenTwoNextKeys {
midRow := rowsFromDB[i-1].Copy()
midRow.Cols[len(midRow.Cols)-2].Value = midKey
divideBy := int64(1)
if doWeDivide {
divideBy = qt.differenceBetweenTwoNextKeys
}
midRow.Cols[len(midRow.Cols)-2].Value = midKey / divideBy
midRow.Cols[len(midRow.Cols)-1].Value = qt.EmptyValue

postprocessedRows = append(postprocessedRows, midRow)
emptyRowsAdded++
}
postprocessedRows = append(postprocessedRows, rowsFromDB[i])
}

// some cases where we don't need to add anything more
switch {
case qt.extendedBoundsMax == NoExtendedBound && qt.extendedBoundsMin == NoExtendedBound:
case len(postprocessedRows) == 0 && (qt.extendedBoundsMax == NoExtendedBound || qt.extendedBoundsMin == NoExtendedBound):
return postprocessedRows

}
noBounds := qt.extendedBoundsMax == NoExtendedBound && qt.extendedBoundsMin == NoExtendedBound
noRowsAndNotFullyBounded := len(postprocessedRows) == 0 && (qt.extendedBoundsMax == NoExtendedBound || qt.extendedBoundsMin == NoExtendedBound)
if noBounds || noRowsAndNotFullyBounded {
return postprocessedRows
}

newRow := func(key int64) model.QueryResultRow {
var row model.QueryResultRow
if len(postprocessedRows) > 0 {
row = postprocessedRows[0].Copy()
row.Cols[len(row.Cols)-2].Value = key
row.Cols[len(row.Cols)-1].Value = qt.EmptyValue
} else {
row = model.QueryResultRow{
Cols: []model.QueryResultCol{
model.NewQueryResultCol("", key),
model.NewQueryResultCol("", qt.EmptyValue),
},
}
}
return row
}

// add "pre" keys, so any needed key between [extendedBoundsMin, first_row_key]
if qt.extendedBoundsMin != NoExtendedBound {
firstRequiredKey := (qt.dateHistogram.fromUTCToWantedTimezone(qt.extendedBoundsMin) + qt.differenceBetweenTwoNextKeys - 1) / qt.differenceBetweenTwoNextKeys
var lastRequiredKey int64
if len(postprocessedRows) > 0 {
lastRequiredKey = qt.getKey(postprocessedRows[0])
if qt.dateHistogram.intervalType == DateHistogramCalendarInterval {
lastRequiredKey /= qt.differenceBetweenTwoNextKeys
}
} else {
// we know qt.extendedBoundsMax != NoExtendedBound, because we would've returned earlier - line below is safe
lastRequiredKey = qt.dateHistogram.fromUTCToWantedTimezone(qt.extendedBoundsMax) / qt.differenceBetweenTwoNextKeys
}
preRows := make([]model.QueryResultRow, 0, max(0, int(lastRequiredKey-firstRequiredKey)))
for preKey := firstRequiredKey; preKey < lastRequiredKey && emptyRowsAdded < maxEmptyBucketsAdded; preKey++ {
preRows = append(preRows, newRow(preKey))
emptyRowsAdded++
}

postprocessedRows = append(preRows, postprocessedRows...)
}

// add "post" keys, so any needed key between [last_row_key, extendedBoundsMax]
if qt.extendedBoundsMax != NoExtendedBound {
firstRequiredKey := qt.dateHistogram.calculateResponseKeyInUTC(qt.getKey(postprocessedRows[len(postprocessedRows)-1]))/qt.differenceBetweenTwoNextKeys + 1
lastRequiredKey := qt.dateHistogram.fromUTCToWantedTimezone(qt.extendedBoundsMax) / qt.differenceBetweenTwoNextKeys
for postKey := firstRequiredKey; postKey <= lastRequiredKey && emptyRowsAdded < maxEmptyBucketsAdded; postKey++ {
postprocessedRows = append(postprocessedRows, newRow(postKey))
emptyRowsAdded++
}
}

return postprocessedRows
}
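
As an aside, a self-contained sketch of the bucket-filling idea that `Transform` implements — given sparse buckets keyed at a fixed interval plus extended bounds, emit a zero-count bucket for every missing key. The helper and all values below are hypothetical, not the Quesma types above:

```go
package main

import "fmt"

// fillEmptyBuckets is a simplified sketch of what DateHistogramRowsTransformer.Transform does:
// every interval between boundsMin and boundsMax that has no data gets a zero-count bucket.
func fillEmptyBuckets(counts map[int64]int, intervalMs, boundsMin, boundsMax int64) [][2]int64 {
	var out [][2]int64
	firstKey := boundsMin / intervalMs // align bounds to bucket keys
	lastKey := boundsMax / intervalMs
	for key := firstKey; key <= lastKey; key++ {
		out = append(out, [2]int64{key * intervalMs, int64(counts[key*intervalMs])})
	}
	return out
}

func main() {
	// Two non-empty buckets 45s apart, 15s interval: the gaps in between and the
	// ranges out to the extended bounds are all filled with doc_count 0.
	counts := map[int64]int{1709816790000: 2, 1709816835000: 1}
	for _, b := range fillEmptyBuckets(counts, 15000, 1709816760000, 1709816850000) {
		fmt.Printf("key=%d doc_count=%d\n", b[0], b[1])
	}
}
```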

3 changes: 2 additions & 1 deletion quesma/model/bucket_aggregations/date_histogram_test.go
@@ -21,6 +21,7 @@ func TestTranslateSqlResponseToJson(t *testing.T) {
{"key": int64(56962370) * 30_000, OriginalKeyName: int64(56962370), "doc_count": 14, "key_as_string": "2024-02-25T14:25:00.000"},
},
}
response := (&DateHistogram{interval: interval, intervalType: DateHistogramFixedInterval, wantedTimezone: time.UTC}).TranslateSqlResponseToJson(resultRows)
response := (&DateHistogram{interval: interval, extendedBoundsMax: NoExtendedBound, extendedBoundsMin: NoExtendedBound,
intervalType: DateHistogramFixedInterval, wantedTimezone: time.UTC}).TranslateSqlResponseToJson(resultRows)
assert.Equal(t, expectedResponse, response)
}
10 changes: 10 additions & 0 deletions quesma/queryparser/aggregation_parser.go
@@ -254,6 +254,16 @@ func (cw *ClickhouseQueryTranslator) parseIntField(queryMap QueryMap, fieldName
return defaultValue
}

func (cw *ClickhouseQueryTranslator) parseInt64Field(queryMap QueryMap, fieldName string, defaultValue int64) int64 {
if valueRaw, exists := queryMap[fieldName]; exists {
if asFloat, ok := valueRaw.(float64); ok {
return int64(asFloat)
}
logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a float64, but %T, value: %v. Using default: %d", fieldName, valueRaw, valueRaw, defaultValue)
}
return defaultValue
}
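
A short aside on why `parseInt64Field` asserts `float64`: Go's `encoding/json` decodes every JSON number into `float64` when unmarshalling into a generic map, so integer fields such as `extended_bounds.min` arrive as `float64` and are converted back. A minimal sketch (the payload is an example):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var queryMap map[string]interface{}
	if err := json.Unmarshal([]byte(`{"min": 1709816824995}`), &queryMap); err != nil {
		panic(err)
	}
	// JSON numbers land as float64; convert explicitly to get the int64 bound.
	if asFloat, ok := queryMap["min"].(float64); ok {
		fmt.Println(int64(asFloat)) // 1709816824995
	}
}
```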

func (cw *ClickhouseQueryTranslator) parseFloatField(queryMap QueryMap, fieldName string, defaultValue float64) float64 {
if valueRaw, exists := queryMap[fieldName]; exists {
if asFloat, ok := valueRaw.(float64); ok {
9 changes: 7 additions & 2 deletions quesma/queryparser/pancake_aggregation_parser_buckets.go
@@ -93,12 +93,17 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
logger.ErrorWithCtx(cw.Ctx).Msgf("missing %v is not a string, but: %T. Skipping it.", missingRaw, missingRaw)
}
}

if !weAddedMissing {
// if we don't add missing, we need to filter out nulls later
aggregation.filterOutEmptyKeyBucket = true
}

ebMin, ebMax := bucket_aggregations.NoExtendedBound, bucket_aggregations.NoExtendedBound
if extendedBounds, exists := dateHistogram["extended_bounds"].(QueryMap); exists {
ebMin = cw.parseInt64Field(extendedBounds, "min", bucket_aggregations.NoExtendedBound)
ebMax = cw.parseInt64Field(extendedBounds, "max", bucket_aggregations.NoExtendedBound)
}

minDocCount := cw.parseMinDocCount(dateHistogram)
timezone := cw.parseStringField(dateHistogram, "time_zone", "")
interval, intervalType := cw.extractInterval(dateHistogram)
@@ -110,7 +115,7 @@ }
}

dateHistogramAggr := bucket_aggregations.NewDateHistogram(
cw.Ctx, field, interval, timezone, minDocCount, intervalType, dateTimeType)
cw.Ctx, field, interval, timezone, minDocCount, ebMin, ebMax, intervalType, dateTimeType)
aggregation.queryType = dateHistogramAggr

sqlQuery := dateHistogramAggr.GenerateSQL()
17 changes: 11 additions & 6 deletions quesma/testdata/aggregation_requests.go
@@ -3537,8 +3537,8 @@ var AggregationTests = []AggregationTestCase{
"eventRate": {
"date_histogram": {
"extended_bounds": {
"max": 1709816694995,
"min": 1709815794995
"min": 1709816824995,
"max": 1709816834995
},
"field": "@timestamp",
"fixed_interval": "15000ms",
@@ -3600,14 +3600,19 @@ var AggregationTests = []AggregationTestCase{
"eventRate": {
"buckets": [
{
"doc_count": 0,
"doc_count": 2,
"key": 1709816790000,
"key_as_string": "2024-03-07T13:06:30.000"
},
{
"doc_count": 0,
"doc_count": 1,
"key": 1709816805000,
"key_as_string": "2024-03-07T13:06:45.000"
},
{
"doc_count": 0,
"key": 1709816820000,
"key_as_string": "2024-03-07T13:07:00.000"
}
]
},
@@ -3628,12 +3633,12 @@
{Cols: []model.QueryResultCol{
model.NewQueryResultCol("aggr__sampler__count", uint64(15)),
model.NewQueryResultCol("aggr__sampler__eventRate__key_0", int64(1709816790000/15000)),
model.NewQueryResultCol("aggr__sampler__eventRate__count", 0),
model.NewQueryResultCol("aggr__sampler__eventRate__count", 2),
}},
{Cols: []model.QueryResultCol{
model.NewQueryResultCol("aggr__sampler__count", uint64(15)),
model.NewQueryResultCol("aggr__sampler__eventRate__key_0", int64(1709816805000/15000)),
model.NewQueryResultCol("aggr__sampler__eventRate__count", 0),
model.NewQueryResultCol("aggr__sampler__eventRate__count", 1),
}},
},
ExpectedPancakeSQL: `
4 changes: 2 additions & 2 deletions quesma/testdata/clients/clover.go
@@ -32,8 +32,8 @@ var CloverTests = []testdata.AggregationTestCase{
},
"date_histogram": {
"extended_bounds": {
"max": 1726264800000,
"min": 1726264800000
"max": 1726264900000,
"min": 1726264900000
},
"field": "@timestamp",
"fixed_interval": "2592000s",
