Skip to content

Commit

Permalink
Refactor to have changes in only es8
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreasR90 committed Sep 1, 2023
1 parent 5344151 commit 6c052a0
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 141 deletions.
161 changes: 29 additions & 132 deletions haystack/document_stores/elasticsearch/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Any, Dict, List, Optional, Union
from typing import Dict, List, Optional

import numpy as np

Expand All @@ -12,60 +12,6 @@


class _ElasticsearchDocumentStore(SearchEngineDocumentStore):
def __init__(
    self,
    client: Any,
    index: str = "document",
    label_index: str = "label",
    search_fields: Union[str, list] = "content",
    content_field: str = "content",
    name_field: str = "name",
    embedding_field: str = "embedding",
    embedding_dim: int = 768,
    custom_mapping: Optional[dict] = None,
    excluded_meta_data: Optional[list] = None,
    analyzer: str = "standard",
    recreate_index: bool = False,
    create_index: bool = True,
    refresh_type: str = "wait_for",
    similarity: str = "dot_product",
    return_embedding: bool = False,
    duplicate_documents: str = "overwrite",
    scroll: str = "1d",
    skip_missing_embeddings: bool = True,
    synonyms: Optional[List] = None,
    synonym_type: str = "synonym",
    batch_size: int = 10_000,
    index_type: str = "exact",
    hnsw_num_candidates: Optional[int] = None,
):
    """
    Remember the vector-index settings and hand every other argument to the parent initializer.

    :param index_type: Stored on the instance as `self.index_type`.
    :param hnsw_num_candidates: Stored on the instance as `self.hnsw_num_candidates`.

    All remaining parameters are forwarded unchanged, by keyword, to `super().__init__`.
    """
    # Both attributes are assigned before the parent initializer runs.
    # NOTE(review): presumably the parent may already read them while setting up the index - confirm.
    self.index_type = index_type
    self.hnsw_num_candidates = hnsw_num_candidates

    parent_kwargs = dict(
        client=client,
        index=index,
        label_index=label_index,
        search_fields=search_fields,
        content_field=content_field,
        name_field=name_field,
        embedding_field=embedding_field,
        embedding_dim=embedding_dim,
        custom_mapping=custom_mapping,
        excluded_meta_data=excluded_meta_data,
        analyzer=analyzer,
        recreate_index=recreate_index,
        create_index=create_index,
        refresh_type=refresh_type,
        similarity=similarity,
        return_embedding=return_embedding,
        duplicate_documents=duplicate_documents,
        scroll=scroll,
        skip_missing_embeddings=skip_missing_embeddings,
        synonyms=synonyms,
        synonym_type=synonym_type,
        batch_size=batch_size,
    )
    super().__init__(**parent_kwargs)

def query_by_embedding(
self,
query_emb: np.ndarray,
Expand Down Expand Up @@ -177,8 +123,6 @@ def query_by_embedding(
count_embeddings = self.get_embedding_count(index=index, headers=headers)
if count_embeddings == 0:
logger.warning("No documents with embeddings. Run the document store's update_embeddings() method.")
# TODO: hnsw does not throw an error since it directly ignores all documents without embedding
# What should we do? Artificially throw an error? -> Affects test_query_with_filters_and_missing_embeddings
except self._RequestError as e:
if e.error == "search_phase_execution_exception":
error_message: str = (
Expand All @@ -197,53 +141,20 @@ def query_by_embedding(
def _construct_dense_query_body(
    self, query_emb: np.ndarray, return_embedding: bool, filters: Optional[FilterType] = None, top_k: int = 10
):
    """
    Build the request body for an embedding (dense) query.

    :param query_emb: Query embedding handed to the query builders.
    :param return_embedding: Passed to `_get_excluded_fields` to decide which fields are excluded from `_source`.
    :param filters: Optional filters; when given they are converted with `LogicalFilterClause` and merged
                    into the `script_score` query.
    :param top_k: Used as the `size` of the request.
    :raises DocumentStoreError: If `index_type` is neither "hnsw" nor "exact".
    :return: The request body as a dict.
    """
    if self.index_type == "hnsw":
        body = self._construct_dense_query_body_ann(query_emb=query_emb, top_k=top_k, filters=filters)
    elif self.index_type == "exact":
        body = self._construct_dense_query_body_knn(query_emb=query_emb, top_k=top_k, filters=filters)
    else:
        raise DocumentStoreError(f"index_type {self.index_type} not supported")
    # NOTE(review): `body` from the dispatch above is unconditionally reassigned by the next statement,
    # so the dispatch only matters for the error it raises. This looks like two revisions of the
    # method merged together - confirm which one is intended.
    body = {"size": top_k, "query": self._get_vector_similarity_query(query_emb, top_k)}
    if filters:
        filter_ = {"bool": {"filter": LogicalFilterClause.parse(filters).convert_to_elasticsearch()}}
        # A plain match_all is replaced outright; otherwise the filter is appended to the existing `must` list.
        if body["query"]["script_score"]["query"] == {"match_all": {}}:
            body["query"]["script_score"]["query"] = filter_
        else:
            body["query"]["script_score"]["query"]["bool"]["filter"]["bool"]["must"].append(filter_)

    # Only add a `_source` section when there is something to exclude.
    excluded_fields = self._get_excluded_fields(return_embedding=return_embedding)
    if excluded_fields:
        body["_source"] = {"excludes": excluded_fields}

    return body

def _construct_dense_query_body_knn(self, query_emb: np.ndarray, top_k: int, filters: Optional[FilterType] = None):
    """
    Build a `script_score` request body restricted by the combined filter.

    :param query_emb: Query embedding passed on to `_get_vector_similarity_query`.
    :param top_k: Used as the `size` of the request.
    :param filters: Optional filters passed on to `_construct_filter`.
    :return: The request body as a dict.
    """
    restriction = self._construct_filter(filters)
    script_score = self._get_vector_similarity_query(query_emb, top_k=top_k)
    # The dict returned by the similarity helper is extended in place with the filter query.
    script_score["query"] = {"bool": {"filter": restriction}}  # type: ignore
    return {"size": top_k, "query": {"script_score": script_score}}

def _construct_dense_query_body_ann(self, query_emb: np.ndarray, top_k: int, filters: Optional[FilterType] = None):
    """
    Build a request body whose top-level `knn` section searches `embedding_field`.

    :param query_emb: Query embedding; sent as a plain Python list.
    :param top_k: Used as `k` and passed to `_get_ann_num_candidates`.
    :param filters: Optional filters passed on to `_construct_filter`.
    :return: The request body as a dict.
    """
    filter_ = self._construct_filter(filters)
    body = {
        "knn": {
            "field": self.embedding_field,
            # Convert to a list so the body is JSON-serializable, as the script-score query does.
            "query_vector": query_emb.tolist(),
            "num_candidates": self._get_ann_num_candidates(top_k),
            "k": top_k,
            "filter": filter_,
        }
    }
    return body

def _get_ann_num_candidates(self, top_k: int) -> int:
    """
    Return the configured `hnsw_num_candidates`, or ten times `top_k` when none is configured.

    :param top_k: Number of requested hits.
    """
    configured = self.hnsw_num_candidates
    return 10 * top_k if configured is None else configured

def _construct_filter(self, filters: Optional[FilterType] = None) -> Dict:
    """
    Combine the optional user filters with the "embedding exists" restriction.

    :param filters: Optional filters; converted with `LogicalFilterClause` when given.
    :return: A `bool`/`must` query over the collected clauses, or `match_all` when there are none.
    """
    clauses = []
    if filters:
        clauses.append(LogicalFilterClause.parse(filters).convert_to_elasticsearch())
    if self.skip_missing_embeddings:
        # Restrict matches to documents that have the embedding field.
        clauses.append({"exists": {"field": self.embedding_field}})
    if clauses:
        return {"bool": {"must": clauses}}
    return {"match_all": {}}

def _create_document_index(self, index_name: str, headers: Optional[Dict[str, str]] = None):
"""
Create a new index for storing documents.
Expand Down Expand Up @@ -279,7 +190,7 @@ def _create_document_index(self, index_name: str, headers: Optional[Dict[str, st
mapping["mappings"]["properties"].update({field: {"type": "text"}})

if self.embedding_field:
mapping["mappings"]["properties"][self.embedding_field] = self._create_embedding_field_mapping()
mapping["mappings"]["properties"][self.embedding_field] = self._create_embedding_index_field()

try:
self._index_create(index=index_name, **mapping, headers=headers)
Expand All @@ -291,23 +202,8 @@ def _create_document_index(self, index_name: str, headers: Optional[Dict[str, st
if not self._index_exists(index_name, headers=headers):
raise e

def _create_embedding_field_mapping(self):
    """
    Return the mapping for the embedding field.

    For `index_type` "exact" only the type and dimension are set; for any other index type
    the field is additionally marked as indexed with the configured similarity.
    """
    mapping = {"type": "dense_vector", "dims": self.embedding_dim}
    if self.index_type != "exact":
        mapping.update(index=True, similarity=self._get_similarity_string())
    return mapping

def _get_similarity_string(self):
    """
    Translate `self.similarity` into the name used in the embedding field mapping.

    :raises ValueError: If `self.similarity` is not "dot_product", "cosine" or "l2".
    """
    translations = (("dot_product", "dot_product"), ("cosine", "cosine"), ("l2", "l2_norm"))
    for configured_name, mapping_name in translations:
        if self.similarity == configured_name:
            return mapping_name
    raise ValueError(f"Unknown similarity metric: {self.similarity}")
def _create_embedding_index_field(self) -> Dict:
    """
    Return the mapping entry for the embedding field: a `dense_vector` with `embedding_dim` dimensions.
    """
    field_mapping: Dict = {"type": "dense_vector"}
    field_mapping["dims"] = self.embedding_dim
    return field_mapping

def _create_label_index(self, index_name: str, headers: Optional[Dict[str, str]] = None):
mapping = {
Expand Down Expand Up @@ -370,6 +266,7 @@ def _validate_and_adjust_document_index(self, index_name: str, headers: Optional
mapping["properties"][search_field] = (
{"type": "text", "analyzer": "synonym"} if self.synonyms else {"type": "text"}
)
self._index_put_mapping(index=index_id, body=mapping, headers=headers)

if self.embedding_field:
if (
Expand All @@ -381,15 +278,7 @@ def _validate_and_adjust_document_index(self, index_name: str, headers: Optional
f"The index '{index_id}' in Elasticsearch already has a field called '{self.embedding_field}' "
f"of type '{mapping['properties'][self.embedding_field]['type']}'."
)
request_mapping = self._create_embedding_field_mapping()
embedding_mapping_exists = self.embedding_field in mapping["properties"]
if embedding_mapping_exists and (
mapping["properties"][self.embedding_field].keys() != request_mapping.keys()
):
raise DocumentStoreError(
"The mapping of the existing embedding field is not compatible with the requested mapping."
)
mapping["properties"][self.embedding_field] = request_mapping
mapping["properties"][self.embedding_field] = self._create_embedding_index_field()
self._index_put_mapping(index=index_id, body=mapping, headers=headers)

def _validate_server_version(self, expected_version: int):
Expand All @@ -404,7 +293,7 @@ def _validate_server_version(self, expected_version: int):
".".join(map(str, self.server_version)),
)

def _get_vector_similarity_query(self, query_emb: np.ndarray, top_k: int) -> Dict[str, Any]:
def _get_vector_similarity_query(self, query_emb: np.ndarray, top_k: int):
"""
Generate Elasticsearch query for vector similarity.
"""
Expand All @@ -419,18 +308,26 @@ def _get_vector_similarity_query(self, query_emb: np.ndarray, top_k: int) -> Dic
"Invalid value for similarity in ElasticSearchDocumentStore constructor. Choose between 'cosine', 'dot_product' and 'l2'"
)

# To handle scenarios where embeddings may be missing
script_score_query: dict = {"match_all": {}}
if self.skip_missing_embeddings:
script_score_query = {"bool": {"filter": {"bool": {"must": [{"exists": {"field": self.embedding_field}}]}}}}

# Elasticsearch 7.6 introduced a breaking change regarding the vector function signatures:
# https://www.elastic.co/guide/en/elasticsearch/reference/7.6/breaking-changes-7.6.html#_update_to_vector_function_signatures
if self.server_version[0] == 7 and self.server_version[1] < 6:
similarity_script_score = f"{similarity_fn_name}(params.query_vector,doc['{self.embedding_field}']) + 1000"
similarity_script_source = f"{similarity_fn_name}(params.query_vector,doc['{self.embedding_field}']) + 1000"
else:
similarity_script_score = f"{similarity_fn_name}(params.query_vector,'{self.embedding_field}') + 1000"
similarity_script_source = f"{similarity_fn_name}(params.query_vector,'{self.embedding_field}') + 1000"

query = {
"script": {
# offset score to ensure a positive range as required by Elasticsearch
"source": similarity_script_score,
"params": {"query_vector": query_emb.tolist()},
"script_score": {
"query": script_score_query,
"script": {
# offset score to ensure a positive range as required by Elasticsearch
"source": similarity_script_source,
"params": {"query_vector": query_emb.tolist()},
},
}
}
return query
Expand Down
101 changes: 96 additions & 5 deletions haystack/document_stores/elasticsearch/es8.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import numpy as np

from haystack.document_stores.filter_utils import LogicalFilterClause
from haystack.errors import DocumentStoreError
from haystack.lazy_imports import LazyImport
from haystack.schema import Document, FilterType

Expand Down Expand Up @@ -70,7 +72,7 @@ def __init__(
use_system_proxy: bool = False,
batch_size: int = 10_000,
index_type: str = "exact",
hnsw_num_candidates: Optional[int] = None,
knn_parameters: Optional[Dict] = None,
):
"""
A DocumentStore using Elasticsearch to store and query the documents for our search.
Expand Down Expand Up @@ -149,8 +151,8 @@ def __init__(
'hnsw' uses the HNSW Algorithm to determine nearest neighbors. This is more performant than 'exact'.
'hnsw' is only supported for elasticsearch >=8.
Defaults to 'exact'.
:param hnsw_num_candidates: Specifies the number of candidates which are used to compute the approximate nearest neighbors. Only supported for elasticsearch > 8.0.
Defaults to None. If None, the number of candidates is set to 10 times the requested top_k hits.
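:param knn_parameters: Custom parameters for the KNN engine. Parameter names depend on the index type you use.
`"ef_construction"`, `"ef_search"`, `"m"`, `"num_candidates"`.
`"ef_construction"` and `"m"` must be specified together or not at all.
`"num_candidates"` defaults to 10 times the requested top_k.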
"""
# Ensure all the required inputs were successful
Expand All @@ -172,6 +174,10 @@ def __init__(
use_system_proxy=use_system_proxy,
)

self.index_type = index_type
self.knn_parameters = knn_parameters or {}
self._check_hnsw_parameters()

super().__init__(
client=client,
index=index,
Expand All @@ -195,8 +201,6 @@ def __init__(
synonyms=synonyms,
synonym_type=synonym_type,
batch_size=batch_size,
index_type=index_type,
hnsw_num_candidates=hnsw_num_candidates,
)

self._validate_server_version(expected_version=8)
Expand Down Expand Up @@ -422,3 +426,90 @@ def _execute_msearch(self, index: str, body: List[Dict[str, Any]], scale_score:
documents.append(cur_documents)

return documents

def _construct_dense_query_body(
    self, query_emb: np.ndarray, return_embedding: bool, filters: Optional[FilterType] = None, top_k: int = 10
):
    """
    Build the request body for an embedding (dense) query.

    :param query_emb: Query embedding handed to the selected body builder.
    :param return_embedding: Passed to `_get_excluded_fields` to decide which fields are excluded from `_source`.
    :param filters: Optional filters handed to the selected body builder.
    :param top_k: Number of hits handed to the selected body builder.
    :raises DocumentStoreError: If `index_type` is neither "hnsw" nor "exact".
    :return: The request body as a dict.
    """
    index_type = self.index_type
    if index_type == "hnsw":
        build_body = self._construct_dense_query_body_ann
    elif index_type == "exact":
        build_body = self._construct_dense_query_body_knn
    else:
        raise DocumentStoreError(f"index_type {self.index_type} not supported")
    body = build_body(query_emb=query_emb, top_k=top_k, filters=filters)

    # Only add a `_source` section when there is something to exclude.
    fields_to_hide = self._get_excluded_fields(return_embedding=return_embedding)
    if fields_to_hide:
        body["_source"] = {"excludes": fields_to_hide}

    return body

def _construct_dense_query_body_knn(self, query_emb: np.ndarray, top_k: int, filters: Optional[FilterType] = None):
    """
    Build a `script_score` request body restricted by the combined filter.

    :param query_emb: Query embedding passed on to `_get_vector_similarity_query`.
    :param top_k: Used as the `size` of the request.
    :param filters: Optional filters passed on to `_construct_filter`.
    :return: The request body as a dict.
    """
    filter_clause = self._construct_filter(filters)
    similarity_query = self._get_vector_similarity_query(query_emb, top_k=top_k)
    # Replace the inner query of the script_score with the combined filter (in place).
    similarity_query["script_score"]["query"] = {"bool": {"filter": filter_clause}}  # type: ignore
    return {"size": top_k, "query": similarity_query}

def _construct_dense_query_body_ann(self, query_emb: np.ndarray, top_k: int, filters: Optional[FilterType] = None):
    """
    Build a request body whose top-level `knn` section searches `embedding_field`.

    :param query_emb: Query embedding; sent as a plain Python list.
    :param top_k: Used as `k`; also determines the default `num_candidates` (10 times `top_k`)
                  when `knn_parameters` does not provide one.
    :param filters: Optional filters passed on to `_construct_filter`.
    :return: The request body as a dict.
    """
    filter_ = self._construct_filter(filters)
    # NOTE(review): the body carries no top-level "size"; confirm the server does not cap the
    # returned hits at its default page size when top_k is larger than that default.
    body = {
        "knn": {
            "field": self.embedding_field,
            # Convert to a list so the body is JSON-serializable, as the script-score query does.
            "query_vector": query_emb.tolist(),
            "k": top_k,
            "filter": filter_,
            "num_candidates": self.knn_parameters.get("num_candidates", 10 * top_k),
        }
    }
    return body

def _construct_filter(self, filters: Optional[FilterType] = None) -> Dict:
    """
    Combine the optional user filters with the "embedding exists" restriction.

    :param filters: Optional filters; converted with `LogicalFilterClause` when given.
    :return: A `bool`/`must` query over the collected clauses, or `match_all` when there are none.
    """
    must_clauses = []
    if filters:
        must_clauses.append(LogicalFilterClause.parse(filters).convert_to_elasticsearch())
    if self.skip_missing_embeddings:
        # Restrict matches to documents that have the embedding field.
        must_clauses.append({"exists": {"field": self.embedding_field}})
    return {"bool": {"must": must_clauses}} if must_clauses else {"match_all": {}}

def _create_embedding_field_mapping(self):
    """
    Return the mapping for the embedding field.

    For `index_type` "exact" only the type and dimension are set. Otherwise the field is marked as
    indexed with the configured similarity, and, when both "ef_construction" and "m" are present in
    `knn_parameters`, an `index_options` section carrying them is added.
    """
    mapping = {"type": "dense_vector", "dims": self.embedding_dim}
    if self.index_type == "exact":
        return mapping
    mapping["index"] = True
    mapping["similarity"] = self._get_similarity_string()
    if self._hnsw_parameters_index_specified():
        mapping["index_options"] = {
            # The dense_vector mapping names the algorithm under "type", not "index_type".
            "type": "hnsw",
            "ef_construction": self.knn_parameters["ef_construction"],
            "m": self.knn_parameters["m"],
        }
    return mapping

def _hnsw_parameters_index_specified(self):
    """
    Return True when `knn_parameters` contains both "ef_construction" and "m".
    """
    required_keys = ("ef_construction", "m")
    return all(key in self.knn_parameters for key in required_keys)

def _check_hnsw_parameters(self):
    """
    Validate that "ef_construction" and "m" are given together in `knn_parameters`.

    :raises ValueError: If exactly one of the two keys is present.
    """
    has_ef_construction = "ef_construction" in self.knn_parameters
    has_m = "m" in self.knn_parameters
    if has_ef_construction != has_m:
        raise ValueError("Both ef_construction and m or none of these must be specified for HNSW index")

def _create_embedding_index_field(self) -> Dict:
    """
    Return the mapping entry for the embedding field.

    For `index_type` "exact" the parent implementation is used. Otherwise the field is marked as
    indexed with the configured similarity, and, when both "ef_construction" and "m" are present in
    `knn_parameters`, they are passed on as `index_options` so the configured values take effect.
    """
    if self.index_type == "exact":
        return super()._create_embedding_index_field()
    mapping: Dict = {
        "type": "dense_vector",
        "dims": self.embedding_dim,
        "index": True,
        "similarity": self._get_similarity_string(),
    }
    if self._hnsw_parameters_index_specified():
        # Without this section the validated HNSW settings would never reach the index mapping.
        mapping["index_options"] = {
            "type": "hnsw",
            "ef_construction": self.knn_parameters["ef_construction"],
            "m": self.knn_parameters["m"],
        }
    return mapping

def _get_similarity_string(self):
    """
    Translate `self.similarity` into the name used in the embedding field mapping.

    :raises ValueError: If `self.similarity` is not "dot_product", "cosine" or "l2".
    """
    known_metrics = (("dot_product", "dot_product"), ("cosine", "cosine"), ("l2", "l2_norm"))
    for store_name, es_name in known_metrics:
        if self.similarity == store_name:
            return es_name
    raise ValueError(f"Unknown similarity metric: {self.similarity}")
Loading

0 comments on commit 6c052a0

Please sign in to comment.