Skip to content

Commit

Permalink
Index <-> tables mappings : fixing field caps and search index resolv…
Browse files Browse the repository at this point in the history
…ing (#544)

This PR implements some missing pieces to connect index <-> tables
mappings e2e like field caps.

According configuration all request for `kibana_sample_data_flights` go
to `big_kibana_common_table`
```
indexMappings:
  big_kibana_common_table:
    sourceIndexes: ["kibana_sample_data_flights"]
```
<img width="1175" alt="image"
src="https://github.com/user-attachments/assets/ff400027-24de-47f7-a64d-d3192e40027f">


<img width="487" alt="image"
src="https://github.com/user-attachments/assets/a9d859a2-3623-4bdc-97f5-569d3c98506b">
  • Loading branch information
pdelewski authored Jul 19, 2024
1 parent 393c3ab commit f091838
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 15 deletions.
2 changes: 1 addition & 1 deletion quesma/eql/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (cw *ClickhouseEQLQueryTranslator) ParseQuery(body types.JSON) ([]*model.Qu
if simpleQuery.CanParse {
canParse = true
query = query_util.BuildHitsQuery(cw.Ctx, cw.Table.Name, "*", &simpleQuery, queryInfo.I2)
queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, query.SelectCommand.OrderByFieldNames(), true, false, false)
queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, query.SelectCommand.OrderByFieldNames(), true, false, false, cw.Table.Name)
query.Type = &queryType
query.Highlighter = highlighter
query.SelectCommand.OrderBy = simpleQuery.OrderBy
Expand Down
9 changes: 5 additions & 4 deletions quesma/model/typical_queries/hits.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ type Hits struct {
addSource bool // true <=> we add hit.Source field to the response
addScore bool // true <=> we add hit.Score field to the response (whose value is always 1)
addVersion bool // true <=> we add hit.Version field to the response (whose value is always 1)
indexName string
}

func NewHits(ctx context.Context, table *clickhouse.Table, highlighter *model.Highlighter,
sortFieldNames []string, addSource, addScore, addVersion bool) Hits {
sortFieldNames []string, addSource, addScore, addVersion bool, incomingIndexName string) Hits {

return Hits{ctx: ctx, table: table, highlighter: highlighter, sortFieldNames: sortFieldNames,
addSource: addSource, addScore: addScore, addVersion: addVersion}
addSource: addSource, addScore: addScore, addVersion: addVersion, indexName: incomingIndexName}
}

const (
Expand All @@ -48,7 +49,7 @@ func (query Hits) IsBucketAggregation() bool {
func (query Hits) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
hits := make([]model.SearchHit, 0, len(rows))
for i, row := range rows {
hit := model.NewSearchHit(query.table.Name)
hit := model.NewSearchHit(query.indexName)
if query.addScore {
hit.Score = defaultScore
}
Expand Down Expand Up @@ -140,7 +141,7 @@ func (query Hits) computeIdForDocument(doc model.SearchHit, defaultID string) st
}

func (query Hits) String() string {
return fmt.Sprintf("hits(table: %v)", query.table.Name)
return fmt.Sprintf("hits(table: %v)", query.indexName)
}

func (query Hits) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
Expand Down
2 changes: 1 addition & 1 deletion quesma/queryparser/query_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (cw *ClickhouseQueryTranslator) buildListQueryIfNeeded(
if fullQuery != nil {
highlighter.SetTokensToHighlight(fullQuery.SelectCommand)
// TODO: pass right arguments
queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, fullQuery.SelectCommand.OrderByFieldNames(), true, false, false)
queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, fullQuery.SelectCommand.OrderByFieldNames(), true, false, false, cw.IncomingIndexName)
fullQuery.Type = &queryType
fullQuery.Highlighter = highlighter
}
Expand Down
5 changes: 3 additions & 2 deletions quesma/queryparser/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ type ClickhouseQueryTranslator struct {
Table *clickhouse.Table
Ctx context.Context

DateMathRenderer string // "clickhouse_interval" or "literal" if not set, we use "clickhouse_interval"
SchemaRegistry schema.Registry
DateMathRenderer string // "clickhouse_interval" or "literal" if not set, we use "clickhouse_interval"
SchemaRegistry schema.Registry
IncomingIndexName string
}

var completionStatusOK = func() *int { value := 200; return &value }()
Expand Down
2 changes: 1 addition & 1 deletion quesma/queryparser/query_translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestMakeResponseSearchQuery(t *testing.T) {
&model.SimpleQuery{FieldName: "*"}, model.WeNeedUnlimitedCount,
)
highlighter := NewEmptyHighlighter()
queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, hitQuery.SelectCommand.OrderByFieldNames(), true, false, false)
queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, hitQuery.SelectCommand.OrderByFieldNames(), true, false, false, cw.Table.Name)
hitQuery.Type = &queryType
ourResponseRaw := cw.MakeSearchResponse(
[]*model.Query{hitQuery},
Expand Down
17 changes: 16 additions & 1 deletion quesma/quesma/functionality/field_capabilities/field_caps.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,16 @@ func addFieldCapabilityFromSchemaRegistry(fields map[string]map[string]model.Fie
fields[colName][fieldTypeName] = fieldCapability
}
}

