diff --git a/haystack/preview/document_stores/mixins.py b/haystack/preview/document_stores/mixins.py index 7a17026104..e6a642479d 100644 --- a/haystack/preview/document_stores/mixins.py +++ b/haystack/preview/document_stores/mixins.py @@ -1,4 +1,4 @@ -from typing import List, Optional, Type +from typing import List, Type from haystack.preview.document_stores.protocols import Store @@ -6,28 +6,9 @@ class StoreAwareMixin: """ - Adds the capability of a component to use a single document store from the `self.store` property. - + Signals to the pipeline that this component will need a store added to their input data. To use this mixin you must specify which document stores to support by setting a value to `supported_stores`. To support any document store, set it to `[Store]`. """ - _store: Optional[Store] = None supported_stores: List[Type[Store]] # type: ignore # (see https://github.com/python/mypy/issues/4717) - - @property - def store(self) -> Optional[Store]: - return self._store - - @store.setter - def store(self, store: Store): - if not store: - raise ValueError("Can't set the value of the store to None.") - if not isinstance(store, Store): - raise ValueError("'store' does not respect the Store Protocol.") - if not any(isinstance(store, type_) for type_ in type(self).supported_stores): - raise ValueError( - f"Store type '{type(store).__name__}' is not compatible with this component. " - f"Compatible store types: {[type_.__name__ for type_ in type(self).supported_stores]}" - ) - self._store = store diff --git a/haystack/preview/pipeline.py b/haystack/preview/pipeline.py index 41dc0b5779..4e1f540241 100644 --- a/haystack/preview/pipeline.py +++ b/haystack/preview/pipeline.py @@ -2,6 +2,7 @@ from pathlib import Path +from canals.component import ComponentInput from canals.pipeline import ( Pipeline as CanalsPipeline, PipelineError, @@ -29,6 +30,7 @@ class Pipeline(CanalsPipeline): def __init__(self): super().__init__() self._stores: Dict[str, Store] = {} + self._store_connections: Dict[str, str] = {} def add_store(self, name: str, store: Store) -> None: """ @@ -94,13 +96,42 @@ def add_component(self, name: str, instance: Any, store: Optional[str] = None) - f"Store named '{store}' not found. " f"Add it with 'pipeline.add_store('{store}', )'." ) - instance.store = self._stores[store] + + if not any(isinstance(self._stores[store], type_) for type_ in type(instance).supported_stores): + raise ValueError( + f"Store type '{type(self._stores[store]).__name__}' is not compatible with this component. " + f"Compatible store types: {[type_.__name__ for type_ in type(instance).supported_stores]}" + ) + + self._store_connections[name] = store elif store: raise ValueError(f"Component '{name}' doesn't support stores.") super().add_component(name, instance) + def run(self, data: Dict[str, Any], debug: bool = False) -> Dict[str, Any]: + """ + Runs the pipeline. + + Args: + data: the inputs to give to the input components of the Pipeline. + debug: whether to collect and return debug information. + + Returns: + A dictionary with the outputs of the Pipeline. + + Raises: + PipelineRuntimeError: if the any of the components fail or return unexpected output. + """ + # Assigns the docstore to the input dataclasses of each component that requested it + for component_name, store_name in self._store_connections: + if not component_name in data: + data[component_name] = self.get_component(component_name).input() + data[component_name].store = self._stores[store_name] + + super().run(data=data, debug=debug) + def load_pipelines(path: Path, _reader: Optional[Callable[..., Any]] = None): return load_canals_pipelines(path=path, _reader=_reader)