Skip to content

Commit

Permalink
move .store to input dataclass
Browse files Browse the repository at this point in the history
  • Loading branch information
ZanSara committed Jul 13, 2023
1 parent 6055455 commit 53f624b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 22 deletions.
23 changes: 2 additions & 21 deletions haystack/preview/document_stores/mixins.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,14 @@
from typing import List, Optional, Type
from typing import List, Type


from haystack.preview.document_stores.protocols import Store


class StoreAwareMixin:
"""
Adds the capability of a component to use a single document store from the `self.store` property.
Signals to the pipeline that this component will need a store added to their input data.
To use this mixin you must specify which document stores to support by setting a value to `supported_stores`.
To support any document store, set it to `[Store]`.
"""

_store: Optional[Store] = None
supported_stores: List[Type[Store]] # type: ignore # (see https://github.com/python/mypy/issues/4717)

@property
def store(self) -> Optional[Store]:
return self._store

@store.setter
def store(self, store: Store):
if not store:
raise ValueError("Can't set the value of the store to None.")
if not isinstance(store, Store):
raise ValueError("'store' does not respect the Store Protocol.")
if not any(isinstance(store, type_) for type_ in type(self).supported_stores):
raise ValueError(
f"Store type '{type(store).__name__}' is not compatible with this component. "
f"Compatible store types: {[type_.__name__ for type_ in type(self).supported_stores]}"
)
self._store = store
33 changes: 32 additions & 1 deletion haystack/preview/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pathlib import Path

from canals.component import ComponentInput
from canals.pipeline import (
Pipeline as CanalsPipeline,
PipelineError,
Expand Down Expand Up @@ -29,6 +30,7 @@ class Pipeline(CanalsPipeline):
def __init__(self):
super().__init__()
self._stores: Dict[str, Store] = {}
self._store_connections: Dict[str, str] = {}

def add_store(self, name: str, store: Store) -> None:
"""
Expand Down Expand Up @@ -94,13 +96,42 @@ def add_component(self, name: str, instance: Any, store: Optional[str] = None) -
f"Store named '{store}' not found. "
f"Add it with 'pipeline.add_store('{store}', <the docstore instance>)'."
)
instance.store = self._stores[store]

if not any(isinstance(self._stores[store], type_) for type_ in type(instance).supported_stores):
raise ValueError(
f"Store type '{type(self._stores[store]).__name__}' is not compatible with this component. "
f"Compatible store types: {[type_.__name__ for type_ in type(instance).supported_stores]}"
)

self._store_connections[name] = store

elif store:
raise ValueError(f"Component '{name}' doesn't support stores.")

super().add_component(name, instance)

def run(self, data: Dict[str, Any], debug: bool = False) -> Dict[str, Any]:
"""
Runs the pipeline.
Args:
data: the inputs to give to the input components of the Pipeline.
debug: whether to collect and return debug information.
Returns:
A dictionary with the outputs of the Pipeline.
Raises:
PipelineRuntimeError: if the any of the components fail or return unexpected output.
"""
# Assigns the docstore to the input dataclasses of each component that requested it
for component_name, store_name in self._store_connections:
if not component_name in data:
data[component_name] = self.get_component(component_name).input()
data[component_name].store = self._stores[store_name]

super().run(data=data, debug=debug)


def load_pipelines(path: Path, _reader: Optional[Callable[..., Any]] = None):
return load_canals_pipelines(path=path, _reader=_reader)
Expand Down

0 comments on commit 53f624b

Please sign in to comment.