func makeSourceToDestMappings(indexMappings map[string]config.IndexMappingsConfiguration) map[string]string {
sourceToDestMapping := make(map[string]string)
for _, indexMapping := range indexMappings {
for _, sourceIndex := range indexMapping.Mappings {
destIndex := indexMapping.Name
sourceToDestMapping[sourceIndex] = destIndex
}
}
return sourceToDestMapping
}
func handleFieldCapsIndex(cfg config.QuesmaConfiguration, schemaRegistry schema.Registry, indexes []string) ([]byte, error) {
fields := make(map[string]map[string]model.FieldCapability)
for _, resolvedIndex := range indexes {
Expand Down Expand Up @@ -104,6 +113,12 @@ func EmptyFieldCapsResponse() []byte {
}

func HandleFieldCaps(ctx context.Context, cfg config.QuesmaConfiguration, schemaRegistry schema.Registry, index string, lm *clickhouse.LogManager) ([]byte, error) {
sourceToDestMapping := makeSourceToDestMappings(cfg.IndexSourceToInternalMappings)

if destIndex, ok := sourceToDestMapping[index]; ok {
index = destIndex
}

indexes, err := lm.ResolveIndexes(ctx, index)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions quesma/quesma/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ const (
QueryLanguageEQL = "eql"
)

func NewQueryTranslator(ctx context.Context, language QueryLanguage, table *clickhouse.Table, logManager *clickhouse.LogManager, dateMathRenderer string, schemaRegistry schema.Registry) (queryTranslator IQueryTranslator) {
func NewQueryTranslator(ctx context.Context, language QueryLanguage, table *clickhouse.Table, logManager *clickhouse.LogManager, dateMathRenderer string, schemaRegistry schema.Registry, incomingIndexName string) (queryTranslator IQueryTranslator) {
switch language {
case QueryLanguageEQL:
return &eql.ClickhouseEQLQueryTranslator{ClickhouseLM: logManager, Table: table, Ctx: ctx}
default:
return &queryparser.ClickhouseQueryTranslator{ClickhouseLM: logManager, Table: table, Ctx: ctx, DateMathRenderer: dateMathRenderer, SchemaRegistry: schemaRegistry}
return &queryparser.ClickhouseQueryTranslator{ClickhouseLM: logManager, Table: table, Ctx: ctx, DateMathRenderer: dateMathRenderer, SchemaRegistry: schemaRegistry, IncomingIndexName: incomingIndexName}
}
}
9 changes: 6 additions & 3 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,20 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
if err != nil {
return nil, err
}

sourceToDestMappings := makeSourceToDestMappings(q.cfg.IndexSourceToInternalMappings)
for _, resolvedTableName := range sourcesClickhouse {
var err error
doneCh := make(chan AsyncSearchWithError, 1)

incomingIndexName := resolvedTableName
if indexMapping, ok := sourceToDestMappings[resolvedTableName]; ok {
resolvedTableName = indexMapping
}
table, _ := tables.Load(resolvedTableName)
if table == nil {
return []byte{}, end_user_errors.ErrNoSuchTable.New(fmt.Errorf("can't load %s table", resolvedTableName)).Details("Table: %s", resolvedTableName)
}

queryTranslator := NewQueryTranslator(ctx, queryLanguage, table, q.logManager, q.DateMathRenderer, q.schemaRegistry)
queryTranslator := NewQueryTranslator(ctx, queryLanguage, table, q.logManager, q.DateMathRenderer, q.schemaRegistry, incomingIndexName)

queries, canParse, err := queryTranslator.ParseQuery(body)
if err != nil {
Expand Down

0 comments on commit f091838

Please sign in to comment.