Skip to content

Commit

Permalink
POC N index -> 1 table mapping (#532)
Browse files Browse the repository at this point in the history
This PR adds new configuration settings to map input/source indexes to
internal database table (that's proposal, I'm open to suggestions)
```
indexMappings:
  big_kibana_common_table:
    sourceIndexes: ["kibana_sample_data_flights", "kibana_sample_data_logs"]
```
It then uses this information to rewrite all table references to
specified common database table. Rewriting table references in from
clause should be sufficient for our current needs
  • Loading branch information
pdelewski authored Jul 17, 2024
1 parent 7b84d19 commit b2f3ea1
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 9 deletions.
27 changes: 19 additions & 8 deletions quesma/quesma/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ type QuesmaConfiguration struct {
//deprecated
ClickHouse RelationalDbConfiguration `koanf:"clickhouse"`
//deprecated
Hydrolix RelationalDbConfiguration `koanf:"hydrolix"`
Elasticsearch ElasticsearchConfiguration `koanf:"elasticsearch"`
IndexConfig map[string]IndexConfiguration `koanf:"indexes"`
Logging LoggingConfiguration `koanf:"logging"`
PublicTcpPort network.Port `koanf:"port"`
IngestStatistics bool `koanf:"ingestStatistics"`
QuesmaInternalTelemetryUrl *Url `koanf:"internalTelemetryUrl"`
EnabledOptimizers OptimizersConfiguration `koanf:"optimizers"`
Hydrolix RelationalDbConfiguration `koanf:"hydrolix"`
Elasticsearch ElasticsearchConfiguration `koanf:"elasticsearch"`
IndexConfig map[string]IndexConfiguration `koanf:"indexes"`
Logging LoggingConfiguration `koanf:"logging"`
PublicTcpPort network.Port `koanf:"port"`
IngestStatistics bool `koanf:"ingestStatistics"`
QuesmaInternalTelemetryUrl *Url `koanf:"internalTelemetryUrl"`
EnabledOptimizers OptimizersConfiguration `koanf:"optimizers"`
IndexSourceToInternalMappings map[string]IndexMappingsConfiguration `koanf:"indexMappings"`
}

type LoggingConfiguration struct {
Expand Down Expand Up @@ -129,6 +130,10 @@ func Load() QuesmaConfiguration {
}
}
}
for name, idxMapping := range config.IndexSourceToInternalMappings {
idxMapping.Name = name
config.IndexSourceToInternalMappings[name] = idxMapping
}
return config
}

