Skip to content

Commit

Permalink
Add optimization pipeline (#521)
Browse files Browse the repository at this point in the history
Let's introduce the SQL query optimization pipeline.

Currently only two optimizations are supported:
- truncate dates to 5-minute buckets, which limits how many distinct queries
we perform
- cache aggregation queries ("GROUP BY"); these queries can be slow and
produce a low number of rows.

Optimizations are disabled for tests. 
Currently, performed optimizations are not visible in the "Live View"
tab. I will add that in the next PR.

---------

Signed-off-by: Rafał Strzaliński <[email protected]>
Co-authored-by: Grzegorz Piwowarek <[email protected]>
Co-authored-by: Jacek Migdal <[email protected]>
  • Loading branch information
3 people authored Jul 15, 2024
1 parent dbc1d30 commit 20d4a1e
Show file tree
Hide file tree
Showing 9 changed files with 594 additions and 3 deletions.
14 changes: 12 additions & 2 deletions quesma/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (lm *LogManager) ProcessQuery(ctx context.Context, table *Table, query *mod

}

rows, err := executeQuery(ctx, lm, query.SelectCommand.String(), columns, rowToScan)
rows, err := executeQuery(ctx, lm, query, columns, rowToScan)

if err == nil {
for _, row := range rows {
Expand Down Expand Up @@ -121,9 +121,11 @@ func (lm *LogManager) explainQuery(ctx context.Context, query string, elapsed ti
}
}

func executeQuery(ctx context.Context, lm *LogManager, queryAsString string, fields []string, rowToScan []interface{}) ([]model.QueryResultRow, error) {
func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, fields []string, rowToScan []interface{}) ([]model.QueryResultRow, error) {
span := lm.phoneHomeAgent.ClickHouseQueryDuration().Begin()

queryAsString := query.SelectCommand.String()

// We drop privileges for the query
//
// https://clickhouse.com/docs/en/operations/settings/permissions-for-queries
Expand All @@ -133,6 +135,14 @@ func executeQuery(ctx context.Context, lm *LogManager, queryAsString string, fie
settings["readonly"] = "1"
settings["allow_ddl"] = "0"

if query.OptimizeHints != nil {
for k, v := range query.OptimizeHints.Settings {
settings[k] = v
}

queryAsString = queryAsString + "\n-- optimizations: " + strings.Join(query.OptimizeHints.OptimizationsPerformed, ", ") + "\n"
}

ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings))

rows, err := lm.Query(ctx, queryAsString)
Expand Down
12 changes: 12 additions & 0 deletions quesma/model/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,18 @@ const (
noLimit = 0
)

// QueryOptimizeHints contains hints for query execution, e.g., performance settings, temporary table usage.
// It is populated by the optimize pipeline and consumed when the query is sent to the database.
type QueryOptimizeHints struct {
	Settings               map[string]any // DB-level settings to attach to the query (e.g. ClickHouse "use_query_cache")
	OptimizationsPerformed []string       // names of optimizations applied, for diagnostics (emitted as an SQL comment)
}

type (
Query struct {
SelectCommand SelectCommand // The representation of SELECT query

OptimizeHints *QueryOptimizeHints // it can be optional

Type QueryType
TableName string

Expand Down Expand Up @@ -44,6 +52,10 @@ type (
}
)

// NewQueryExecutionHints returns an empty QueryOptimizeHints with the
// Settings map ready for writes. OptimizationsPerformed is left nil on
// purpose: append works on a nil slice.
//
// NOTE(review): the constructor name ("ExecutionHints") does not match the
// type name ("OptimizeHints") — consider aligning them in a follow-up.
func NewQueryExecutionHints() *QueryOptimizeHints {
	hints := QueryOptimizeHints{}
	hints.Settings = map[string]any{}
	return &hints
}

// NewSortColumn builds an ORDER BY expression that sorts by a single
// column in the given direction.
func NewSortColumn(field string, direction OrderByDirection) OrderByExpr {
	exprs := []Expr{NewColumnRef(field)}
	return NewOrderByExpr(exprs, direction)
}
Expand Down
37 changes: 37 additions & 0 deletions quesma/optimize/cache_group_by.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package optimize

import "quesma/model"

// cacheGroupByQueries - a transformer that suggests db to cache the query results
//
// It's done by adding settings to the query
//
// https://clickhouse.com/docs/en/operations/query-cache
//
// Cached queries can be examined with:
//
// select * from system.query_cache
//
// Cache can be dropped with
//
// SYSTEM DROP QUERY CACHE
//

// cacheGroupByQueries is a stateless query transformer; see the package
// comment above for the ClickHouse query-cache background.
type cacheGroupByQueries struct {
}

// Transform marks aggregation ("GROUP BY") queries so the database caches
// their results, by setting the "use_query_cache" hint. It mutates the
// queries in place and never fails.
func (s *cacheGroupByQueries) Transform(queries []*model.Query) ([]*model.Query, error) {

	for _, query := range queries {

		// TODO add better detection
		// TODO add CTE here
		if len(query.SelectCommand.GroupBy) == 0 {
			continue
		}

		// Guard against nil hints so this transformer is safe to run on its
		// own; OptimizePipeline normally pre-populates OptimizeHints, but the
		// original code panicked (nil map write) when it had not.
		if query.OptimizeHints == nil {
			query.OptimizeHints = model.NewQueryExecutionHints()
		}

		query.OptimizeHints.Settings["use_query_cache"] = true
		query.OptimizeHints.OptimizationsPerformed = append(query.OptimizeHints.OptimizationsPerformed, "cacheGroupByQueries")
	}
	return queries, nil
}
45 changes: 45 additions & 0 deletions quesma/optimize/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package optimize

import (
"quesma/model"
"quesma/plugins"
"time"
)

// OptimizePipeline - a transformer that optimizes queries by running a
// fixed sequence of smaller optimization transformers.
type OptimizePipeline struct {
	optimizations []plugins.QueryTransformer // applied in order by Transform
}

// NewOptimizePipeline builds the default pipeline: date truncation to
// 5-minute buckets, then GROUP BY result caching.
func NewOptimizePipeline() plugins.QueryTransformer {
	steps := []plugins.QueryTransformer{
		&truncateDate{truncateTo: 5 * time.Minute},
		&cacheGroupByQueries{},
	}
	return &OptimizePipeline{optimizations: steps}
}

// Transform ensures every query carries an OptimizeHints struct, then runs
// each configured optimization in order. The first error aborts the
// pipeline and is returned to the caller.
func (s *OptimizePipeline) Transform(queries []*model.Query) ([]*model.Query, error) {

	// Give every query a hints struct for the optimizers to fill in.
	for _, q := range queries {
		if q.OptimizeHints != nil {
			continue
		}
		q.OptimizeHints = model.NewQueryExecutionHints()
	}

	// Chain the optimizations; each step consumes the previous step's output.
	for _, step := range s.optimizations {
		transformed, err := step.Transform(queries)
		if err != nil {
			return nil, err
		}
		queries = transformed
	}

	return queries, nil
}
193 changes: 193 additions & 0 deletions quesma/optimize/pipeline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package optimize

import (
"fmt"
"github.com/stretchr/testify/assert"
"quesma/model"
"testing"
)

// Test_cacheGroupBy verifies that the pipeline enables the DB query cache
// ("use_query_cache" setting) for GROUP BY queries and leaves plain
// selects untouched.
func Test_cacheGroupBy(t *testing.T) {

	tests := []struct {
		name        string
		shouldCache bool
		query       model.SelectCommand
	}{
		{
			"select all",
			false,
			model.SelectCommand{
				Columns:    []model.Expr{model.NewColumnRef("*")},
				FromClause: model.NewTableRef("foo"),
			},
		},

		{
			"select a, count() from foo group by 1",
			true,
			model.SelectCommand{
				Columns:    []model.Expr{model.NewColumnRef("a"), model.NewFunction("count", model.NewColumnRef("*"))},
				FromClause: model.NewTableRef("foo"),
				GroupBy:    []model.Expr{model.NewLiteral(1)},
			},
		},
		// Add CTE here
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			optimized, err := NewOptimizePipeline().Transform([]*model.Query{{SelectCommand: tt.query}})
			if err != nil {
				t.Fatalf("error optimizing query: %v", err)
			}
			if len(optimized) != 1 {
				t.Fatalf("expected 1 query, got %d", len(optimized))
			}

			// The setting is absent (not false) when the optimization skipped
			// the query, so treat "missing" as disabled.
			cacheEnabled := false
			if v := optimized[0].OptimizeHints.Settings["use_query_cache"]; v != nil {
				cacheEnabled = v.(bool)
			}
			assert.Equalf(t, tt.shouldCache, cacheEnabled, "expected use_query_cache to be %v, got %v", tt.shouldCache, cacheEnabled)
		})
	}
}

// Test_dateTrunc checks the date-truncation optimization on WHERE-clause
// timestamp ranges. Per the fixtures below: ranges wider than 24h get their
// bounds rounded to 5-minute buckets (lower bound down, upper bound up),
// while single dates, sub-24h ranges, and other fixtures pass through
// unchanged.
func Test_dateTrunc(t *testing.T) {

	// Helpers that build the expression tree for
	// `parseDateTime64BestEffort('<ts>')` and for infix comparisons,
	// keeping the fixture table below readable.
	date := func(s string) model.Expr {
		return model.NewFunction("parseDateTime64BestEffort", model.NewLiteral(fmt.Sprintf("'%s'", s)))
	}

	and := func(a, b model.Expr) model.Expr {
		return model.NewInfixExpr(a, "and", b)
	}

	lt := func(a, b model.Expr) model.Expr {
		return model.NewInfixExpr(a, "<", b)
	}

	gt := func(a, b model.Expr) model.Expr {
		return model.NewInfixExpr(a, ">", b)
	}

	col := func(s string) model.Expr {
		return model.NewColumnRef(s)
	}

	tests := []struct {
		name     string
		query    model.SelectCommand
		expected model.SelectCommand
	}{
		{
			// No WHERE clause: nothing to truncate.
			"select all",
			model.SelectCommand{
				Columns:    []model.Expr{model.NewColumnRef("*")},
				FromClause: model.NewTableRef("foo"),
			},
			model.SelectCommand{
				Columns:    []model.Expr{model.NewColumnRef("*")},
				FromClause: model.NewTableRef("foo"),
			},
		},

		{
			// A lone date literal (no range) is left untouched.
			"select all where date ",
			model.SelectCommand{
				Columns:     []model.Expr{model.NewColumnRef("*")},
				FromClause:  model.NewTableRef("foo"),
				WhereClause: date("2024-06-04T13:08:53.675Z"),
			},
			model.SelectCommand{
				Columns:     []model.Expr{model.NewColumnRef("*")},
				FromClause:  model.NewTableRef("foo"),
				WhereClause: date("2024-06-04T13:08:53.675Z"),
			},
		},

		{
			// Range spans > 24h: bounds are rounded to 5-minute buckets
			// (13:08:53 -> 13:05:00 lower, 13:10:53 -> 13:15:00 upper).
			"select all where and between dates (>24h)",
			model.SelectCommand{
				Columns:     []model.Expr{model.NewColumnRef("*")},
				FromClause:  model.NewTableRef("foo"),
				WhereClause: and(gt(col("a"), date("2024-06-04T13:08:53.675Z")), lt(col("a"), date("2024-06-06T13:10:53.675Z"))),
			},
			model.SelectCommand{
				Columns:     []model.Expr{model.NewColumnRef("*")},
				FromClause:  model.NewTableRef("foo"),
				WhereClause: and(gt(col("a"), date("2024-06-04T13:05:00Z")), lt(col("a"), date("2024-06-06T13:15:00Z"))),
			},
		},

		{
			// Range spans < 24h: truncation is skipped, query unchanged.
			"select all where and between dates (<24h)",
			model.SelectCommand{
				Columns:     []model.Expr{model.NewColumnRef("*")},
				FromClause:  model.NewTableRef("foo"),
				WhereClause: and(gt(col("a"), date("2024-06-06T10:08:53.675Z")), lt(col("a"), date("2024-06-06T13:10:53.675Z"))),
			},
			model.SelectCommand{
				Columns:     []model.Expr{model.NewColumnRef("*")},
				FromClause:  model.NewTableRef("foo"),
				WhereClause: and(gt(col("a"), date("2024-06-06T10:08:53.675Z")), lt(col("a"), date("2024-06-06T13:10:53.675Z"))),
			},
		},

		{
			// GROUP BY query without dates: unchanged by this optimization
			// (the cache optimization only touches hints, not the SQL).
			"select a, count() from foo group by 1",
			model.SelectCommand{
				Columns:    []model.Expr{model.NewColumnRef("a"), model.NewFunction("count", model.NewColumnRef("*"))},
				FromClause: model.NewTableRef("foo"),
				GroupBy:    []model.Expr{model.NewLiteral(1)},
			},
			model.SelectCommand{
				Columns:    []model.Expr{model.NewColumnRef("a"), model.NewFunction("count", model.NewColumnRef("*"))},
				FromClause: model.NewTableRef("foo"),
				GroupBy:    []model.Expr{model.NewLiteral(1)},
			},
		},
		// Add CTE here
	}

	for _, tt := range tests {

		t.Run(tt.name, func(t *testing.T) {

			queries := []*model.Query{
				{
					SelectCommand: tt.query,
				},
			}
			pipeline := NewOptimizePipeline()
			optimized, err := pipeline.Transform(queries)

			if err != nil {
				t.Fatalf("error optimizing query: %v", err)
			}

			if len(optimized) != 1 {
				t.Fatalf("expected 1 query, got %d", len(optimized))
			}

			assert.Equal(t, tt.expected, optimized[0].SelectCommand)

		})

	}
}
Loading

0 comments on commit 20d4a1e

Please sign in to comment.