feat: extend pipeline.add_component to support stores #5261

Merged
merged 48 commits into from
Jul 17, 2023
Changes from 16 commits
Commits
d967605
add protocol and adapt pipeline
ZanSara Jul 3, 2023
e061d3d
change API in pipeline.add_component
ZanSara Jul 3, 2023
9eb2734
adapt pipeline tests
ZanSara Jul 3, 2023
b8af512
adapt memoryretriever
ZanSara Jul 3, 2023
b0cec16
additional checks
ZanSara Jul 4, 2023
1703ef3
separate protocol and mixin
ZanSara Jul 4, 2023
7494028
review feedback & update tests
ZanSara Jul 5, 2023
85ca595
pylint
ZanSara Jul 5, 2023
c644675
Update haystack/preview/document_stores/protocols.py
ZanSara Jul 5, 2023
8b9d8ee
Update haystack/preview/document_stores/memory/document_store.py
ZanSara Jul 5, 2023
8cba631
docstring of Store
ZanSara Jul 5, 2023
68bbf1e
adapt memorydocumentstore
ZanSara Jul 5, 2023
1583d6f
fix tests
ZanSara Jul 5, 2023
0edfbf0
Merge branch 'v2-docstore-protocol' into v2-docstores-connections
ZanSara Jul 5, 2023
c518252
remove direct inheritance
ZanSara Jul 6, 2023
d54a953
pylint
ZanSara Jul 6, 2023
29cd817
Update haystack/preview/document_stores/mixins.py
ZanSara Jul 6, 2023
db0a792
Update test/preview/components/retrievers/test_memory_retriever.py
ZanSara Jul 6, 2023
a60a8b5
Update test/preview/components/retrievers/test_memory_retriever.py
ZanSara Jul 6, 2023
84b7b86
Update test/preview/components/retrievers/test_memory_retriever.py
ZanSara Jul 6, 2023
78992d3
Update test/preview/components/retrievers/test_memory_retriever.py
ZanSara Jul 6, 2023
53ee0c5
Update test/preview/components/retrievers/test_memory_retriever.py
ZanSara Jul 6, 2023
2b11bf5
test names
ZanSara Jul 6, 2023
254b120
revert suggestion
ZanSara Jul 6, 2023
2619d9c
private self._stores
ZanSara Jul 6, 2023
3e5d2f7
move asserts out
ZanSara Jul 6, 2023
7e0cc7b
remove protocols
ZanSara Jul 6, 2023
fa50bf4
review feedback
ZanSara Jul 7, 2023
34600bc
review feedback
ZanSara Jul 7, 2023
e2e807a
fix tests
ZanSara Jul 7, 2023
aeb96c0
Merge branch 'main' into v2-docstores-connections
ZanSara Jul 7, 2023
9a249b7
mypy
ZanSara Jul 7, 2023
357390f
review feedback
ZanSara Jul 7, 2023
1813804
fix tests & other details
ZanSara Jul 10, 2023
f315019
naming
ZanSara Jul 10, 2023
4cc2506
mypy
ZanSara Jul 10, 2023
84252d0
fix tests
ZanSara Jul 10, 2023
5d220af
typing
ZanSara Jul 10, 2023
9214b79
Merge branch 'main' into v2-docstores-connections
ZanSara Jul 11, 2023
6055455
partial review feedback
ZanSara Jul 12, 2023
53f624b
move .store to input dataclass
ZanSara Jul 13, 2023
3f585a6
Revert "move .store to input dataclass"
ZanSara Jul 13, 2023
a6c1b24
disable reusing components with stores
ZanSara Jul 13, 2023
0c466c6
disable sharing components with docstores
ZanSara Jul 13, 2023
e832084
Merge branch 'main' into v2-docstores-connections
ZanSara Jul 13, 2023
cf1d6dc
Update mixins.py
ZanSara Jul 13, 2023
e6f894f
black
ZanSara Jul 13, 2023
e5ccad9
upgrade canals & fix tests
ZanSara Jul 17, 2023
59 changes: 40 additions & 19 deletions haystack/preview/components/retrievers/memory.py
@@ -9,6 +9,8 @@
class MemoryRetriever:
"""
A component for retrieving documents from a MemoryDocumentStore using the BM25 algorithm.

Needs to be connected to a MemoryDocumentStore to run.
"""

@dataclass
@@ -27,7 +29,6 @@ class Input(ComponentInput):
filters: Dict[str, Any]
top_k: int
scale_score: bool
stores: Dict[str, Any]

@dataclass
class Output(ComponentOutput):
@@ -39,28 +40,54 @@ class Output(ComponentOutput):

documents: List[Document]

def __init__(
self,
document_store_name: str,
filters: Optional[Dict[str, Any]] = None,
top_k: int = 10,
scale_score: bool = True,
):
def __init__(self, filters: Optional[Dict[str, Any]] = None, top_k: int = 10, scale_score: bool = True):
"""
Create a MemoryRetriever component.

:param document_store_name: The name of the MemoryDocumentStore to retrieve documents from.
:param filters: A dictionary with filters to narrow down the search space (default is None).
:param top_k: The maximum number of documents to retrieve (default is 10).
:param scale_score: Whether to scale the BM25 score or not (default is True).

:raises ValueError: If the specified top_k is not > 0.
"""
self.document_store_name = document_store_name
if top_k <= 0:
raise ValueError(f"top_k must be > 0, but got {top_k}")
self.defaults = {"top_k": top_k, "scale_score": scale_score, "filters": filters or {}}

@property
def store(self) -> Optional[MemoryDocumentStore]:
"""
This property allows Pipelines to connect the component with the stores it requires.

Stores have to be instances of MemoryDocumentStore, or the assignment will fail.
"""
return getattr(self, "_store", None)

@store.setter
def store(self, store: MemoryDocumentStore):
"""
This property allows Pipelines to connect the component with the stores it requires.

Stores have to be instances of MemoryDocumentStore, or the assignment will fail.

:param store: the MemoryDocumentStore instance to retrieve from.
:raises ValueError: if the store is None or is not an instance of MemoryDocumentStore.
"""
if not store:
raise ValueError("Can't set the value of the store to None.")
if not isinstance(store, MemoryDocumentStore):
raise ValueError("MemoryRetriever can only be used with a MemoryDocumentStore instance.")
self._store = store

def warmup(self):
"""
Double-checks that a store is set before running this component in a pipeline.
"""
if not self.store:
raise ValueError(
"MemoryRetriever needs a store to run: use the 'store' parameter of 'add_component' to connect them."
)

def run(self, data: Input) -> Output:
"""
Run the MemoryRetriever on the given input data.
@@ -70,15 +97,9 @@ def run(self, data: Input) -> Output:

:raises ValueError: If no store has been connected to this component.
"""
if self.document_store_name not in data.stores:
raise ValueError(
f"MemoryRetriever's document store '{self.document_store_name}' not found "
f"in input stores {list(data.stores.keys())}"
)
document_store = data.stores[self.document_store_name]
if not isinstance(document_store, MemoryDocumentStore):
raise ValueError("MemoryRetriever can only be used with a MemoryDocumentStore instance.")
docs = document_store.bm25_retrieval(
if not self.store:
raise ValueError("MemoryRetriever needs a store to run: set the store instance to the self.store attribute")
docs = self.store.bm25_retrieval(
query=data.query, filters=data.filters, top_k=data.top_k, scale_score=data.scale_score
)
return MemoryRetriever.Output(documents=docs)
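
Reviewer note: the store property, `warmup` and `run` changes above can be tried in isolation with the minimal sketch below. `FakeStore` and `SketchRetriever` are hypothetical stand-ins written for this note, not classes from the PR, and the substring match only imitates `bm25_retrieval`. The setter checks `is None` rather than truthiness, so that a store class defining `__len__` would not be rejected while empty; that is a choice of this sketch, not something the diff does.

```python
from typing import List, Optional


class FakeStore:
    """Hypothetical stand-in for MemoryDocumentStore (not part of the PR)."""

    def __init__(self, docs: List[str]):
        self.docs = docs

    def bm25_retrieval(self, query: str, top_k: int = 10) -> List[str]:
        # Naive case-insensitive substring match instead of real BM25 scoring.
        return [doc for doc in self.docs if query.lower() in doc.lower()][:top_k]


class SketchRetriever:
    """Mirrors the store property / warmup / run flow shown in the diff."""

    def __init__(self, top_k: int = 10):
        if top_k <= 0:
            raise ValueError(f"top_k must be > 0, but got {top_k}")
        self.top_k = top_k

    @property
    def store(self) -> Optional[FakeStore]:
        # Returns None until a store has been connected.
        return getattr(self, "_store", None)

    @store.setter
    def store(self, store: FakeStore) -> None:
        if store is None:
            raise ValueError("Can't set the value of the store to None.")
        if not isinstance(store, FakeStore):
            raise ValueError("SketchRetriever can only be used with a FakeStore instance.")
        self._store = store

    def warmup(self) -> None:
        # Fail early if nothing was connected.
        if self.store is None:
            raise ValueError("SketchRetriever needs a store to run.")

    def run(self, query: str) -> List[str]:
        self.warmup()
        return self.store.bm25_retrieval(query=query, top_k=self.top_k)


retriever = SketchRetriever(top_k=2)
retriever.store = FakeStore(["Paris is in France", "Berlin is in Germany", "Paris Hilton"])
results = retriever.run("paris")
print(results)
```

Calling `run` on a retriever that never received a store raises `ValueError`, which is the failure mode `warmup` is meant to surface before the pipeline starts.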
2 changes: 2 additions & 0 deletions haystack/preview/document_stores/__init__.py
@@ -1,2 +1,4 @@
from haystack.preview.document_stores.protocols import Store, DuplicatePolicy
from haystack.preview.document_stores.mixins import StoreMixin, MultiStoreMixin, StoreComponent, MultiStoreComponent
from haystack.preview.document_stores.memory.document_store import MemoryDocumentStore
from haystack.preview.document_stores.errors import StoreError, DuplicateDocumentError, MissingDocumentError
14 changes: 7 additions & 7 deletions haystack/preview/document_stores/memory/document_store.py
@@ -8,12 +8,12 @@
from tqdm.auto import tqdm

from haystack.preview.dataclasses import Document
from haystack.preview.document_stores.protocols import DuplicatePolicy
from haystack.preview.document_stores.memory._filters import match
from haystack.preview.document_stores.errors import DuplicateDocumentError, MissingDocumentError
from haystack.utils.scipy_utils import expit

logger = logging.getLogger(__name__)
DuplicatePolicy = Literal["skip", "overwrite", "fail"]

# document scores are essentially unbounded and will be scaled to values between 0 and 1 if scale_score is set to
# True (default). Scaling uses the expit function (inverse of the logit function) after applying a SCALING_FACTOR. A
@@ -126,17 +126,17 @@ def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Doc
return [doc for doc in self.storage.values() if match(conditions=filters, document=doc)]
return list(self.storage.values())

def write_documents(self, documents: List[Document], duplicates: DuplicatePolicy = "fail") -> None:
def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.FAIL) -> None:
"""
Writes (or overwrites) documents into the store.

:param documents: a list of documents.
:param duplicates: documents with the same ID count as duplicates. When duplicates are met,
:param policy: documents with the same ID count as duplicates. When duplicates are met,
the store can:
- skip: keep the existing document and ignore the new one.
- overwrite: remove the old document and write the new one.
- fail: an error is raised
:raises DuplicateError: Exception trigger on duplicate document if `duplicates="fail"`
:raises DuplicateDocumentError: raised on a duplicate document if `policy=DuplicatePolicy.FAIL`
:return: None
"""
if (
@@ -147,10 +147,10 @@ def write_documents(self, documents: List[Document], duplicates: DuplicatePolicy
raise ValueError("Please provide a list of Documents.")

for document in documents:
if document.id in self.storage.keys():
if duplicates == "fail":
if policy != DuplicatePolicy.OVERWRITE and document.id in self.storage.keys():
if policy == DuplicatePolicy.FAIL:
raise DuplicateDocumentError(f"ID '{document.id}' already exists.")
if duplicates == "skip":
if policy == DuplicatePolicy.SKIP:
logger.warning("ID '%s' already exists", document.id)
self.storage[document.id] = document
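
Reviewer note: the three policies described in the docstring can be sketched as below. The flattened diff has lost its indentation, so it doesn't show whether the skip branch bypasses the final assignment; this sketch makes that explicit with `continue`. It works on a plain dict of `(id, content)` pairs rather than `Document` objects, and `DuplicateDocumentError` is redefined locally as a stand-in, so treat it as an illustration of the documented semantics rather than the store's actual code.

```python
from enum import Enum
from typing import Dict, List, Tuple


class DuplicatePolicy(Enum):
    SKIP = "skip"
    OVERWRITE = "overwrite"
    FAIL = "fail"


class DuplicateDocumentError(Exception):
    """Local stand-in for the error class imported in the diff."""


def write_documents(
    storage: Dict[str, str],
    documents: List[Tuple[str, str]],
    policy: DuplicatePolicy = DuplicatePolicy.FAIL,
) -> None:
    for doc_id, content in documents:
        if policy != DuplicatePolicy.OVERWRITE and doc_id in storage:
            if policy == DuplicatePolicy.FAIL:
                raise DuplicateDocumentError(f"ID '{doc_id}' already exists.")
            # SKIP: keep the existing document and ignore the new one.
            continue
        # New ID, or OVERWRITE: write the document.
        storage[doc_id] = content


storage = {"1": "old"}
write_documents(storage, [("1", "new"), ("2", "other")], policy=DuplicatePolicy.SKIP)
print(storage)
```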

67 changes: 67 additions & 0 deletions haystack/preview/document_stores/mixins.py
@@ -0,0 +1,67 @@
from typing import Protocol, Dict, Optional

from haystack.preview.document_stores.protocols import Store


class StoreComponent(Protocol):
"""
Represents a component that needs a single store to run.
"""

_store: Store

@property
def store(self) -> Optional[Store]:
...

@store.setter
def store(self, store: Store):
...


class StoreMixin(StoreComponent):
"""
Adds the capability of a component to use a single document store from the `self.store` property.
"""

@property
def store(self) -> Optional[Store]:
return getattr(self, "_store", None)

@store.setter
def store(self, store: Store):
if not store:
raise ValueError("Can't set the value of the store to None.")
self._store = store


class MultiStoreComponent:
"""
Represents a component that needs more than a single store to run.
"""

@property
def stores(self) -> Optional[Dict[str, Store]]:
...

@stores.setter
def stores(self, stores: Dict[str, Store]):
...


class MultiStoreMixin(MultiStoreComponent):
"""
Adds the capability of a component to use several document stores from the `self.stores` property.
"""

_stores: Dict[str, Store]

@property
def stores(self) -> Optional[Dict[str, Store]]:
return getattr(self, "_stores", None)

@stores.setter
def stores(self, stores: Dict[str, Store]):
if stores is None:
raise ValueError("The stores dictionary can't be None.")
self._stores = stores
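
Reviewer note: one way to see how the protocol and the mixin fit together is the sketch below. It redefines simplified versions of `StoreComponent` and `StoreMixin` locally with `Any` in place of `Store`. `Writer`, `Plain` and `connect` are hypothetical names introduced for this note, and `connect` only loosely imitates what the `store` parameter of `add_component` is described as doing. The `@runtime_checkable` decorator is also an assumption of the sketch; the diff does not show it.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class StoreComponent(Protocol):
    """Simplified local copy: anything exposing a `store` property."""

    @property
    def store(self) -> Optional[Any]:
        ...


class StoreMixin:
    """Simplified local copy of the mixin: provides the `store` property."""

    @property
    def store(self) -> Optional[Any]:
        return getattr(self, "_store", None)

    @store.setter
    def store(self, store: Any) -> None:
        if store is None:
            raise ValueError("Can't set the value of the store to None.")
        self._store = store


class Writer(StoreMixin):
    """Hypothetical component that needs a single store."""


class Plain:
    """Hypothetical component that does not use stores."""


def connect(component: Any, store: Any) -> None:
    """Hypothetical helper: attach a store only to components that accept one."""
    if not isinstance(component, StoreComponent):
        raise ValueError(f"{type(component).__name__} does not accept a store.")
    component.store = store


writer = Writer()
connect(writer, {"doc-1": "hello"})
print(writer.store)
```

The structural check means a component does not have to inherit from the mixin: any class that defines a compatible `store` property would pass `isinstance(component, StoreComponent)` here.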
126 changes: 126 additions & 0 deletions haystack/preview/document_stores/protocols.py
@@ -0,0 +1,126 @@
from typing import Protocol, Optional, Dict, Any, List

import logging
from enum import Enum

from haystack.preview.dataclasses import Document


logger = logging.getLogger(__name__)


class DuplicatePolicy(Enum):
SKIP = "skip"
OVERWRITE = "overwrite"
FAIL = "fail"


class Store(Protocol):
"""
Stores Documents to be used by the components of a Pipeline.

Classes implementing this protocol often store the documents permanently and allow specialized components to
perform retrieval on them, either by embedding, by keyword, hybrid, and so on, depending on the backend used.

In order to retrieve documents, consider using a Retriever that supports the document store implementation that
you're using.
"""

def count_documents(self) -> int:
"""
Returns the number of documents stored.
"""

def filter_documents(self, filters: Optional[Dict[str, Any]] = None) -> List[Document]:
"""
Returns the documents that match the filters provided.

Filters are defined as nested dictionaries. The keys of the dictionaries can be a logical operator (`"$and"`,
`"$or"`, `"$not"`), a comparison operator (`"$eq"`, `"$ne"`, `"$in"`, `"$nin"`, `"$gt"`, `"$gte"`, `"$lt"`,
`"$lte"`) or a metadata field name.

Logical operator keys take a dictionary of metadata field names and/or logical operators as value. Metadata
field names take a dictionary of comparison operators as value. Comparison operator keys take a single value or
(in case of `"$in"`) a list of values as value. If no logical operator is provided, `"$and"` is used as default
operation. If no comparison operator is provided, `"$eq"` (or `"$in"` if the comparison value is a list) is used
as default operation.

Example:

```python
filters = {
"$and": {
"type": {"$eq": "article"},
"date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
"rating": {"$gte": 3},
"$or": {
"genre": {"$in": ["economy", "politics"]},
"publisher": {"$eq": "nytimes"}
}
}
}
# or simpler using default operators
filters = {
"type": "article",
"date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
"rating": {"$gte": 3},
"$or": {
"genre": ["economy", "politics"],
"publisher": "nytimes"
}
}
```

To use the same logical operator multiple times on the same level, logical operators can take a list of
dictionaries as value.

Example:

```python
filters = {
"$or": [
{
"$and": {
"Type": "News Paper",
"Date": {
"$lt": "2019-01-01"
}
}
},
{
"$and": {
"Type": "Blog Post",
"Date": {
"$gte": "2019-01-01"
}
}
}
]
}
```

:param filters: the filters to apply to the document list.
:return: a list of Documents that match the given filters.
"""

def write_documents(self, documents: List[Document], policy: DuplicatePolicy = DuplicatePolicy.FAIL) -> None:
"""
Writes (or overwrites) documents into the store.

:param documents: a list of documents.
:param policy: documents with the same ID count as duplicates. When duplicates are met,
the store can:
- skip: keep the existing document and ignore the new one.
- overwrite: remove the old document and write the new one.
- fail: an error is raised
:raises DuplicateDocumentError: raised on a duplicate document if `policy=DuplicatePolicy.FAIL`
:return: None
"""

def delete_documents(self, document_ids: List[str]) -> None:
"""
Deletes all documents with matching document_ids from the document store.
Fails with `MissingDocumentError` if no document with a given ID is present in the store.

:param document_ids: the IDs of the documents to delete
"""
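
Reviewer note: the filter syntax documented in `filter_documents` can be read as a small recursive evaluator. The sketch below is not the `match` helper from `haystack.preview.document_stores.memory._filters`; it is a hypothetical `matches` function over plain metadata dicts, written only to illustrate the default-operator rules (`"$and"` between keys, `"$eq"` for scalars, `"$in"` for lists) and the list form of logical operators. `"$not"` is left out because the docstring doesn't spell out its semantics, and a missing metadata field is treated as a non-match, which is an assumption.

```python
import operator
from typing import Any, Dict, List, Union

COMPARISONS = {
    "$eq": operator.eq,
    "$ne": operator.ne,
    "$gt": operator.gt,
    "$gte": operator.ge,
    "$lt": operator.lt,
    "$lte": operator.le,
    "$in": lambda value, targets: value in targets,
    "$nin": lambda value, targets: value not in targets,
}

Conditions = Union[Dict[str, Any], List[Dict[str, Any]]]


def matches(conditions: Conditions, metadata: Dict[str, Any], op: str = "$and") -> bool:
    if isinstance(conditions, list):
        # List form: the same logical operator applied to several dictionaries.
        results = [matches(item, metadata) for item in conditions]
    else:
        results = [_check(key, value, metadata) for key, value in conditions.items()]
    return any(results) if op == "$or" else all(results)


def _check(key: str, value: Any, metadata: Dict[str, Any]) -> bool:
    if key in ("$and", "$or"):
        return matches(value, metadata, op=key)
    if key.startswith("$"):
        raise ValueError(f"Operator not supported in this sketch: {key}")
    # From here on, `key` is a metadata field name.
    if not isinstance(value, dict):
        # Default comparison: "$in" for lists, "$eq" for anything else.
        value = {"$in": value} if isinstance(value, list) else {"$eq": value}
    if key not in metadata:
        return False
    return all(COMPARISONS[cmp](metadata[key], target) for cmp, target in value.items())


filters = {
    "type": "article",
    "date": {"$gte": "2015-01-01", "$lt": "2021-01-01"},
    "rating": {"$gte": 3},
    "$or": {"genre": ["economy", "politics"], "publisher": "nytimes"},
}
doc_meta = {"type": "article", "date": "2018-05-01", "rating": 4, "genre": "economy", "publisher": "other"}
print(matches(filters, doc_meta))
```

Dates are compared as ISO-formatted strings here, which works only because that format sorts lexicographically.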