Expand Down Expand Up @@ -318,6 +323,10 @@ func (c *QuesmaConfiguration) String() string {
if c.QuesmaInternalTelemetryUrl != nil {
quesmaInternalTelemetryUrl = c.QuesmaInternalTelemetryUrl.String()
}
var indexMappings string
for _, idx := range c.IndexSourceToInternalMappings {
indexMappings += idx.String()
}
return fmt.Sprintf(`
Quesma Configuration:
Mode: %s
Expand All @@ -326,6 +335,7 @@ Quesma Configuration:
Connectors: %s
Call Elasticsearch: %v
Indexes: %s
IndexMappings: %s
Logs Path: %s
Log Level: %v
Public TCP Port: %d
Expand All @@ -340,6 +350,7 @@ Quesma Configuration:
connectorString.String(),
c.Elasticsearch.Call,
indexConfigs,
indexMappings,
c.Logging.Path,
c.Logging.Level,
c.PublicTcpPort,
Expand Down
24 changes: 24 additions & 0 deletions quesma/quesma/config/index_mappings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package config

import (
"fmt"
"strings"
)

type IndexMappingsConfiguration struct {
Name string `koanf:"name"`
Mappings []string `koanf:"sourceIndexes"`
}

func (imc IndexMappingsConfiguration) String() string {
var str = fmt.Sprintf("\n\t\t%s",
imc.Name,
)
if len(imc.Mappings) > 0 {
str = fmt.Sprintf("%s <- %s", str, strings.Join(imc.Mappings, ", "))
}
return str
}
16 changes: 16 additions & 0 deletions quesma/quesma/functionality/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ type (
}
)

func makeSourceToDestMappings(indexMappings map[string]config.IndexMappingsConfiguration) map[string]string {
sourceToDestMapping := make(map[string]string)
for _, indexMapping := range indexMappings {
for _, sourceIndex := range indexMapping.Mappings {
destIndex := indexMapping.Name
sourceToDestMapping[sourceIndex] = destIndex
}
}
return sourceToDestMapping
}

func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, lm *clickhouse.LogManager,
cfg config.QuesmaConfiguration, phoneHomeAgent telemetry.PhoneHomeAgent) (results []WriteResult) {
defer recovery.LogPanic()
Expand Down Expand Up @@ -88,6 +99,11 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, lm *cli
for _, document := range documents {
stats.GlobalStatistics.Process(cfg, indexName, document, clickhouse.NestedSeparator)
}
// if the index is mapped to specified database table in the configuration, use that table
sourceToDestinations := makeSourceToDestMappings(cfg.IndexSourceToInternalMappings)
if destIndex, ok := sourceToDestinations[indexName]; ok {
return lm.ProcessInsertQuery(ctx, destIndex, documents)
}
return lm.ProcessInsertQuery(ctx, indexName, documents)
})
}
Expand Down
43 changes: 43 additions & 0 deletions quesma/quesma/index_mapping_query_rewriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package quesma

import (
"quesma/model"
"quesma/quesma/config"
)

func makeSourceToDestMappings(indexMappings map[string]config.IndexMappingsConfiguration) map[string]string {
sourceToDestMapping := make(map[string]string)
for _, indexMapping := range indexMappings {
for _, sourceIndex := range indexMapping.Mappings {
destIndex := indexMapping.Name
sourceToDestMapping[sourceIndex] = destIndex
}
}
return sourceToDestMapping
}

func (s *SchemaCheckPass) applyIndexMappingTransformations(query *model.Query) (*model.Query, error) {
sourceToDestMapping := makeSourceToDestMappings(s.indexMappings)

visitor := model.NewBaseVisitor()

// For now, we only rewrite the table refs
// as it seems to be sufficient for the current use case
// as there will be only one table ref in the query
// we don't need to worry about the other expressions
visitor.OverrideVisitTableRef = func(b *model.BaseExprVisitor, e model.TableRef) interface{} {
if destIndex, ok := sourceToDestMapping[e.Name]; ok {
return model.NewTableRef(destIndex)
}
return model.NewTableRef(e.Name)
}
expr := query.SelectCommand.Accept(visitor)
if _, ok := expr.(*model.SelectCommand); ok {
query.SelectCommand = *expr.(*model.SelectCommand)
}
return query, nil

}
86 changes: 86 additions & 0 deletions quesma/quesma/index_mapping_query_rewriter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package quesma

import (
"github.com/stretchr/testify/assert"
"quesma/clickhouse"
"quesma/model"
"quesma/quesma/config"
"quesma/schema"
"testing"
)

func Test_index_table_mapping(t *testing.T) {
// Below mapping says that two indexes are mapped to a single db table called
// CommonKibanaTable, so queries from clauses should be rewritten to use
// CommonKibanaTable
mappingsConfig := map[string]config.IndexMappingsConfiguration{
"CommonKibanaTable": {
Name: "CommonKibanaTable",
Mappings: []string{"kibana_sample_data_logs", "kibana_sample_data_flights"},
},
}

// expectedQueries expects to have CommonKibanaTable as the table name
expectedQueries := []*model.Query{
{
TableName: "kibana_sample_data_logs",
SelectCommand: model.SelectCommand{
FromClause: model.NewTableRef("CommonKibanaTable"),
},
},

{
TableName: "kibana_sample_data_flights",
SelectCommand: model.SelectCommand{
FromClause: model.NewTableRef("CommonKibanaTable"),
},
},
}

// input queries always use the original index names
queries := [][]*model.Query{
{
{
TableName: "kibana_sample_data_logs",
SelectCommand: model.SelectCommand{
FromClause: model.NewTableRef("kibana_sample_data_logs"),
}},
},

{
{
TableName: "kibana_sample_data_flights",
SelectCommand: model.SelectCommand{
FromClause: model.NewTableRef("kibana_sample_data_flights"),
}},
},
}

indexConfig := map[string]config.IndexConfiguration{
"kibana_sample_data_logs": {
Name: "kibana_sample_data_logs",
Enabled: true,
},
}

cfg := config.QuesmaConfiguration{
IndexConfig: indexConfig,
}

tableDiscovery :=
fixedTableProvider{tables: map[string]schema.Table{
"kibana_sample_data_flights": {Columns: map[string]schema.Column{}},
}}

s := schema.NewSchemaRegistry(tableDiscovery, cfg, clickhouse.SchemaTypeAdapter{})

transform := &SchemaCheckPass{cfg: indexConfig, schemaRegistry: s, logManager: clickhouse.NewLogManagerEmpty(), indexMappings: mappingsConfig}

for k := range queries {
resultQueries, err := transform.Transform(queries[k])
assert.NoError(t, err)
assert.Equal(t, expectedQueries[k].SelectCommand.String(), resultQueries[0].SelectCommand.String())
}
}
2 changes: 2 additions & 0 deletions quesma/quesma/schema_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type SchemaCheckPass struct {
cfg map[string]config.IndexConfiguration
schemaRegistry schema.Registry
logManager *clickhouse.LogManager
indexMappings map[string]config.IndexMappingsConfiguration
}

// This functions trims the db name from the table name if exists
Expand Down Expand Up @@ -272,6 +273,7 @@ func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, err
TransformationName string
Transformation func(*model.Query) (*model.Query, error)
}{
{TransformationName: "IndexMappingQueryRewriter", Transformation: s.applyIndexMappingTransformations},
{TransformationName: "BooleanLiteralTransformation", Transformation: s.applyBooleanLiteralLowering},
{TransformationName: "IpTransformation", Transformation: s.applyIpTransformations},
{TransformationName: "GeoTransformation", Transformation: s.applyGeoTransformations},
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewQueryRunner(lm *clickhouse.LogManager, cfg config.QuesmaConfiguration, i
AsyncQueriesContexts: concurrent.NewMap[string, *AsyncQueryContext](),
transformationPipeline: TransformationPipeline{
transformers: []plugins.QueryTransformer{
&SchemaCheckPass{cfg: cfg.IndexConfig, schemaRegistry: schemaRegistry, logManager: lm}, // this can be a part of another plugin
&SchemaCheckPass{cfg: cfg.IndexConfig, schemaRegistry: schemaRegistry, logManager: lm, indexMappings: cfg.IndexSourceToInternalMappings}, // this can be a part of another plugin
},
}, schemaRegistry: schemaRegistry}
}
Expand Down

0 comments on commit b2f3ea1

Please sign in to comment.