Skip to content

Commit

Permalink
feat: handling synonyms and text fields more efficiently (#234)
Browse files Browse the repository at this point in the history
* Using synonyms capabilities of ES to avoid storing taxonomies fields
in the index
* Better handling of full text queries that support them within any
expression
* make boost_phrase a separate parameter
* Raise errors if the query is not well understood or do not pass some
sanity checks
* Use main translation of taxonomy for facets values (instead of a
random synonym)
* Better handling of global config to avoid treacherous patterns
* Unify parameters for Get and Post (better use of pydantic)
* Error on extraneous search parameters to avoid hard to debug issues
with typos
* Add a command to clean indexes
* Integrations tests on search and analyzers

Part of: #193

---------

Co-authored-by: Raphaël Bournhonesque <[email protected]>
  • Loading branch information
alexgarel and raphael0202 authored Oct 24, 2024
1 parent 9c79fd8 commit f427a5e
Show file tree
Hide file tree
Showing 69 changed files with 4,242 additions and 1,429 deletions.
7 changes: 7 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
repos:
# Note for all linters: do not forget to update pyproject.toml when updating version.
- repo: https://github.com/python-poetry/poetry
rev: 1.8.4
hooks:
- id: poetry-lock
args: ["--check"]

- repo: https://github.com/psf/black-pre-commit-mirror
rev: 24.8.0
hooks:
Expand All @@ -15,6 +21,7 @@ repos:
rev: 5.13.2
hooks:
- id: isort

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.11.1
hooks:
Expand Down
14 changes: 12 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ ENV PYTHONUNBUFFERED=1 \
FROM python-base as builder-base
RUN curl -sSL https://install.python-poetry.org | python3 -
WORKDIR $PYSETUP_PATH
COPY poetry.lock pyproject.toml ./
# we need README.md for poetry check
COPY poetry.lock pyproject.toml README.md ./
RUN poetry check --lock || \
( echo "Poetry.lock is outdated, please run make update_poetry_lock" && false )
RUN poetry install --without dev

# This is our final image
Expand All @@ -40,6 +43,10 @@ COPY --from=builder-base $POETRY_HOME $POETRY_HOME
RUN poetry config virtualenvs.create false
ENV POETRY_VIRTUALENVS_IN_PROJECT=false

# create some folders, to later ensure right ownership
RUN mkdir -p /opt/search/data && \
mkdir -p /opt/search/synonyms

# create off user
ARG USER_UID
ARG USER_GID
Expand All @@ -66,8 +73,11 @@ CMD ["uvicorn", "app.api:app", "--proxy-headers", "--host", "0.0.0.0", "--port",
# ----------------------
FROM builder-base as builder-dev
WORKDIR $PYSETUP_PATH
COPY poetry.lock pyproject.toml ./
# we need README.md for poetry check
COPY poetry.lock pyproject.toml README.md ./
# full install, with dev packages
RUN poetry check --lock || \
( echo "Poetry.lock is outdated, please run make update_poetry_lock" && false )
RUN poetry install

# image with dev tooling
Expand Down
34 changes: 29 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ endif
DOCKER_COMPOSE=docker compose --env-file=${ENV_FILE}
DOCKER_COMPOSE_TEST=COMPOSE_PROJECT_NAME=search_test docker compose --env-file=${ENV_FILE}

.PHONY: build create_external_volumes livecheck up down test test_front test_front_watch test_api import-dataset import-taxonomies sync-scripts build-translations generate-openapi check check_front check_translations lint lint_back lint_front
#------------#
# Production #
#------------#
Expand Down Expand Up @@ -58,7 +59,7 @@ livecheck:

build:
@echo "🔎 building docker (for dev)"
${DOCKER_COMPOSE} build --progress=plain
${DOCKER_COMPOSE} build --progress=plain ${args}


up: _ensure_network
Expand Down Expand Up @@ -107,15 +108,34 @@ tsc_watch:
@echo "🔎 Running front-end tsc in watch mode..."
${DOCKER_COMPOSE} run --rm search_nodejs npm run build:watch

update_poetry_lock:
@echo "🔎 Updating poetry.lock"
${DOCKER_COMPOSE} run --rm api poetry lock --no-update

#-------#
# Tests #
#-------#

test: _ensure_network test_api test_front
test: _ensure_network check_poetry_lock test_api test_front

check_poetry_lock:
@echo "🔎 Checking poetry.lock"
# we have to mount whole project folder for pyproject will be checked
${DOCKER_COMPOSE} run -v $$(pwd):/project -w /project --rm api poetry check --lock

test_api: test_api_unit test_api_integration

test_api_unit:
@echo "🔎 Running API unit tests..."
${DOCKER_COMPOSE_TEST} run --rm api pytest ${args} tests/ --ignore=tests/int

# you can use keep_es=1 to avoid stopping elasticsearch after tests (useful during development)
test_api_integration:
@echo "🔎 Running API integration tests..."
${DOCKER_COMPOSE_TEST} up -d es01 es02 elasticvue
${DOCKER_COMPOSE_TEST} run --rm api pytest ${args} tests/ --ignore=tests/unit
test -z "${keep_es}" && ${DOCKER_COMPOSE_TEST} stop es01 es02 elasticvue || true

test_api:
@echo "🔎 Running API tests..."
${DOCKER_COMPOSE_TEST} run --rm api pytest ${args} tests/

test_front:
@echo "🔎 Running front-end tests..."
Expand All @@ -125,6 +145,10 @@ test_front_watch:
@echo "🔎 Running front-end tests..."
${DOCKER_COMPOSE_TEST} run --rm search_nodejs npm run test:watch

test_clean:
@echo "🔎 Cleaning tests instances..."
${DOCKER_COMPOSE_TEST} down -v

#-----------#
# Utilities #
#-----------#
Expand Down
107 changes: 74 additions & 33 deletions app/_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
from redis import Redis

from app._types import FetcherResult, FetcherStatus, JSONType
from app.config import Config, IndexConfig, TaxonomyConfig
from app.config import Config, IndexConfig, TaxonomyConfig, settings
from app.indexing import (
DocumentProcessor,
generate_index_object,
generate_taxonomy_index_object,
)
from app.taxonomy import get_taxonomy
from app.taxonomy import iter_taxonomies
from app.taxonomy_es import refresh_synonyms
from app.utils import connection, get_logger, load_class_object_from_string
from app.utils.io import jsonl_iter

Expand Down Expand Up @@ -226,17 +227,17 @@ def gen_documents(
next_index: str,
num_items: int | None,
num_processes: int,
process_id: int,
process_num: int,
):
"""Generate documents to index for process number process_id
"""Generate documents to index for process number process_num
We chunk documents based on document num % process_id
We chunk documents based on document num % process_num
"""
for i, row in enumerate(tqdm.tqdm(jsonl_iter(file_path))):
if num_items is not None and i >= num_items:
break
# Only get the relevant
if i % num_processes != process_id:
if i % num_processes != process_num:
continue

document_dict = get_document_dict(
Expand All @@ -260,26 +261,26 @@ def gen_taxonomy_documents(
:param supported_langs: a set of supported languages
:yield: a dict with the document to index, compatible with ES bulk API
"""
for taxonomy_source_config in tqdm.tqdm(taxonomy_config.sources):
taxonomy = get_taxonomy(
taxonomy_source_config.name, str(taxonomy_source_config.url)
)
for taxonomy_name, taxonomy in tqdm.tqdm(iter_taxonomies(taxonomy_config)):
for node in taxonomy.iter_nodes():
names = {}
for lang in supported_langs:
lang_names = set()
if lang in node.names:
lang_names.add(node.names[lang])
if lang in node.synonyms:
lang_names |= set(node.synonyms[lang])
names[lang] = list(lang_names)
names = {
lang: lang_names
for lang, lang_names in node.names.items()
if lang in supported_langs
}
synonyms = {
lang: lang_names
for lang, lang_names in node.synonyms.items()
if lang in supported_langs
}

yield {
"_index": next_index,
"_source": {
"id": node.id,
"taxonomy_name": taxonomy_source_config.name,
"names": names,
"taxonomy_name": taxonomy_name,
"name": names,
"synonyms": synonyms,
},
}

Expand All @@ -304,26 +305,35 @@ def update_alias(es_client: Elasticsearch, next_index: str, index_alias: str):
)


def get_alias(es_client: Elasticsearch, index_name: str):
"""Get the current index pointed by the alias."""
resp = es_client.indices.get_alias(name=index_name)
resp = list(resp.keys())
if len(resp) == 0:
return None
return resp[0]


def import_parallel(
config: IndexConfig,
file_path: Path,
next_index: str,
num_items: int | None,
num_processes: int,
process_id: int,
process_num: int,
):
"""One task of import.
:param Path file_path: the JSONL file to read
:param str next_index: the index to write to
:param int num_items: max number of items to import, default to no limit
:param int num_processes: total number of processes
:param int process_id: the index of the process
:param int process_num: the index of the process
(from 0 to num_processes - 1)
"""
processor = DocumentProcessor(config)
# open a connection for this process
es = connection.get_es_client(timeout=120, retry_on_timeout=True)
es = connection.get_es_client(request_timeout=120, retry_on_timeout=True)
# Note that bulk works better than parallel bulk for our usecase.
# The preprocessing in this file is non-trivial, so it's better to
# parallelize that. If we then do parallel_bulk here, this causes queueing
Expand All @@ -336,13 +346,11 @@ def import_parallel(
next_index,
num_items,
num_processes,
process_id,
process_num,
),
raise_on_error=False,
)
if not success:
logger.error("Encountered errors: %s", errors)
return success, errors
return process_num, success, errors


