Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Search facets #103

Merged
merged 12 commits into from
Jun 3, 2024
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ test: _ensure_network test_api test_front

test_api:
@echo "🔎 Running API tests..."
${DOCKER_COMPOSE_TEST} run --rm api pytest tests/
${DOCKER_COMPOSE_TEST} run --rm api pytest ${args} tests/

test_front:
@echo "🔎 Running front-end tests..."
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ uvicorn app.api:app --reload --port=8001 --workers=4
```
Note that it's important to use port 8001, as port 8000 will be used by the docker version of the search service.



To debug the backend app:
* stop API instance: `docker compose stop api`
* add a `pdb.set_trace()` call at the point where you want to break,
* then launch `docker compose run --rm --use-aliases api uvicorn app.api:app --proxy-headers --host 0.0.0.0 --port 8000 --reload`[^use_aliases]

[^use_aliases]: the `--use-aliases` flag makes this container reachable as "api" by the other containers in the compose setup


### Pre-Commit

This repo uses [pre-commit](https://pre-commit.com/) to enforce code styling, etc. To use it:
Expand Down
26 changes: 25 additions & 1 deletion app/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,29 @@
JSONType = dict[str, Any]


class FacetItem(BaseModel):
    """Describes an entry of a facet"""

    # identifier of the facet value (e.g. the taxonomy entry key)
    key: str
    # display name for this value; currently mirrors `key` upstream
    name: str
    count: int
    """The number of elements for this value"""


class FacetInfo(BaseModel):
    """Search result for a facet"""

    name: str
    """The display name of the facet"""
    items: list[FacetItem]
    """Items in the facets"""
    # Upper bound on the error of item counts, as reported by the
    # aggregation engine (None when no margin is available)
    count_error_margin: int | None = None


FacetsInfos = dict[str, FacetInfo]
"""Information about facets for a search result"""


class SearchResponseDebug(BaseModel):
query: JSONType

Expand All @@ -26,7 +49,8 @@ def is_success(self):

class SuccessSearchResponse(BaseModel):
hits: list[JSONType]
aggregations: JSONType
aggregations: JSONType | None = None
facets: FacetsInfos | None = None
page: int
page_size: int
page_count: int
Expand Down
94 changes: 30 additions & 64 deletions app/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,18 @@
from fastapi.responses import HTMLResponse, PlainTextResponse
from fastapi.templating import Jinja2Templates

import app.search as app_search
from app import config
from app._types import SearchResponse, SuccessSearchResponse
from app.config import check_config_is_defined, settings
from app.postprocessing import (
BaseResultProcessor,
load_result_processor,
process_taxonomy_completion_response,
)
from app.query import (
build_completion_query,
build_elasticsearch_query_builder,
build_search_query,
execute_query,
)
from app.facets import check_all_facets_fields_are_agg
from app.postprocessing import process_taxonomy_completion_response
from app.query import build_completion_query
from app.utils import connection, get_logger, init_sentry

logger = get_logger()


if config.CONFIG is None:
# We want to be able to import api.py (for tests for example) without
# failure, but we add a warning message as it's not expected in a
# production settings
logger.warning("Main configuration is not set, use CONFIG_PATH envvar")
FILTER_QUERY_BUILDERS = {}
RESULT_PROCESSORS = {}
else:
# we cache query builder and result processor here for faster processing
FILTER_QUERY_BUILDERS = {
index_id: build_elasticsearch_query_builder(index_config)
for index_id, index_config in config.CONFIG.indices.items()
}
RESULT_PROCESSORS = {
index_id: load_result_processor(index_config)
for index_id, index_config in config.CONFIG.indices.items()
}


app = FastAPI(
title="search-a-licious API",
contact={
Expand Down Expand Up @@ -116,6 +90,13 @@ def get_document(
return product


def check_facets_are_valid(index_id: str | None, facets: list[str] | None) -> None:
    """Validate the requested facets against the index configuration.

    :param index_id: id of the index the facets apply to (None for default)
    :param facets: facet field names to validate, or None for no facets
    :raises HTTPException: a 400 error whose detail lists every invalid facet
    """
    validation_errors = check_all_facets_fields_are_agg(index_id, facets)
    if not validation_errors:
        return
    raise HTTPException(status_code=400, detail=json.dumps(validation_errors))


@app.get("/search")
def search(
q: Annotated[
Expand All @@ -130,7 +111,7 @@ def search(
filter clause for categories and brands and look for "strawberry" in multiple
fields.

The query is optional, but `sort_by` value must then be provided."""
The query is optional, but `sort_by` value must then be provided."""
),
] = None,
langs: Annotated[
Expand Down Expand Up @@ -163,59 +144,44 @@ def search(
and be sortable. If it is not provided, results are sorted by descending relevance score."""
),
] = None,
facets: Annotated[
str | None,
Query(
description="""Name of facets to return in the response as a comma-separated value.
If None (default) no facets are returned."""
),
] = None,
index_id: Annotated[
str | None,
INDEX_ID_QUERY_PARAM,
] = None,
) -> SearchResponse:
# check and preprocess parameters
check_config_is_defined()
global_config = cast(config.Config, config.CONFIG)
check_index_id_is_defined(index_id, global_config)
index_id, index_config = global_config.get_index_config(index_id)
result_processor = cast(BaseResultProcessor, RESULT_PROCESSORS[index_id])
facets_list = facets.split(",") if facets else None
check_facets_are_valid(index_id, facets_list)
if q is None and sort_by is None:
raise HTTPException(
status_code=400, detail="`sort_by` must be provided when `q` is missing"
)

langs_set = set(langs.split(",") if langs else ["en"])
logger.debug(
"Received search query: q='%s', langs='%s', page=%d, "
"page_size=%d, fields='%s', sort_by='%s'",
q,
langs_set,
page,
page_size,
fields,
sort_by,
)

if page * page_size > 10_000:
raise HTTPException(
status_code=400,
detail=f"Maximum number of returned results is 10 000 (here: page * page_size = {page * page_size})",
)

query = build_search_query(
langs_list = langs.split(",") if langs else ["en"]
# search
return app_search.search(
q=q,
langs=langs_set,
size=page_size,
langs=langs_list,
page_size=page_size,
page=page,
config=index_config,
fields=fields.split(",") if fields else None,
sort_by=sort_by,
# filter query builder is generated from elasticsearch mapping and
# takes ~40ms to generate, build-it before hand to avoid this delay
filter_query_builder=FILTER_QUERY_BUILDERS[index_id],
)
logger.debug("Elasticsearch query: %s", query.to_dict())

projection = set(fields.split(",")) if fields else None
return execute_query(
query,
result_processor,
page=page,
page_size=page_size,
projection=projection,
facets=facets_list,
index_id=index_id,
)


Expand Down
33 changes: 32 additions & 1 deletion app/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging
from enum import StrEnum, auto
from pathlib import Path
from typing import Annotated
Expand All @@ -8,6 +9,8 @@
from pydantic.json_schema import GenerateJsonSchema
from pydantic_settings import BaseSettings

log = logging.getLogger(__name__)


class LoggingLevel(StrEnum):
NOTSET = "NOTSET"
Expand Down Expand Up @@ -232,6 +235,12 @@ class ESIndexConfig(BaseModel):


class TaxonomyIndexConfig(BaseModel):
"""We have an index storing multiple taxonomies

It enables functions like auto-completion, or field suggestions
as well as enrichment of requests with synonyms
"""

name: Annotated[
str,
Field(description="name of the taxonomy index alias to use"),
Expand All @@ -245,6 +254,12 @@ class TaxonomyIndexConfig(BaseModel):


class TaxonomyConfig(BaseModel):
"""Configuration of taxonomies,
that is collections of entries with synonyms in multiple languages

Field may be linked to taxonomies.
"""

sources: Annotated[
list[TaxonomySourceConfig],
Field(description="configurations of used taxonomies"),
Expand All @@ -256,7 +271,7 @@ class TaxonomyConfig(BaseModel):
"to be always exported during indexing. During indexing, we use the taxonomy "
"to translate every taxonomized field in a language-specific subfield. The list "
"of language depends on the value defined here and on the optional "
"`taxonomy_langs` field that can be defined in each document."
"`taxonomy_langs` field that can be defined in each document.",
),
]
index: Annotated[
Expand All @@ -268,6 +283,11 @@ class TaxonomyConfig(BaseModel):


class IndexConfig(BaseModel):
"""Inside the config file we can have several indexes defined.

This object gives configuration for one index.
"""

index: Annotated[
ESIndexConfig, Field(description="configuration of the Elasticsearch index")
]
Expand Down Expand Up @@ -413,8 +433,19 @@ def get_taxonomy_langs(self) -> set[str]:
# langs will be stored in a unique `other` subfield
return (set(self.taxonomy.exported_langs)) & set(ANALYZER_LANG_MAPPING)

def get_fields_with_bucket_agg(self):
return [
field_name for field_name, field in self.fields.items() if field.bucket_agg
]


class Config(BaseModel):
"""This is the global config object that reflects
the yaml configuration file.

Validations will be performed while we load it.
"""

indices: dict[str, IndexConfig] = Field(
description="configuration of indices. "
"The key is the ID of the index that can be referenced at query time. "
Expand Down
99 changes: 99 additions & 0 deletions app/facets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""
A module to help building facets from aggregations
"""

from . import config
from ._types import FacetInfo, FacetItem, FacetsInfos, SearchResponse


def safe_get_index_config(
    index: str | None = None, configuration: config.Config | None = None
) -> config.IndexConfig | None:
    """Safely fetch the configuration of an index.

    :param index: id of the index (None selects the default index)
    :param configuration: global config to use; falls back to config.CONFIG
    :return: the index configuration, or None when no global config is set
    """
    conf = configuration if configuration is not None else config.CONFIG
    if conf is None:
        return None
    _, index_config = conf.get_index_config(index)
    return index_config


def check_all_facets_fields_are_agg(
    index_id: str | None, facets: list[str] | None
) -> list[str]:
    """Check that all facets are valid,
    that is, correspond to a field with a bucket aggregation.

    :param index_id: id of the index to validate against (None for default)
    :param facets: facet field names to validate, or None for no facets
    :return: a list of error messages; empty if all facets are valid
    :raises ValueError: if the index configuration cannot be resolved
    """
    errors: list[str] = []
    if facets is None:
        return errors
    # FIX: renamed local from `config` — it shadowed the imported
    # `config` module, which is error-prone for later edits
    index_config = safe_get_index_config(index_id)
    if index_config is None:
        raise ValueError(f"Cannot get index config for index_id {index_id}")
    for field_name in facets:
        if field_name not in index_config.fields:
            errors.append(f"Unknown field name in facets: {field_name}")
        elif not index_config.fields[field_name].bucket_agg:
            errors.append(f"Non aggregation field name in facets: {field_name}")
    return errors


def build_facets(
    search_result: SearchResponse,
    index_config: config.IndexConfig,
    facets_names: list[str] | None,
) -> FacetsInfos:
    """Given a search result with aggregations,
    build a list of facets for the API response.

    :param search_result: search response carrying the raw aggregations
    :param index_config: configuration of the queried index
        (kept for upcoming taxonomy-based name translation)
    :param facets_names: names of facets to build, or None for none
    :return: a mapping from facet field name to its FacetInfo
    """
    aggregations = search_result.aggregations
    if facets_names is None or aggregations is None:
        return {}
    facets: FacetsInfos = {}
    for field_name in facets_names:
        if field_name not in aggregations:
            # FIX: was `pass`, which fell through and raised a KeyError
            # on the lookup below for any facet absent from aggregations
            continue
        agg_data = aggregations[field_name]
        agg_buckets = agg_data.get("buckets", [])
        # best values
        facet_items = [
            FacetItem(
                key=bucket["key"],
                # TODO: compute name in target language if there is a taxonomy
                name=bucket["key"],
                count=bucket["doc_count"],
                # TODO: add if selected
            )
            for bucket in agg_buckets
        ]
        # add other values — use .get so non-terms aggregations
        # without this field do not crash
        other_count = agg_data.get("sum_other_doc_count", 0)
        if other_count:
            facet_items.append(
                FacetItem(
                    key="--other--",
                    # TODO: translate in target language ?
                    name="Other",
                    count=other_count,
                )
            )
        items_count = sum(item.count for item in facet_items)
        # and no values
        if (not search_result.is_count_exact) and search_result.count > items_count:
            facet_items.append(
                FacetItem(
                    key="--none--",
                    # TODO: translate in target language ?
                    name="None",
                    # Note: this depends on search_result.is_count_exact,
                    # but we leave it to user to verify
                    count=search_result.count - items_count,
                )
            )
        # append our facet information
        facets[field_name] = FacetInfo(
            # FIXME translate
            name=field_name,
            items=facet_items,
            count_error_margin=agg_data.get("doc_count_error_upper_bound"),
        )

    return facets
Loading