Skip to content

Commit

Permalink
cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
avelanarius committed Nov 5, 2024
1 parent 0bfe29b commit 69b408a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 223 deletions.
247 changes: 31 additions & 216 deletions quesma/table_resolver/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"quesma/quesma/config"
"quesma/util"
"reflect"
"slices"
"strings"
)

Expand Down Expand Up @@ -76,18 +75,15 @@ func singleIndexSplitter(pattern string) (parsedPattern, *Decision) {
}, nil
}

func makeIsDisabledInConfig(cfg map[string]config.IndexConfiguration, pipeline string) func(input parsedPattern) *Decision {
func makeIsDisabledInConfig(cfg map[string]config.IndexConfiguration, pipeline string) func(part string) *Decision {

return func(input parsedPattern) *Decision {

if !input.isPattern {
idx, ok := cfg[input.source]
if ok {
if len(getTargets(idx, pipeline)) == 0 {
return &Decision{
IsClosed: true,
Reason: "Index is disabled in config.",
}
return func(part string) *Decision {
idx, ok := cfg[part]
if ok {
if len(getTargets(idx, pipeline)) == 0 {
return &Decision{
IsClosed: true,
Reason: "Index is disabled in config.",
}
}
}
Expand All @@ -96,9 +92,9 @@ func makeIsDisabledInConfig(cfg map[string]config.IndexConfiguration, pipeline s
}
}

func resolveInternalElasticName(pattern parsedPattern) *Decision {
func resolveInternalElasticName(part string) *Decision {

if elasticsearch.IsInternalIndex(pattern.source) {
if elasticsearch.IsInternalIndex(part) {
return &Decision{
UseConnectors: []ConnectorDecision{&ConnectorDecisionElastic{ManagementCall: true}},
Reason: "It's kibana internals",
Expand All @@ -108,8 +104,8 @@ func resolveInternalElasticName(pattern parsedPattern) *Decision {
return nil
}

func makeDefaultWildcard(quesmaConf config.QuesmaConfiguration, pipeline string) func(input parsedPattern) *Decision {
return func(input parsedPattern) *Decision {
func makeDefaultWildcard(quesmaConf config.QuesmaConfiguration, pipeline string) func(part string) *Decision {
return func(part string) *Decision {
var targets []string
var useConnectors []ConnectorDecision

Expand All @@ -129,9 +125,9 @@ func makeDefaultWildcard(quesmaConf config.QuesmaConfiguration, pipeline string)
switch target {
case config.ClickhouseTarget:
useConnectors = append(useConnectors, &ConnectorDecisionClickhouse{
ClickhouseTableName: input.source,
ClickhouseTableName: part,
IsCommonTable: quesmaConf.UseCommonTableForWildcard,
ClickhouseTables: []string{input.source},
ClickhouseTables: []string{part},
})
case config.ElasticsearchTarget:
useConnectors = append(useConnectors, &ConnectorDecisionElastic{})
Expand All @@ -151,15 +147,10 @@ func makeDefaultWildcard(quesmaConf config.QuesmaConfiguration, pipeline string)
}
}

func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfiguration, pipeline string) func(input parsedPattern) *Decision {

return func(input parsedPattern) *Decision {
func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfiguration, pipeline string) func(part string) *Decision {

if input.isPattern {
return nil
}

if cfg, ok := indexConfig[input.source]; ok {
return func(part string) *Decision {
if cfg, ok := indexConfig[part]; ok {
if !cfg.UseCommonTable {

targets := getTargets(cfg, pipeline)
Expand All @@ -182,8 +173,8 @@ func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfi
targetDecision = &ConnectorDecisionElastic{}
case config.ClickhouseTarget:
targetDecision = &ConnectorDecisionClickhouse{
ClickhouseTableName: input.source,
ClickhouseTables: []string{input.source},
ClickhouseTableName: part,
ClickhouseTables: []string{part},
}
default:
return &Decision{
Expand All @@ -203,8 +194,8 @@ func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfi
Reason: "Enabled in the config. Dual write is enabled.",

UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{
ClickhouseTableName: input.source,
ClickhouseTables: []string{input.source}},
ClickhouseTableName: part,
ClickhouseTables: []string{part}},
&ConnectorDecisionElastic{}},
}

Expand All @@ -216,8 +207,8 @@ func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfi
Reason: "Enabled in the config. A/B testing.",
EnableABTesting: true,
UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{
ClickhouseTableName: input.source,
ClickhouseTables: []string{input.source}},
ClickhouseTableName: part,
ClickhouseTables: []string{part}},
&ConnectorDecisionElastic{}},
}
} else if targets[0] == config.ElasticsearchTarget && targets[1] == config.ClickhouseTarget {
Expand All @@ -228,8 +219,8 @@ func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfi
UseConnectors: []ConnectorDecision{
&ConnectorDecisionElastic{},
&ConnectorDecisionClickhouse{
ClickhouseTableName: input.source,
ClickhouseTables: []string{input.source}},
ClickhouseTableName: part,
ClickhouseTables: []string{part}},
},
}

Expand Down Expand Up @@ -260,186 +251,10 @@ func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfi
}
}

func (r *tableRegistryImpl) makeCheckIfPatternMatchesAllConnectors(pipeline string) func(input parsedPattern) *Decision {

return func(input parsedPattern) *Decision {
if input.isPattern {

var matchedElastic []string
var matchedClickhouse []string

for _, pattern := range input.parts {

// here we check against the config

for indexName, index := range r.conf.IndexConfig {
targets := getTargets(index, pipeline)

if util.IndexPatternMatches(pattern, indexName) {

for _, target := range targets {
switch target {
case config.ElasticsearchTarget:
matchedElastic = append(matchedElastic, indexName)
case config.ClickhouseTarget:
matchedClickhouse = append(matchedClickhouse, indexName)
default:
return &Decision{
Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("unsupported target: %s", target)),
Reason: "Unsupported target.",
}
}
}
}
}

// but maybe we should also check against the actual indexes ??
for indexName := range r.elasticIndexes {
if util.IndexPatternMatches(pattern, indexName) {
matchedElastic = append(matchedElastic, indexName)
}
}
if r.conf.AutodiscoveryEnabled {
for tableName := range r.clickhouseIndexes {
if util.IndexPatternMatches(pattern, tableName) {
matchedClickhouse = append(matchedClickhouse, tableName)
}
}
}

}

matchedElastic = util.Distinct(matchedElastic)
matchedClickhouse = util.Distinct(matchedClickhouse)

nElastic := len(matchedElastic)
nClickhouse := len(matchedClickhouse)

switch {

case nElastic > 0 && nClickhouse > 0:
return &Decision{
Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("index pattern [%s] resolved to both elasticsearch indices: [%s] and clickhouse tables: [%s]", input.parts, matchedElastic, matchedClickhouse)),
Reason: "Both Elastic and Clickhouse matched.",
}

case nElastic > 0 && nClickhouse == 0:

return &Decision{
UseConnectors: []ConnectorDecision{&ConnectorDecisionElastic{}},
Reason: "Only Elastic matched.",
}

case nElastic == 0 && nClickhouse > 0:
// it will be resolved by sth else later
return nil

case nElastic == 0 && nClickhouse == 0:
return &Decision{
IsEmpty: true,
Reason: "No indexes matched. Checked both connectors.",
}
}
}

return nil
}

}

func (r *tableRegistryImpl) makeCommonTableResolver(cfg map[string]config.IndexConfiguration, pipeline string) func(input parsedPattern) *Decision {

return func(input parsedPattern) *Decision {

if input.isPattern {

// At this point we should do not have any elastic indexes.
// This is because we have already checked if the pattern matches any elastic indexes.
for _, pattern := range input.parts {
for indexName := range r.elasticIndexes {
if util.IndexPatternMatches(pattern, indexName) {
return &Decision{
Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("index parsedPattern [%s] resolved to elasticsearch indices", input.parts)),
Reason: "We're not supporting common tables for Elastic.",
}
}
}
}

var matchedVirtualTables []string
var matchedTables []string
for _, pattern := range input.parts {

// here we check against the config

for indexName, index := range r.conf.IndexConfig {
if util.IndexPatternMatches(pattern, indexName) {

targets := getTargets(index, pipeline)

if slices.Contains(targets, config.ClickhouseTarget) {
if index.UseCommonTable {
matchedVirtualTables = append(matchedVirtualTables, indexName)
} else {
matchedTables = append(matchedTables, indexName)
}
}
}
}

// but maybe we should also check against the actual indexes ??
if r.conf.AutodiscoveryEnabled {
for indexName, index := range r.clickhouseIndexes {
if util.IndexPatternMatches(pattern, indexName) {
if index.isVirtual {
matchedVirtualTables = append(matchedVirtualTables, indexName)
} else {
matchedTables = append(matchedTables, indexName)
}
}
}
}
}

matchedTables = util.Distinct(matchedTables)
matchedVirtualTables = util.Distinct(matchedVirtualTables)

switch {

case len(matchedTables) == 0 && len(matchedVirtualTables) == 0:
return &Decision{
IsEmpty: true,
Reason: "No indexes found.",
}

case len(matchedTables) == 1 && len(matchedVirtualTables) == 0:
return &Decision{
UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{
ClickhouseTableName: matchedTables[0],
ClickhouseTables: []string{matchedTables[0]},
}},
Reason: "Pattern matches single standalone table.",
}

case len(matchedTables) == 0 && len(matchedVirtualTables) > 0:
return &Decision{
UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{
IsCommonTable: true,
ClickhouseTableName: common_table.TableName,
ClickhouseTables: matchedVirtualTables,
}},
Reason: "Common table will be used. Querying multiple indexes.",
}

default:
return &Decision{
Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("index pattern [%s] resolved to both standalone table indices: [%s] and common table indices: [%s]", input.source, matchedTables, matchedVirtualTables)),
Reason: "Both standalone table and common table indexes matches the pattern",
}
}
}
func (r *tableRegistryImpl) makeCommonTableResolver(cfg map[string]config.IndexConfiguration, pipeline string) func(part string) *Decision {

if input.source == common_table.TableName {
return func(part string) *Decision {
if part == common_table.TableName {
return &Decision{
Err: fmt.Errorf("common table is not allowed to be queried directly"),
Reason: "It's internal table. Not allowed to be queried directly.",
Expand All @@ -450,18 +265,18 @@ func (r *tableRegistryImpl) makeCommonTableResolver(cfg map[string]config.IndexC

if r.conf.AutodiscoveryEnabled {
for indexName, index := range r.clickhouseIndexes {
if index.isVirtual && indexName == input.source {
if index.isVirtual && indexName == part {
virtualTableExists = true
break
}
}
}

if idxConfig, ok := cfg[input.source]; (ok && idxConfig.UseCommonTable) || (virtualTableExists) {
if idxConfig, ok := cfg[part]; (ok && idxConfig.UseCommonTable) || (virtualTableExists) {
return &Decision{
UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{
ClickhouseTableName: common_table.TableName,
ClickhouseTables: []string{input.source},
ClickhouseTables: []string{part},
IsCommonTable: true,
}},
Reason: "Common table will be used.",
Expand Down
9 changes: 2 additions & 7 deletions quesma/table_resolver/table_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type patternSplitter struct {

type basicResolver struct {
name string
resolver func(pattern parsedPattern) *Decision
resolver func(part string) *Decision
}

type decisionMerger struct {
Expand All @@ -60,11 +60,7 @@ func (ir *compoundResolver) resolve(indexName string) *Decision {
var decisions []*Decision
for _, part := range input.parts {
for _, resolver := range ir.decisionLadder {
decision := resolver.resolver(parsedPattern{
source: part,
isPattern: false,
parts: []string{part},
})
decision := resolver.resolver(part)

if decision != nil {
decision.ResolverName = resolver.name
Expand Down Expand Up @@ -330,7 +326,6 @@ func NewTableResolver(quesmaConf config.QuesmaConfiguration, discovery clickhous
decisionLadder: []basicResolver{
// checking if we can handle the parsedPattern
{"kibanaInternal", resolveInternalElasticName},
{"searchAcrossConnectors", res.makeCheckIfPatternMatchesAllConnectors(QueryPipeline)},
{"disabled", makeIsDisabledInConfig(indexConf, QueryPipeline)},

{"singleIndex", res.singleIndex(indexConf, QueryPipeline)},
Expand Down

0 comments on commit 69b408a

Please sign in to comment.