def import_taxonomies(config: IndexConfig, next_index: str):
Expand All @@ -353,8 +361,7 @@ def import_taxonomies(config: IndexConfig, next_index: str):
:param config: the index configuration to use
:param next_index: the index to write to
"""
# open a connection for this process
es = connection.get_es_client(timeout=120, retry_on_timeout=True)
es = connection.current_es_client()
# Note that bulk works better than parallel bulk for our usecase.
# The preprocessing in this file is non-trivial, so it's better to
# parallelize that. If we then do parallel_bulk
Expand Down Expand Up @@ -480,15 +487,16 @@ def run_items_import(
if True consider we don't have a full import,
and directly updates items in current index.
"""
es_client = connection.get_es_client()
# we need a large timeout as index creation can take a while because of synonyms
es_client = connection.get_es_client(request_timeout=600)
if not partial:
# we create a temporary index to import to
# at the end we will change alias to point to it
index_date = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")
next_index = f"{config.index.name}-{index_date}"
index = generate_index_object(next_index, config)
# create the index
index.save()
index.save(using=es_client)
else:
# use current index
next_index = config.index.name
Expand All @@ -509,12 +517,18 @@ def run_items_import(
# run in parallel
num_errors = 0
with Pool(num_processes) as pool:
for success, errors in pool.starmap(import_parallel, args):
if not success:
for i, success, errors in pool.starmap(import_parallel, args):
# Note: we log here instead of in sub-process because
# it's easier to avoid mixing logs, and it works better for pytest
logger.info("[%d] Indexed %d documents", i, success)
if errors:
logger.error("[%d] Encountered %d errors: %s", i, len(errors), errors)
num_errors += len(errors)
# update with last index updates (hopefully since the jsonl)
if not skip_updates:
num_errors += get_redis_updates(es_client, next_index, config)
# wait for index refresh
es_client.indices.refresh(index=next_index)
if not partial:
# make alias point to new index
update_alias(es_client, next_index, config.index.name)
Expand All @@ -537,11 +551,38 @@ def perform_taxonomy_import(config: IndexConfig) -> None:
index.save()

import_taxonomies(config, next_index)
# wait for index refresh
es_client.indices.refresh(index=next_index)

# make alias point to new index
update_alias(es_client, next_index, config.taxonomy.index.name)


def perform_cleanup_indexes(config: IndexConfig) -> int:
"""Delete old indexes (that have no active alias on them)."""
removed = 0
# some timeout for it can be long
es_client = connection.get_es_client(request_timeout=600)
prefixes = [config.index.name, config.taxonomy.index.name]
for prefix in prefixes:
# get all indexes
indexes = es_client.indices.get_alias(index=f"{prefix}-*")
# remove all index without alias
to_remove = [
index for index, data in indexes.items() if not data.get("aliases")
]
for index in to_remove:
logger.info("Deleting index %s", index)
es_client.indices.delete(index=index)
removed += 1
return removed


def perform_refresh_synonyms(index_id: str, config: IndexConfig) -> None:
"""Refresh synonyms files generated by taxonomies."""
refresh_synonyms(index_id, config, settings.synonyms_path)


def run_update_daemon(config: Config) -> None:
"""Run the update import daemon.
Expand Down
Loading

0 comments on commit f427a5e

Please sign in to comment.