diff --git a/.gitignore b/.gitignore index e84e336..a67f4fb 100644 --- a/.gitignore +++ b/.gitignore @@ -158,4 +158,6 @@ kedro.db .vscode/ -.ruff_cache/ \ No newline at end of file +.ruff_cache/ + +notebooks/ \ No newline at end of file diff --git a/kedro_boot/app/__init__.py b/kedro_boot/app/__init__.py new file mode 100644 index 0000000..7912da5 --- /dev/null +++ b/kedro_boot/app/__init__.py @@ -0,0 +1 @@ +from .app import AbstractKedroBootApp, CompileApp # noqa: F401 diff --git a/kedro_boot/app.py b/kedro_boot/app/app.py similarity index 65% rename from kedro_boot/app.py rename to kedro_boot/app/app.py index 1a86139..428c3c8 100644 --- a/kedro_boot/app.py +++ b/kedro_boot/app/app.py @@ -1,27 +1,33 @@ """``AbstractKedroBootApp`` is the base class for all kedro boot app implementations. """ from abc import ABC, abstractmethod -from typing import Any +from typing import Any, List -from kedro.config import OmegaConfigLoader +from kedro.config import ConfigLoader from kedro.io import DataCatalog from pluggy import PluginManager -from kedro_boot.pipeline import AppPipeline -from kedro_boot.session import KedroBootSession +from kedro.pipeline.pipeline import Pipeline +from kedro_boot.framework.session import KedroBootSession +from kedro_boot.framework.compiler.specs import CompilationSpec class AbstractKedroBootApp(ABC): """``AbstractKedroBootApp`` is the base class for all kedro boot app implementations""" + LAZY_COMPILE = False + + def __init__(self, compilation_specs: List[CompilationSpec] = None) -> None: + self._compilation_specs = compilation_specs + def run( self, - pipeline: AppPipeline, + pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager, session_id: str, - config_loader: OmegaConfigLoader, - lazy_compile: bool, + runtime_app_params: dict, + config_loader: ConfigLoader, ) -> Any: """Create a ``KedroBootSession`` then run the kedro boot app @@ -30,21 +36,26 @@ def run( catalog: The base ``DataCatalog`` from which to fetch data. hook_manager: The ``PluginManager`` to activate hooks. session_id: The id of the kedro session. + runtime_app_params (dict): params given by an App specific CLI + config_loader (OmegaConfigLoader): kedro ``OmegaConfigLoader`` object Returns: Any: the return value of the kedro boot app run method """ + config_loader.update({"application": ["application*/"]}) + session = KedroBootSession( pipeline=pipeline, catalog=catalog, hook_manager=hook_manager, session_id=session_id, + runtime_app_params=runtime_app_params, config_loader=config_loader, ) - if not lazy_compile: - session.compile_catalog() + if not self.LAZY_COMPILE: + session.compile(compilation_specs=self._compilation_specs) return self._run(session) @@ -54,7 +65,7 @@ def _run(self, session: KedroBootSession) -> Any: ``KedrobootSession`` have already be created by run(). Args: - session (KedroBootSession): is the object that is responsible for managing the kedro boot app lifecycle + session (KedroBootSession): A user facing interface that expose kedro's resource to the kedro boot apps """ pass @@ -71,7 +82,7 @@ def _run(self, session: KedroBootSession) -> Any: return session.run() -class DryRunApp(AbstractKedroBootApp): +class CompileApp(AbstractKedroBootApp): """An App used to perform Dry Run. Args: @@ -80,14 +91,3 @@ class DryRunApp(AbstractKedroBootApp): def _run(self, session: KedroBootSession) -> Any: pass - - -class BridgeApp(AbstractKedroBootApp): - """An App used by the booter to pass the instantiated kedro boot session. 
- - Args: - AbstractKedroBootApp (_type_): _description_ - """ - - def _run(self, session: KedroBootSession) -> KedroBootSession: - return session diff --git a/kedro_boot/booter/__init__.py b/kedro_boot/booter/__init__.py deleted file mode 100644 index e0f800f..0000000 --- a/kedro_boot/booter/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .boot import boot_session # noqa: F401 -from .cli import commands, kedro_boot_cli_factory # noqa: F401 diff --git a/kedro_boot/booter/boot.py b/kedro_boot/booter/boot.py deleted file mode 100644 index f323386..0000000 --- a/kedro_boot/booter/boot.py +++ /dev/null @@ -1,62 +0,0 @@ -"""A factory funtion for creating kedro boot session within external apps""" - -import copy -from pathlib import Path -from typing import Optional - -from kedro.framework.session import KedroSession -from kedro.framework.startup import bootstrap_project - -from kedro_boot.runner import KedroBootRunner -from kedro_boot.session import KedroBootSession - - -def boot_session( - project_path: Optional[Path] = None, - pipeline_name: Optional[str] = None, - env: Optional[str] = None, - extra_params: Optional[dict] = None, - lazy_compile: Optional[bool] = False, - **session_run_kwargs, -) -> KedroBootSession: - """Create ``KedroBootSession`` from kedro project. ``KedroBootSession`` is used by the kedro boot app to perform multiple low latency runs of the pipeline with the possibility of injecting data at each iteration. - - Args: - project_path (Path): Kedro project path. Defaults to the current working directory. - pipeline_name (str): Name of the project running pipeline. Default to __Default__ - env (str): Kedro project env. Defaults to base. - extra_params (dict): Inject extra params to kedro project. Defaults to None. - lazy_compile (bool): AppCatalog compilation mode. By default kedro boot autmatically compile before starting the app. If lazy mode activated, the compilation processed need to be triggered by the app or lazily at first iteration run. Defaults to False. - session_run_kwargs (dict): KedroSession.run kwargs. Defaults to None. 
- - Returns: - KedroBootSession: _description_ - """ - - # TODO: Add support for packaged kedro projects - - boot_project_path = Path(project_path or Path.cwd()).resolve() - - project_metadata = bootstrap_project(boot_project_path) - kedro_session = KedroSession.create( - project_metadata.package_name, - project_path=boot_project_path, - env=env, - extra_params=extra_params, - ) - - boot_session_args = copy.deepcopy(session_run_kwargs) - if "pipeline_name" in boot_session_args: - boot_session_args.pop("pipeline_name") - if "runner" in boot_session_args: - boot_session_args.pop("runner") - - return kedro_session.run( - pipeline_name=pipeline_name, - runner=KedroBootRunner( - config_loader=kedro_session._get_config_loader(), - app_class="kedro_boot.app.BridgeApp", - lazy_compile=lazy_compile, - ), - **boot_session_args, - ) # type: ignore diff --git a/kedro_boot/booter/cli.py b/kedro_boot/booter/cli.py deleted file mode 100644 index 420da6b..0000000 --- a/kedro_boot/booter/cli.py +++ /dev/null @@ -1,181 +0,0 @@ -"""A CLI factory for kedro boot apps""" - -import logging -from typing import Callable, List, Optional - -import click -from click import Command -from click.decorators import FC -from kedro.framework.cli.project import run as kedro_run_command -from kedro.framework.cli.utils import _get_values_as_tuple -from kedro.framework.session import KedroSession - -from kedro_boot.runner import KedroBootRunner - -LOGGER = logging.getLogger(__name__) - - -def kedro_boot_cli_factory( - app_class: Optional[str] = None, - app_params: Optional[List[Callable[[FC], FC]]] = None, - lazy_compile: Optional[bool] = None, -) -> Command: - """A factory function that create a new command with all the existing kedro run options plus given app specific options. - The command instantiate a kedro boot runner with app_class and app_args before running the kedro session. - - Args: - app_class (str): Kedro Boot App class. ex: my_package.my_module.my_app_class - app_params (List[Option]): Kedro Boot App click options. These options will be added to kedro run options - - Returns: - Command: _description_ - """ - - app_params = app_params or [] - - # If no app_class given. We'll add an app Option in the Command and use DummyApp as default - if not app_class: - for app_param in app_params: - if app_param.name == "app": - LOGGER.warning( - "No app_class was given and at the same time 'app' Option is found in your kedro boot cli factory app_params. We're going to replace your app Option by kedro boot app Option. So you can have a way to provide an app_class" - ) - app_params.remove(app_param) - break - - app_params.append( - click.option( - "--app", - type=str, - default="kedro_boot.app.DummyApp", - help="Kedro Boot App Class. ex: my_package.my_module.my_app_class", - ) - ) - - app_params.append( - click.option( - "--dry-run", - is_flag=True, - help="Compile the App catalog with the given project settings without running the pipeline. If used the --app and --lazy-compile options are ignored.", - ) - ) - - app_params.append( - click.option( - "--lazy-compile", - is_flag=True, - help="Compile the catalog at a specific point of the execution flow of the application. This let the app decide when to compile the catalog. This is useful in multiprocessing apps (like Gunicorn or Spark) because some datasets may not support process forking when loaded at the master process. 
If the app does not specify the compilation point, it would be done lazily at the first iteration run", - ) - ) - - else: - for app_param in app_params: - if app_param.name == "lazy_compile" and lazy_compile is not None: - LOGGER.warning( - "You have given lazy_compile options in two places, your app option params, and as argument of the cli factory. The cli factory parameter will take precedence" - ) - - @click.command() - def kedro_boot_command(**kwargs): - """Running kedro boot apps""" - - ctx = click.get_current_context() - kedro_run_params = [param.name for param in kedro_run_command.params] - app_args = { - arg_key: arg_value - for arg_key, arg_value in ctx.params.items() - if arg_key not in kedro_run_params - } - kedro_args = { - arg_key: arg_value - for arg_key, arg_value in kwargs.items() - if arg_key not in app_args - } - - # pop the app_class from app_args - boot_app_class = app_class or app_args.pop("app") - app_args_lazy_compile = ( - app_args.pop("lazy_compile") if "lazy_compile" in app_args else False - ) - boot_app_lazy_compile = ( - lazy_compile if lazy_compile is not None else app_args_lazy_compile - ) - dry_run = app_args.pop("dry_run") if "dry_run" in app_args else False - if dry_run: - boot_app_class = "kedro_boot.app.DryRunApp" - - tag = _get_values_as_tuple(kedro_args["tag"]) - node_names = _get_values_as_tuple(kedro_args["node_names"]) - - # temporary duplicates for the plural flags - tags = _get_values_as_tuple(kedro_args.get("tags", [])) - nodes_names = _get_values_as_tuple(kedro_args.get("nodes_names", [])) - - tag = tag + tags - node_names = node_names + nodes_names - load_version = { - **kedro_args["load_version"], - **kedro_args.get("load_versions", {}), - } - - # best effort for retrocompatibility with kedro < 0.18.5 - namespace_kwargs = ( - {"namespace": kedro_args.get("namespace")} - if kedro_args.get("namespace") - else {} - ) - conf_source_kwargs = ( - {"conf_source": kedro_args.get("conf_source")} - if kedro_args.get("conf_source") - else {} - ) - - with KedroSession.create( - env=kedro_args["env"], - extra_params=kedro_args["params"], - **conf_source_kwargs, # type: ignore - ) as session: - runner_args = {"is_async": kedro_args["is_async"]} - config_loader = session._get_config_loader() - config_loader._register_new_resolvers( - { - "itertime_params": lambda variable, - default_value=None: f"${{oc.select:{variable},{default_value}}}", - } - ) - runner = KedroBootRunner( - config_loader=config_loader, - app_class=boot_app_class, - app_args=app_args, - lazy_compile=boot_app_lazy_compile, - **runner_args, - ) - session.run( - tags=tag, - runner=runner, - node_names=node_names, - from_nodes=kedro_args["from_nodes"], - to_nodes=kedro_args["to_nodes"], - from_inputs=kedro_args["from_inputs"], - to_outputs=kedro_args["to_outputs"], - load_versions=load_version, - pipeline_name=kedro_args["pipeline"], - **namespace_kwargs, # type: ignore - ) - - kedro_boot_command.params.extend(kedro_run_command.params) - for param in app_params: - kedro_boot_command = param(kedro_boot_command) - - return kedro_boot_command - - -kedro_boot_command = kedro_boot_cli_factory() - - -@click.group(name="boot") -def commands(): - pass - - -commands.add_command(kedro_boot_command, "boot") diff --git a/kedro_boot/catalog/__init__.py b/kedro_boot/catalog/__init__.py deleted file mode 100644 index c48a2d4..0000000 --- a/kedro_boot/catalog/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .catalog import AppCatalog # noqa: F401 diff --git a/kedro_boot/catalog/catalog.py 
b/kedro_boot/catalog/catalog.py deleted file mode 100644 index 3c2b711..0000000 --- a/kedro_boot/catalog/catalog.py +++ /dev/null @@ -1,217 +0,0 @@ -""""``AppCatalog`` make the the kedro catalog ready for interaction with the kedro boot app.""" -import copy -import logging -from typing import List, Optional - -from kedro.io import DataCatalog - -from kedro_boot.pipeline import AppPipeline - -from .compiler import compile_with_all_pipeline_outputs, compile_with_pipeline_inputs -from .renderer import ( - render_datasets, - render_input_datasets, - render_parameter_datasets, - render_template_datasets, -) -from .view import CatalogView - -LOGGER = logging.getLogger(__name__) - - -class AppCatalog: - """ "``AppCatalog`` Manage the kedro catalog lifecycle. - At compilation time : It build a ``CatalogView`` for each ``PipelineView`` . - At iteration time: catalog views are rendered/merged with data provided by the app - """ - - def __init__(self, catalog: DataCatalog) -> None: - """Init the ``AppCatalog`` with the base kedro catalog. - - Args: - catalog (DataCatalog): Kedro Catalog - """ - self.catalog = catalog - self.catalog_views = [] - - def compile(self, app_pipeline: AppPipeline) -> None: - """Build catalog view for each pipeline view. - - Args: - app_pipeline (AppPipeline): Base pipeline with views defining datasets's categories and compilation options - """ - - for pipeline_view in app_pipeline.views: - pipeline = app_pipeline.only_nodes_with_tags(pipeline_view.name) - - catalog_view = CatalogView(name=pipeline_view.name) - - pipeline_inputs = { - dataset_name: self.catalog._get_dataset(dataset_name) - for dataset_name in pipeline.inputs() - } - compile_with_pipeline_inputs( - catalog_view=catalog_view, - pipeline_inputs=pipeline_inputs, - pipeline_view=pipeline_view, - ) - - all_pipeline_outputs = { - dataset_name: self.catalog._get_dataset(dataset_name) - for dataset_name in pipeline.all_outputs() - } - compile_with_all_pipeline_outputs( - catalog_view=catalog_view, - all_pipeline_outputs=all_pipeline_outputs, - pipeline_view=pipeline_view, - ) - - self.catalog_views.append(catalog_view) - - LOGGER.info( - "catalog compilation completed for the pipeline view '%s'. Here is the report:\n" - " - Input datasets to be replaced/rendered at iteration time: %s\n" - " - Output datasets that hold the results of a run at iteration time: %s \n" - " - Parameter datasets to be replaced/rendered at iteration time: %s\n" - " - Artifact datasets to be materialized (preloader as memory dataset) at startup time: %s\n" - " - Template datasets to be rendered at iteration time: %s\n", - catalog_view.name, - set(catalog_view.inputs), - set(catalog_view.outputs), - set(catalog_view.parameters), - set(catalog_view.artifacts), - set(catalog_view.templates), - ) - - LOGGER.info("Catalog compilation completed.") - - def render( - self, - name: str, - inputs: Optional[dict] = None, - parameters: Optional[dict] = None, - template_params: Optional[dict] = None, - ) -> DataCatalog: - """Generate a catalog for a specific view by rendering the catalog view using app data. 
- - Args: - name (str): catalog view name - inputs (dict): inputs data given by the app - parameters (dict): parameters data given by the app - template_params (dict): template_params data given by the app - - Returns: - DataCatalog: The rendered catalog - """ - - catalog_view = self.get_catalog_view(name) - - inputs = inputs or {} - parameters = parameters or {} - template_params = template_params or {} - - # namespacing inputs and parameters if needed to hopefully match the catalog. Users can give data without specifying namespace - # TODO : we should add/infer namespace attribute to pipeline/catalog view, and consistently use it to namespace datasets. This actual namespacing is just a best effort - namespaced_inputs = namespacing_datasets(inputs, catalog_view.inputs) - namespaced_parameters = namespacing_datasets( - parameters, catalog_view.parameters - ) - - rendered_catalog = DataCatalog() - - # Render each part of the catalog view - input_datasets = render_input_datasets( - catalog_view_inputs=catalog_view.inputs, iteration_inputs=namespaced_inputs - ) - artifact_datasets = catalog_view.artifacts - template_datasets = render_template_datasets( - catalog_view_templates=catalog_view.templates, - iteration_template_params=template_params, - ) - parameter_datasets = render_parameter_datasets( - catalog_view_parameters=catalog_view.parameters, - iteration_parameters=namespaced_parameters, - ) - output_datasets = render_datasets(datasets=catalog_view.outputs) - unmanaged_datasets = render_datasets(datasets=catalog_view.unmanaged) - - rendered_catalog.add_all( - { - **input_datasets, - **output_datasets, - **parameter_datasets, - **template_datasets, - **artifact_datasets, - **unmanaged_datasets, - } - ) - - return rendered_catalog - - def get_catalog_view(self, name: str) -> CatalogView: - catalog_view = next( - ( - catalog_view - for catalog_view in self.catalog_views - if catalog_view.name == name - ), - None, - ) - - if not catalog_view: - raise AppCatalogError( - f"The given catalog view '{name}' is not present in the catalog." - ) - - return catalog_view - - def get_view_names(self) -> List[str]: - return [catalog_view.name for catalog_view in self.catalog_views] - - -def namespacing_datasets(iteration_datasets, catalog_dataset_names) -> dict: - """namespacing the given app datasets if needed to hopefully match the catalog. 
- Users can give data without specifying namespaces - - Args: - iteration_datasets (_type_): datasets given by the ap at iteration time - catalog_dataset_names (_type_): catalog dataset names - - Returns: - dict: namespaced_itretaion_datasets - """ - - # Create a shallow copy as we'll potentially alterate some keys - namespaced_iteration_datasets = copy.copy(iteration_datasets) - - # get remaining iteration datasets that are not in the catalog - remaining_iteration_datasets = set(namespaced_iteration_datasets) - set( - catalog_dataset_names - ) - - # Trying to namespace the remaining datasets to hopefully match the catalog - if remaining_iteration_datasets: - # Getting potentially namespaced datasets from the catalog - only_namespaced_dataset_names = { - catalog_dataset_name.split(".")[1]: catalog_dataset_name.split(".")[0] - for catalog_dataset_name in catalog_dataset_names - if len(catalog_dataset_name.split(".")) > 1 - } - - for dataset_name in remaining_iteration_datasets: - if dataset_name in only_namespaced_dataset_names: - namespace = only_namespaced_dataset_names[dataset_name].replace( - "params:", "" - ) - LOGGER.info(f"Trying to namespace {dataset_name} with {namespace}") - namespaced_dataset_name = f"{namespace}.{dataset_name}" - # Replace dataset_name with namepace.dataset_name - namespaced_iteration_datasets[ - namespaced_dataset_name - ] = namespaced_iteration_datasets.pop(dataset_name) - - return namespaced_iteration_datasets - - -class AppCatalogError(Exception): - """Error raised in AppCatalog operations""" diff --git a/kedro_boot/catalog/compiler.py b/kedro_boot/catalog/compiler.py deleted file mode 100644 index 199499d..0000000 --- a/kedro_boot/catalog/compiler.py +++ /dev/null @@ -1,185 +0,0 @@ -"""Helper functions for compiling catalog_views's datasets""" - -import logging -from pathlib import PurePath -from typing import Any, Dict, Union -from omegaconf import OmegaConf - -from kedro.io import MemoryDataSet - -from kedro_boot.pipeline.pipeline import PipelineView - -from .view import CatalogView - -LOGGER = logging.getLogger(__name__) - - -def compile_with_pipeline_inputs( - catalog_view: CatalogView, - pipeline_inputs: Dict[ - str, Any - ], # Any is AbstractDataSet, for retrocompatibility reasons we don't specify the type as it was renamed lately to AbstractDataset - pipeline_view: PipelineView, -) -> None: - """Build catalog views's datasets using pipeline view dataset categories and pipeline inputs. - Each dataset category (inputs, parameters, templates, artifacts, unmanaged) could be filled through this process. 
- - Args: - catalog_view (CatalogView): object to be Build through the compilation process - pipeline_inputs (dict): pipeline inputs as the datasets to be used for building the catalog_view - pipeline_view (PipelineView): Contains the dataset names that belong to each category, plus some compilation options (infer artifacts, infer templates) - """ - for dataset_name, dataset_value in pipeline_inputs.items(): - # inputs - if dataset_name in pipeline_view.inputs: - catalog_view.inputs[dataset_name] = dataset_value - - # parameters - elif _check_if_dataset_is_param( - dataset_name=dataset_name, pipeline_view_parameters=pipeline_view.parameters - ): - catalog_view.parameters[dataset_name] = dataset_value - - # templates - elif recursively_check_dataset_parametrized_values(dataset_value): - catalog_view.templates[dataset_name] = dataset_value - - # artifacts - elif dataset_name in pipeline_view.artifacts: - LOGGER.info(f"Loading {dataset_name} dataset as a MemoryDataset") - catalog_view.artifacts[dataset_name] = MemoryDataSet( - dataset_value.load(), copy_mode="assign" - ) - - elif pipeline_view.infer_artifacts: - if pipeline_view.inputs: - LOGGER.info( - f"Infered artifact dataset {dataset_name}. Loading it as a MemoryDataset" - ) - catalog_view.artifacts[dataset_name] = MemoryDataSet( - dataset_value.load(), copy_mode="assign" - ) - else: - catalog_view.unmanaged[dataset_name] = dataset_value - - # Others - else: - catalog_view.unmanaged[dataset_name] = dataset_value - - -def compile_with_all_pipeline_outputs( - catalog_view: CatalogView, - all_pipeline_outputs: Dict[ - str, Any - ], # Any is AbstractDataSet, for retrocompatibility reasons we don't specify the type as it was renamed lately to AbstractDataset - pipeline_view: PipelineView, -) -> None: - """Build catalog views's datasets using pipeline view dataset categories and all pipeline outputs. - Each dataset category (inputs, parameters, templates, artifacts, unmanaged) could be filled through this process. - - Args: - catalog_view (CatalogView): object to be Build through the compilation process - all_pipeline_outputs (dict): all the pipeline outputs as the datasets to be used for building the catalog_view - pipeline_view (PipelineView): Contains the dataset names that belong to each category, plus some compilation options (infer artifacts, infer templates) - """ - - for dataset_name, dataset_value in all_pipeline_outputs.items(): - is_template_dataset = recursively_check_dataset_parametrized_values( - dataset_value - ) - - if dataset_name in pipeline_view.outputs: - if dataset_value.__class__.__name__.lower() != "memorydataset": - LOGGER.warning( - f"This pipeline output '{dataset_name}' will cost you an I/O operation, please consider freeing it (making it a MemoryDataSet)." - ) - - catalog_view.outputs[dataset_name] = dataset_value - - if is_template_dataset: - catalog_view.templates[dataset_name] = dataset_value - - else: - if ( - dataset_value.__class__.__name__.lower() != "memorydataset" - and pipeline_view.outputs - ): - LOGGER.warning( - f"This pipeline output '{dataset_name}' will cost you an I/O operation without being used by current app, please consider freeing it. 
the pipeline outputs that are needed by the current pipeline view ({catalog_view.name}) are : {pipeline_view.outputs}" - ) - if is_template_dataset: - catalog_view.templates[dataset_name] = dataset_value - else: - catalog_view.unmanaged[dataset_name] = dataset_value - - -def recursively_check_parametrized_values( - dataset_attributes: Union[str, list, dict, PurePath] -) -> bool: # noqa: PLR0911 - """Helper that check if any of the dataset attributes is parametrized (contains ${oc.select:param_name,default_value}) - - Args: - dataset (Any): Any kedro dataset - - Returns: - bool: _description_ - """ - if isinstance(dataset_attributes, str): - config = OmegaConf.create({"dataset_entry": dataset_attributes}) - return OmegaConf.is_interpolation(config, "dataset_entry") - # return bool(re.search(r"\[\[.*?\]\]", dataset_attributes)) - - elif isinstance(dataset_attributes, PurePath): - config = OmegaConf.create({"dataset_entry": str(dataset_attributes)}) - return OmegaConf.is_interpolation(config, "dataset_entry") - - elif isinstance(dataset_attributes, dict): - for key in dataset_attributes: - if recursively_check_parametrized_values(dataset_attributes[key]): - return True - return False - - elif isinstance(dataset_attributes, list): - for i in range(len(dataset_attributes)): - if recursively_check_parametrized_values(dataset_attributes[i]): - return True - return False - - else: - return False - - -def recursively_check_dataset_parametrized_values(dataset: Any) -> bool: - """Helper that check if any of the dataset attributes is parametrized (contains [[...]]) - - Args: - dataset (Any): Any kedro dataset - - Returns: - bool: _description_ - """ - for value in dataset.__dict__.values(): - if recursively_check_parametrized_values(value): - return True - return False - - -def _check_if_dataset_is_param(dataset_name, pipeline_view_parameters) -> bool: - """Helper that extract the dataset name from the params:... or parameters:... string - - Args: - dataset_name (str): dataset name - - Returns: - str: dataset name without params: or parameters: prefix - """ - if dataset_name == "parameters": - return True - elif "params:" in dataset_name: - return dataset_name.split("params:")[1] in pipeline_view_parameters - else: - return False - - -class CatalogCompilerError(Exception): - """Error raised in AppCatalog compilation process""" diff --git a/kedro_boot/catalog/view.py b/kedro_boot/catalog/view.py deleted file mode 100644 index 9fab7e1..0000000 --- a/kedro_boot/catalog/view.py +++ /dev/null @@ -1,35 +0,0 @@ -from typing import Optional - - -class CatalogView: - """a view of the catalog that map an app pipeline's view""" - - def __init__( - self, - name: Optional[str] = None, - inputs: Optional[dict] = None, - outputs: Optional[dict] = None, - parameters: Optional[dict] = None, - artifacts: Optional[dict] = None, - templates: Optional[dict] = None, - unmanaged: Optional[dict] = None, - ) -> None: - """Init ``CatalogView`` with a name and a categorisation of the datasets that are exposed to the app. - - Args: - name (str): A view name. It should be unique per AppCatalog views. - inputs dict: Inputs datasets that will be injected by the app data at iteration time. - outputs dict: Outputs dataset that hold the iteration run results. - parameters dict: Parameters that will be injected by the app data at iteration time. - artifacts dict: Artifacts datasets that will be materialized (loaded as MemoryDataset) at startup time. 
- templates dict: Templates datasets that contains jinja expressions in this form [[ expressions ]]. - unmanaged dict: Datasets that will not undergo any operation and keeped as is - """ - self.name = name - - self.inputs = inputs or {} - self.outputs = outputs or {} - self.artifacts = artifacts or {} - self.templates = templates or {} - self.parameters = parameters or {} - self.unmanaged = unmanaged or {} diff --git a/kedro_boot/framework/__init__.py b/kedro_boot/framework/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/kedro_boot/runner.py b/kedro_boot/framework/adapter.py similarity index 61% rename from kedro_boot/runner.py rename to kedro_boot/framework/adapter.py index d8021e3..ceed95b 100644 --- a/kedro_boot/runner.py +++ b/kedro_boot/framework/adapter.py @@ -1,53 +1,41 @@ -"""``KedroBootRunner`` is the kedro runner that instantiate and run the kedro boot app.""" +"""``KedroBootAdapter`` transforms a Kedro Session run into a booting process.""" import logging from typing import Any, Optional, Union -from kedro.config import OmegaConfigLoader +from kedro.config import ConfigLoader from kedro.framework.hooks.manager import _NullPluginManager from kedro.io import DataCatalog, MemoryDataSet from kedro.pipeline import Pipeline from kedro.runner import AbstractRunner -from kedro.utils import load_obj from pluggy import PluginManager from kedro_boot.app import AbstractKedroBootApp -from kedro_boot.pipeline import DEFAULT_PIPELINE_VIEW_NAME, AppPipeline, app_pipeline LOGGER = logging.getLogger(__name__) -class KedroBootRunner(AbstractRunner): - """``KedroBootRunner`` is the kedro runner that instantiate and run the kedro boot app.""" +class KedroBootAdapter(AbstractRunner): + """``KedroBootAdapter`` transforms a Kedro Session run into a booting process.""" def __init__( self, - config_loader: OmegaConfigLoader, - app_class: str, - app_args: Optional[dict] = None, - lazy_compile: bool = False, - **runner_args, + app: AbstractKedroBootApp, + config_loader: ConfigLoader, + app_run_args: Optional[dict] = None, ): - """Instantiate the kedro boot runner + """Instantiate the kedro boot adapter Args: + app (AbstractKedroBootApp): Kedro Boot App object config_loader (OmegaConfigLoader): kedro config loader - app_class (str): Path to the kedro boot app object. ex: kedro_boot.app.KedroBootApp - app_args (dict): Args used for initializing kedro boot app object. Defaults to None. + app_run_args (dict): App runtime args given by the App CLI """ - super().__init__(**runner_args) + super().__init__() - self.config_loader = config_loader - - app_args = app_args or {} - obj_class = load_obj(app_class) - if not issubclass(obj_class, AbstractKedroBootApp): - raise TypeError( - f"app_class must be a subclass of AbstractKedroBootApp, got {obj_class.__name__}" - ) - self.app_obj = obj_class(**app_args) if app_args else obj_class() - - self.lazy_compile = lazy_compile + self._app = app + self._config_loader = config_loader + self._app_run_args = app_run_args or {} def run( self, @@ -65,11 +53,11 @@ def run( session_id: The id of the session. """ - if not isinstance(pipeline, AppPipeline): - LOGGER.warning( - "No AppPipeline was given. We gonna create a '__default__' one from the given pipeline" - ) - pipeline = app_pipeline(pipeline, name=DEFAULT_PIPELINE_VIEW_NAME) + # if not isinstance(pipeline, AppPipeline): + # LOGGER.warning( + # "No AppPipeline was given.
We gonna create a '__default__' one from the given pipeline" + # ) + # pipeline = app_pipeline(pipeline, name=DEFAULT_PIPELINE_VIEW_NAME) hook_manager = hook_manager or _NullPluginManager() catalog = catalog.shallow_copy() @@ -105,10 +93,10 @@ def run( catalog, hook_manager, session_id, - self.config_loader, - self.lazy_compile, + self._app_run_args, + self._config_loader, ) - self._logger.info(f"{self.app_obj.__class__.__name__} execution completed.") + self._logger.info(f"{self._app.__class__.__name__} execution completed.") return app_return def _run(self, *args) -> Any: @@ -118,7 +106,7 @@ def _run(self, *args) -> Any: Returns: Any: Any object returned at the end of execution of the app """ - return self.app_obj.run(*args) + return self._app.run(*args) def create_default_data_set(self, ds_name: str): return MemoryDataSet() diff --git a/kedro_boot/framework/cli/__init__.py b/kedro_boot/framework/cli/__init__.py new file mode 100644 index 0000000..534acf3 --- /dev/null +++ b/kedro_boot/framework/cli/__init__.py @@ -0,0 +1,2 @@ +# from .cli import commands # noqa: F401 +from .factory import kedro_boot_command_factory # noqa: F401 diff --git a/kedro_boot/framework/cli/cli.py b/kedro_boot/framework/cli/cli.py new file mode 100644 index 0000000..4b39911 --- /dev/null +++ b/kedro_boot/framework/cli/cli.py @@ -0,0 +1,54 @@ +"""A CLI factory for kedro boot apps""" +import click +import logging +from pathlib import Path +from kedro.framework.startup import _is_project +from .factory import kedro_boot_command_factory + +LOGGER = logging.getLogger(__name__) + + +run_command = kedro_boot_command_factory( + command_name="run", command_help="Run kedro boot apps" +) +compile_command = kedro_boot_command_factory( + command_name="compile", + command_help="Compile the catalog (Dryrun)", + app_class="kedro_boot.app.CompileApp", +) + +# Get entry points commands early to prevent getting them repeatedly inside KedroClickGroup + + +class KedroClickGroup(click.Group): + def reset_commands(self): + self.commands = {} + + # add commands on the fly based on conditions + if _is_project(Path.cwd()): + self.add_command(run_command) + self.add_command(compile_command) + + # else: + # self.add_command(new) # TODO : IMPLEMENT THIS FUNCTION + + def list_commands(self, ctx): + self.reset_commands() + commands_list = sorted(self.commands) + return commands_list + + def get_command(self, ctx, cmd_name): + self.reset_commands() + return self.commands.get(cmd_name) + + +@click.group(name="Boot") +def commands(): + """Kedro plugin for integrating any application with kedro.""" + pass # pragma: no cover + + +@commands.command(name="boot", cls=KedroClickGroup) +def boot_commands(): + """kedro boot specific commands inside kedro project.""" + pass # pragma: no cover diff --git a/kedro_boot/framework/cli/factory.py b/kedro_boot/framework/cli/factory.py new file mode 100644 index 0000000..63d32aa --- /dev/null +++ b/kedro_boot/framework/cli/factory.py @@ -0,0 +1,177 @@ +"""A CLI factory for kedro boot apps""" + +import logging +from typing import Any, Callable, List, Optional, Union + +import click +from click import Command +from click.decorators import FC +from kedro.framework.cli.project import run as kedro_run_command +from kedro.framework.cli.utils import _get_values_as_tuple +from kedro.framework.session import KedroSession +from kedro.utils import load_obj +from kedro_boot.app.app import AbstractKedroBootApp +from kedro_boot.framework.adapter import KedroBootAdapter + +LOGGER = logging.getLogger(__name__) + + +def 
create_kedro_booter( + app_class: Optional[str] = None, + app_args: Optional[dict] = None, + kedro_session_create_args: Optional[dict] = None, +): + kedro_session_create_args = kedro_session_create_args or {} + app_args = app_args or {} + + def kedro_booter(**kwargs): + """Running kedro boot apps""" + + # ctx = click.get_current_context() + kedro_run_params = [ + param.name.replace("-", "_") for param in kedro_run_command.params + ] # format params in a python supported format. Ex: replace - by _ + app_run_args = { + arg_key: arg_value + for arg_key, arg_value in kwargs.items() + if arg_key not in kedro_run_params + } + kedro_args = { + arg_key: arg_value + for arg_key, arg_value in kwargs.items() + if arg_key not in app_run_args + } + + # Init kedro boot app object + app = None + if app_class: + app = app_factory(app_class, app_args) + else: + cli_app_class = app_run_args.pop("app") + if cli_app_class: + LOGGER.info("We're gettings app class from --app CLI arg") + app = app_factory(cli_app_class) + + # Format some kedro args + tag = _get_values_as_tuple(kedro_args.get("tag", "")) + node_names = _get_values_as_tuple(kedro_args.get("node_names", "")) + + tags = _get_values_as_tuple(kedro_args.get("tags", "")) + nodes_names = _get_values_as_tuple(kedro_args.get("nodes_names", "")) + + tag = tag + tags + node_names = node_names + nodes_names + + load_version = { + **kedro_args.get("load_version", {}), + **kedro_args.get("load_versions", {}), + } + + with KedroSession.create( + env=kedro_args.get("env", ""), + extra_params=kedro_args.get("params", ""), + conf_source=kedro_args.get("conf_source", ""), # type: ignore + **kedro_session_create_args, # TODO: Make sure that this not take precedence over kedro_args. We should do some prior merging before kwarging + ) as session: + config_loader = session._get_config_loader() + config_loader._register_new_resolvers( + { + "itertime_params": lambda variable, + default_value=None: f"${{oc.select:{variable},{default_value}}}", + } + ) + if app: + runner = KedroBootAdapter( + app=app, + config_loader=config_loader, + app_run_args=app_run_args, + ) + else: + runner_obj = load_obj( + kedro_args.get("runner", "") or "SequentialRunner", "kedro.runner" + ) + runner = runner_obj(is_async=kedro_args.get("is_async", "")) + + return session.run( + tags=tag, + runner=runner, + node_names=node_names, + from_nodes=kedro_args.get("from_nodes", ""), + to_nodes=kedro_args.get("to_nodes", ""), + from_inputs=kedro_args.get("from_inputs", ""), + to_outputs=kedro_args.get("to_outputs", ""), + load_versions=load_version, + pipeline_name=kedro_args.get("pipeline", ""), + namespace=kedro_args.get("namespace", ""), + ) + + return kedro_booter + + +def kedro_boot_command_factory( + command_name: str = None, + command_help: str = None, + command_params: Optional[List[Callable[[FC], FC]]] = None, + app_class: Optional[str] = None, + app_args: Optional[dict] = None, + kedro_session_create_args: Optional[dict] = None, +) -> Command: + command_name = command_name or "app" + command_params = command_params or [] + + kedro_session_create_args = kedro_session_create_args or {} + app_args = app_args or {} + + # If no app_class given. We'll add an app Option in the Command and use DummyApp as default + if not app_class: + for commad_param in command_params: + if commad_param.name == "app": + LOGGER.warning( + "No app_class was given and at the same time 'app' Option is found in your kedro boot cli factory command_params. We're going to replace your app Option by kedro boot app Option. 
So you can have a way to provide an app_class" + ) + command_params.remove(commad_param) + break + + command_params.append( + click.option( + "--app", + type=str, + default="", + help="Kedro Boot App Class. ex: my_package.my_module.my_app_class", + ) + ) + + kedro_booter = create_kedro_booter( + app_class=app_class, + app_args=app_args, + kedro_session_create_args=kedro_session_create_args, + ) + + @click.command(name=command_name, short_help=command_help) + def kedro_boot_command(**kwargs) -> Any: + return kedro_booter(**kwargs) + + kedro_boot_command.params.extend( + kedro_run_command.params + ) # TODO: We should check for collisions between app params and kedro params + for param in command_params: + kedro_boot_command = param(kedro_boot_command) + + return kedro_boot_command + + +def app_factory( + app_class: Union[str, AbstractKedroBootApp], app_args: dict = None +) -> AbstractKedroBootApp: + app_args = app_args or {} + if isinstance(app_class, str): + app_class_obj = load_obj(app_class) + else: + app_class_obj = app_class + + if not issubclass(app_class_obj, AbstractKedroBootApp): + raise TypeError( + f"app_class must be a subclass of AbstractKedroBootApp, got {app_class_obj.__name__}" + ) + + return app_class_obj(**app_args) if app_args else app_class_obj() diff --git a/kedro_boot/framework/compiler/__init__.py b/kedro_boot/framework/compiler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/kedro_boot/framework/compiler/compiler.py b/kedro_boot/framework/compiler/compiler.py new file mode 100644 index 0000000..ea873c3 --- /dev/null +++ b/kedro_boot/framework/compiler/compiler.py @@ -0,0 +1,210 @@ +"""Helper functions for compiling kedro catalog""" + +import logging +from pathlib import PurePath +from typing import Any, Dict, Optional, Union +from omegaconf import OmegaConf + +from .specs import CompilationSpec + +LOGGER = logging.getLogger(__name__) + + +class CatalogAssembly: + """``CatalogAssembly`` is the resulting object of the catalog compilation process""" + + def __init__( + self, + inputs: Optional[dict] = None, + outputs: Optional[dict] = None, + parameters: Optional[dict] = None, + artifacts: Optional[dict] = None, + templates: Optional[dict] = None, + unmanaged: Optional[dict] = None, + ) -> None: + """Init ``CatalogView`` with a name and a categorisation of the datasets that are exposed to the app. + + Args: + inputs dict: Inputs datasets that will be injected by the app data at iteration time. + outputs dict: Outputs dataset that hold the iteration run results. + parameters dict: Parameters that will be injected by the app data at iteration time. + artifacts dict: Artifacts datasets that will be materialized (loaded as MemoryDataset) at startup time. + templates dict: Templates datasets that contains jinja expressions in this form [[ expressions ]]. 
+ unmanaged dict: Datasets that will not undergo any operation and keeped as is + """ + + self.inputs = inputs or {} + self.outputs = outputs or {} + self.artifacts = artifacts or {} + self.templates = templates or {} + self.parameters = parameters or {} + self.unmanaged = unmanaged or {} + + def __add__(self, catalog_assembly): + return CatalogAssembly( + inputs={**self.inputs, **catalog_assembly.inputs}, + outputs={**self.outputs, **catalog_assembly.outputs}, + artifacts={**self.artifacts, **catalog_assembly.artifacts}, + parameters={**self.parameters, **catalog_assembly.parameters}, + templates={**self.templates, **catalog_assembly.templates}, + unmanaged={**self.unmanaged, **catalog_assembly.unmanaged}, + ) + + +def compile_with_pipeline_inputs( + pipeline_inputs: Dict[ + str, Any + ], # Any is AbstractDataSet, for retrocompatibility reasons we don't specify the type as it was renamed lately to AbstractDataset + compilation_spec: CompilationSpec, +) -> CatalogAssembly: + """Compile a CatalogAssembly using pipeline's inputs datasets and following the compilation specs. + + Args: + pipeline_inputs (dict): pipeline's inputs datasets + compilation_spec (CompilationSpec): + """ + + catalog_assembly = CatalogAssembly() + + for dataset_name, dataset_value in pipeline_inputs.items(): + # inputs + if dataset_name in compilation_spec.namespaced_inputs: + catalog_assembly.inputs[dataset_name] = dataset_value + + # parameters + elif _check_if_dataset_is_param( + dataset_name=dataset_name, + specs_parameters=compilation_spec.namespaced_parameters, + ): + catalog_assembly.parameters[dataset_name] = dataset_value + + # templates + elif recursively_check_dataset_parametrized_values(dataset_value): + catalog_assembly.templates[dataset_name] = dataset_value + + elif compilation_spec.infer_artifacts: + catalog_assembly.artifacts[dataset_name] = dataset_value + # Others + else: + catalog_assembly.unmanaged[dataset_name] = dataset_value + + return catalog_assembly + + +def compile_with_all_pipeline_outputs( + all_pipeline_outputs: Dict[ + str, Any + ], # Any is AbstractDataSet, for retrocompatibility reasons we don't specify the type as it was renamed lately to AbstractDataset + compilation_spec: CompilationSpec, +) -> CatalogAssembly: + """Compile a CatalogAssembly using all pipeline's outputs datasets and following the compilation specs. + + Args: + pipeline_inputs (dict): pipeline's inputs datasets + compilation_spec (CompilationSpec): + """ + + catalog_assembly = CatalogAssembly() + + for dataset_name, dataset_value in all_pipeline_outputs.items(): + is_template_dataset = recursively_check_dataset_parametrized_values( + dataset_value + ) + + if dataset_name in compilation_spec.namespaced_outputs: + if dataset_value.__class__.__name__.lower() != "memorydataset": + LOGGER.warning( + f"This pipeline output '{dataset_name}' will cost you an I/O operation, please consider freeing it (making it a MemoryDataSet)." + ) + + catalog_assembly.outputs[dataset_name] = dataset_value + + if is_template_dataset: + catalog_assembly.templates[dataset_name] = dataset_value + + else: + if ( + dataset_value.__class__.__name__.lower() != "memorydataset" + and compilation_spec.namespaced_outputs + ): + LOGGER.warning( + f"This pipeline output '{dataset_name}' will cost you an I/O operation without being used by current app, please consider freeing it. 
the pipeline outputs that are needed by the current pipeline namespace ({compilation_spec.namespace}) are : {compilation_spec.namespaced_outputs}" + ) + if is_template_dataset: + catalog_assembly.templates[dataset_name] = dataset_value + else: + catalog_assembly.unmanaged[dataset_name] = dataset_value + + return catalog_assembly + + +def recursively_check_parametrized_values( + dataset_attributes: Union[str, list, dict, PurePath] +) -> bool: # noqa: PLR0911 + """Helper that checks if any of the dataset attributes is parametrized + + Args: + dataset_attributes (Union[str, list, dict, PurePath]): A dataset attribute value + + Returns: + bool: True if any attribute value is parametrized + """ + if isinstance(dataset_attributes, str): + config = OmegaConf.create({"dataset_entry": dataset_attributes}) + return OmegaConf.is_interpolation(config, "dataset_entry") + # return bool(re.search(r"\[\[.*?\]\]", dataset_attributes)) + + elif isinstance(dataset_attributes, PurePath): + config = OmegaConf.create({"dataset_entry": str(dataset_attributes)}) + return OmegaConf.is_interpolation(config, "dataset_entry") + + elif isinstance(dataset_attributes, dict): + for key in dataset_attributes: + if recursively_check_parametrized_values(dataset_attributes[key]): + return True + return False + + elif isinstance(dataset_attributes, list): + for i in range(len(dataset_attributes)): + if recursively_check_parametrized_values(dataset_attributes[i]): + return True + return False + + else: + return False + + +def recursively_check_dataset_parametrized_values(dataset: Any) -> bool: + """Helper that checks if any of the dataset attributes is parametrized + + Args: + dataset (Any): Any kedro dataset + + Returns: + bool: True if any of the dataset attributes is parametrized + """ + for value in dataset.__dict__.values(): + if recursively_check_parametrized_values(value): + return True + return False + + +def _check_if_dataset_is_param(dataset_name, specs_parameters) -> bool: + """Helper that checks whether a dataset name refers to a parameter declared in the compilation spec + + Args: + dataset_name (str): dataset name + + Returns: + bool: True if the dataset name is a parameter exposed by the spec + """ + if dataset_name == "parameters": + return True + elif "params:" in dataset_name: + return dataset_name.split("params:")[1] in specs_parameters + else: + return False + + +class CatalogCompilerError(Exception): + """Error raised in the catalog compilation process""" diff --git a/kedro_boot/framework/compiler/specs.py b/kedro_boot/framework/compiler/specs.py new file mode 100644 index 0000000..a8a131e --- /dev/null +++ b/kedro_boot/framework/compiler/specs.py @@ -0,0 +1,164 @@ +"""This module implements CompilationSpec and the logic of namespacing and dataset naming.""" + +from typing import List +from kedro.pipeline import Pipeline + + +class CompilationSpec: + """``CompilationSpec`` is a user-facing interface that encapsulates catalog compilation spec attributes and utilities""" + + def __init__( + self, + namespace: str = None, + inputs: List[str] = None, + outputs: List[str] = None, + parameters: List[str] = None, + infer_artifacts: bool = True, + ) -> None: + """Init the ``CompilationSpec``. + + Args: + namespace (str): pipeline's namespace + inputs (List[str]): inputs datasets to be exposed to the App. Specify them without the namespace prefix + outputs (List[str]): outputs datasets to be exposed to the App. Specify them without the namespace prefix + parameters (List[str]): parameters datasets to be exposed to the App. Specify them without the namespace prefix + infer_artifacts (bool): Whether the compiler should infer artifact datasets.
Defaults to True. + """ + self._namespace = namespace + infer_artifacts = infer_artifacts if infer_artifacts is not None else True + self._spec = dict( + inputs=inputs or [], + outputs=outputs or [], + parameters=parameters or [], + infer_artifacts=infer_artifacts, + ) + + @property + def namespace(self) -> str: + return self._namespace + + @namespace.setter + def namespace(self, value: str) -> None: + self._namespace = value + + @property + def inputs(self) -> List[str]: + return self._spec["inputs"] + + @inputs.setter + def inputs(self, value: List[str]) -> None: + self._spec["inputs"] = value + + @property + def namespaced_inputs(self) -> List[str]: + return namespace_datasets_names(self.inputs, self._namespace) + + @property + def outputs(self) -> List[str]: + return self._spec["outputs"] + + @property + def namespaced_outputs(self) -> List[str]: + return namespace_datasets_names(self.outputs, self._namespace) + + @outputs.setter + def outputs(self, value: List[str]) -> None: + self._spec["outputs"] = value + + @property + def parameters(self) -> List[str]: + return self._spec["parameters"] + + @property + def namespaced_parameters(self) -> List[str]: + return namespace_datasets_names(self.parameters, self._namespace) + + @property + def prefixed_namespaced_parameters(self) -> List[str]: + return [f"params:{param}" for param in self.namespaced_parameters] + + @parameters.setter + def parameters(self, value: List[str]) -> None: + self._spec["parameters"] = value + + @property + def infer_artifacts(self) -> bool: + return self._spec["infer_artifacts"] + + @infer_artifacts.setter + def infer_artifacts(self, value: bool) -> None: + self._spec["infer_artifacts"] = value + + def to_dict(self) -> dict: + return dict(namespace=self._namespace, specs=self._spec) + + @classmethod + def infer_compilation_specs(cls, pipeline: Pipeline): + """Infer Compilation specs from a pipeline. + + Args: + pipeline (Pipeline): kedro pipeline + Returns: + List[CompilationSpec]: compilation specs + """ + + namespaces = set() + for node in pipeline.nodes: + namespaces.add(node.namespace) + + compilation_specs = [] + + for namespace in namespaces: + compilation_spec = CompilationSpec(namespace=namespace) + namespace_pipeline = filter_pipeline(pipeline, namespace) + for dataset_name in namespace_pipeline.inputs(): + if not namespace and "params:" in dataset_name: + compilation_spec.parameters.append( + dataset_name.replace("params:", "") + ) + elif f"params:{namespace}." in dataset_name: + compilation_spec.parameters.append( + dataset_name.replace(f"params:{namespace}.", "") + ) + elif f"{namespace}." in dataset_name: + compilation_spec.inputs.append( + dataset_name.replace(f"{namespace}.", "") + ) + + for dataset_name in namespace_pipeline.outputs(): + if f"{namespace}."
in dataset_name: + compilation_spec.outputs.append( + dataset_name.replace(f"{namespace}.", "") + ) + + compilation_specs.append(compilation_spec) + + return compilation_specs + + +def namespace_datasets_names(datasets_names: list, namespace: str) -> List[str]: + namespaced_datasets_names = [] + for dataset_name in datasets_names: + namespaced_dataset_name = namespace_dataset_name(dataset_name, namespace) + namespaced_datasets_names.append(namespaced_dataset_name) + + return namespaced_datasets_names + + +def namespace_dataset_name(dataset_name: str, namespace: str) -> str: + if namespace: + return f"{namespace}.{dataset_name}" + + return dataset_name + + +def filter_pipeline(pipeline: Pipeline, namespace: str) -> Pipeline: + # Replace the pipeline.only_nodes_with_namespaces as it does not take None namespace. + nodes = [] + for n in pipeline.nodes: + if str(n.namespace) == str(namespace): + nodes.append(n) + + return Pipeline(nodes) diff --git a/kedro_boot/framework/context/__init__.py b/kedro_boot/framework/context/__init__.py new file mode 100644 index 0000000..ca4ef3b --- /dev/null +++ b/kedro_boot/framework/context/__init__.py @@ -0,0 +1 @@ +from .context import KedroBootContext # noqa: F401 diff --git a/kedro_boot/framework/context/context.py b/kedro_boot/framework/context/context.py new file mode 100644 index 0000000..7e129ee --- /dev/null +++ b/kedro_boot/framework/context/context.py @@ -0,0 +1,276 @@ +"""``KedroBootContext`` provides context for the kedro boot project.""" +import logging +from typing import List, Optional, Tuple + +from kedro.io import DataCatalog + +from kedro.pipeline.pipeline import Pipeline +from kedro.io import MemoryDataSet +from kedro_boot.utils import find_duplicates + +from kedro_boot.framework.compiler.compiler import ( + compile_with_all_pipeline_outputs, + compile_with_pipeline_inputs, +) +from kedro_boot.framework.renderer.renderer import ( + render_datasets, + render_input_datasets, + render_parameter_datasets, + render_template_datasets, +) +from kedro_boot.framework.compiler.specs import ( + CompilationSpec, + filter_pipeline, + namespace_dataset_name, +) + +LOGGER = logging.getLogger(__name__) + + +class KedroBootContext: + """``KedroBootContext`` manages and holds the main Kedro Boot objects.""" + + def __init__(self, pipeline: Pipeline, catalog: DataCatalog) -> None: + """Init the ``KedroBootContext`` with the base kedro pipeline and catalog. + + Args: + pipeline (Pipeline): Kedro Pipeline + catalog (DataCatalog): Kedro Catalog + """ + self.pipeline = pipeline + self.catalog = catalog + + self._namespaces_registry = {} + + def compile(self, compilation_specs: List[CompilationSpec] = None) -> None: + """Prepare kedro's resources for iteration time by creating a namespace registry, indexed by namespace, that holds the corresponding pipelines and catalogs, pre-materialized and organized into dataset categories according to their relevance to the application. + + Args: + compilation_specs (List[CompilationSpec]): Compilation Specs provided by the App. Compilation specs are inferred from the pipeline if none are provided.
+ """ + + infered_compilation_specs = CompilationSpec.infer_compilation_specs( + self.pipeline + ) + + if compilation_specs: + # check if the given namespaces specs have duplicate namespaces + namespaces = [ + compilation_spec.namespace for compilation_spec in compilation_specs + ] + duplicated_namespaces = find_duplicates(namespaces) + if duplicated_namespaces: + raise ValueError( + f"Cannot compile the catalog with these duplicated namespace names: '{duplicated_namespaces}'" + ) + + # check if the given namespaces inline with the pipeline namespaces + given_namespaces = set(namespaces) + infered_namespaces = set( + [ + compilation_spec.namespace + for compilation_spec in infered_compilation_specs + ] + ) + + if given_namespaces != infered_namespaces: + LOGGER.warning( + f"The given namespaces does not match with the pipeline namespaces. pipeline namespace are {infered_namespaces}, and the given namespaces are {given_namespaces}" + ) + + else: + LOGGER.info( + "No namespace specs provided by the app, we gonna infer them from pipeline namespaces" + ) + compilation_specs = infered_compilation_specs + + for compilation_spec in compilation_specs: + pipeline = filter_pipeline( + pipeline=self.pipeline, namespace=compilation_spec.namespace + ) + + pipeline_inputs = { + dataset_name: self.catalog._get_dataset(dataset_name) + for dataset_name in pipeline.inputs() + } + + remaining_inputs_specs = set(compilation_spec.namespaced_inputs) - set( + pipeline_inputs.keys() + ) + if remaining_inputs_specs: + raise KedroBootContextError( + f"These inputs datasets {remaining_inputs_specs} given for {compilation_spec.namespace} spec, does not exists in pipeline inputs." + ) + + remaining_parameters_specs = set( + compilation_spec.prefixed_namespaced_parameters + ) - set(pipeline_inputs.keys()) + if remaining_parameters_specs: + raise KedroBootContextError( + f"These parameters datasets {remaining_parameters_specs} given in {compilation_spec.namespace} namespace specs, does not exists in pipeline parameters." + ) + + catalog_assembly_with_inputs = compile_with_pipeline_inputs( + pipeline_inputs=pipeline_inputs, + compilation_spec=compilation_spec, + ) + + all_pipeline_outputs = { + dataset_name: self.catalog._get_dataset(dataset_name) + for dataset_name in pipeline.all_outputs() + } + + remaining_outputs_specs = set(compilation_spec.namespaced_outputs) - set( + all_pipeline_outputs.keys() + ) + if remaining_outputs_specs: + raise KedroBootContextError( + f"These outputs datasets {remaining_outputs_specs} given for {compilation_spec.namespace} spec, does not exists in pipeline outputs." + ) + + catalog_assembly_with_all_outputs = compile_with_all_pipeline_outputs( + all_pipeline_outputs=all_pipeline_outputs, + compilation_spec=compilation_spec, + ) + + catalog_assembly = ( + catalog_assembly_with_inputs + catalog_assembly_with_all_outputs + ) + + LOGGER.info( + "catalog compilation completed for the namespace '%s'. 
Here is the report:\n" + " - Input datasets to be replaced/rendered at iteration time: %s\n" + " - Output datasets that hold the results of a run at iteration time: %s \n" + " - Parameter datasets to be replaced/rendered at iteration time: %s\n" + " - Artifact datasets to be materialized (preloader as memory dataset) at compile time: %s\n" + " - Template datasets to be rendered at iteration time: %s\n", + compilation_spec.namespace, + set(catalog_assembly.inputs), + set(catalog_assembly.outputs), + set(catalog_assembly.parameters), + set(catalog_assembly.artifacts), + set(catalog_assembly.templates), + ) + + self._namespaces_registry[compilation_spec.namespace] = dict( + pipeline=pipeline, + catalog=catalog_assembly, + outputs=compilation_spec.namespaced_outputs, + ) + + LOGGER.info("Loading artifacts datasets as MemoryDataset ...") + self.materialize_artifacts() + + LOGGER.info("Catalog compilation completed.") + + def materialize_artifacts(self): + # Materialize the artifact dataset by loading them as memory in the compilation process. Here we merge all the artifact datasets (cross namespaces) so we can load once an artifact that is shared between two dataset in two different namespaces + all_artifacts_datasets = {} + # Merge all artifact datasets cross namespaces/specs + for namespace in self._namespaces_registry.values(): + all_artifacts_datasets.update(namespace["catalog"].artifacts) + + # Materialized thoses artifact datasets in all_materialized_artifact_datasets + all_materialized_artifact_datasets = {} + for dataset_name, dataset_value in all_artifacts_datasets.items(): + LOGGER.info(f"Loading {dataset_name} as a MemoryDataset") + all_materialized_artifact_datasets[ + dataset_name + ] = MemoryDataSet( # Add Logging fro this operation + dataset_value.load(), copy_mode="assign" + ) + + # Assign the materialized artifact back to namespace/spec artifact datasets + for namespace in self._namespaces_registry.values(): + for dataset_name in namespace["catalog"].artifacts: + namespace["catalog"].artifacts[ + dataset_name + ] = all_materialized_artifact_datasets[dataset_name] + + def render( + self, + namespace: str = None, + inputs: Optional[dict] = None, + parameters: Optional[dict] = None, + itertime_params: Optional[dict] = None, + ) -> Tuple[Pipeline, DataCatalog]: + """Generate a (pipeline, catalog) by rendering a namespace registry using the provided App Data. + + Args: + namespace (str): pipeline's namespace. + inputs (dict): App inputs datasets that will be injected into the catalog. + parameters (dict): App parameters datasets that will be injected into the catalog. + itertime_params (dict): App itertime params that will resolve the itertime_params resolvers. 
+ + Returns: + Tuple[Pipeline, DataCatalog]: The rendered pipeline and catalog + """ + + inputs = inputs or {} + parameters = parameters or {} + itertime_params = itertime_params or {} + + namespaced_inputs = namespace_datasets(inputs, namespace) + namespaced_parameters = namespace_datasets(parameters, namespace) + + catalog_assembly = self._namespaces_registry.get(namespace).get("catalog") + + rendered_catalog = DataCatalog() + + # Render each part of the catalog assembly + input_datasets = render_input_datasets( + catalog_inputs=catalog_assembly.inputs, iteration_inputs=namespaced_inputs + ) + artifact_datasets = catalog_assembly.artifacts + template_datasets = render_template_datasets( + catalog_templates=catalog_assembly.templates, + iteration_template_params=itertime_params, + ) + parameter_datasets = render_parameter_datasets( + catalog_parameters=catalog_assembly.parameters, + iteration_parameters=namespaced_parameters, + ) + output_datasets = render_datasets(datasets=catalog_assembly.outputs) + unmanaged_datasets = render_datasets(datasets=catalog_assembly.unmanaged) + + rendered_catalog.add_all( + { + **input_datasets, + **output_datasets, + **parameter_datasets, + **template_datasets, + **artifact_datasets, + **unmanaged_datasets, + } + ) + + pipeline = self._namespaces_registry.get(namespace).get("pipeline") + + return pipeline, rendered_catalog + + def get_outputs_datasets(self, namespace: str) -> List[str]: + return self._namespaces_registry.get(namespace).get("outputs") + + +def namespace_datasets(iteration_datasets: dict, namespace: Optional[str] = None) -> dict: + """Namespace the given app datasets. + Args: + iteration_datasets (dict): datasets given by the app at iteration time + namespace (str): pipeline's namespace + + Returns: + dict: the namespaced iteration datasets + """ + + namespaced_datasets = {} + + for dataset_name, dataset_value in iteration_datasets.items(): + namespaced_dataset_name = namespace_dataset_name(dataset_name, namespace) + + namespaced_datasets[namespaced_dataset_name] = dataset_value + + return namespaced_datasets + + +class KedroBootContextError(Exception): + """Error raised in KedroBootContext operations""" diff --git a/kedro_boot/framework/renderer/__init__.py b/kedro_boot/framework/renderer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/kedro_boot/catalog/renderer.py b/kedro_boot/framework/renderer/renderer.py similarity index 74% rename from kedro_boot/catalog/renderer.py rename to kedro_boot/framework/renderer/renderer.py index aae7945..229e75a 100644 --- a/kedro_boot/catalog/renderer.py +++ b/kedro_boot/framework/renderer/renderer.py @@ -1,4 +1,4 @@ -"""Helper functions for rendering catalog views datasets using iteration datasets""" +"""Helper functions for rendering catalogs using iteration datasets""" import copy import logging @@ -20,15 +20,15 @@ def render_datasets(datasets: Dict[str, Any]) -> Dict[str, Any]: # type: ignore def render_template_datasets( - catalog_view_templates: Dict[str, Any], iteration_template_params: dict + catalog_templates: Dict[str, Any], iteration_template_params: dict ) -> Dict[str, Any]: # type: ignore - template_params = recursively_get_dataset_template_params(catalog_view_templates) + template_params = recursively_get_dataset_template_params(catalog_templates) remaining_catalog_tempate_params = set(template_params) - set( iteration_template_params ) if remaining_catalog_tempate_params: LOGGER.warning( - f"The is not enough given iteration template param to render all the Template 
expressions. Template expressions are {set(template_params)} and the actual given template params are {set(iteration_template_params)}. Default values will be used for {remaining_catalog_tempate_params}" + f"The is not enough given iteration template param to render all the Template expressions. Template expressions are {set(template_params)} and the actual given template params are {set(iteration_template_params)}. Default values will be used if given in jinja expression '[[ expression | default('value') ]]' for {remaining_catalog_tempate_params}" ) iteration_template_params_without_run_id = set(iteration_template_params) - { @@ -43,7 +43,7 @@ def render_template_datasets( ) rendered_datasets = {} - for dataset_name, dataset_value in catalog_view_templates.items(): + for dataset_name, dataset_value in catalog_templates.items(): rendered_dataset_value = copy.deepcopy(dataset_value) recursively_render_parametrized_dataset_template( rendered_dataset_value, iteration_template_params @@ -54,86 +54,85 @@ def render_template_datasets( def render_parameter_datasets( - catalog_view_parameters: Dict[str, Any], iteration_parameters: dict + catalog_parameters: Dict[str, Any], iteration_parameters: dict ) -> Dict[str, Any]: # type: ignore formatted_iteration_params = { f"params:{param_name}": param_value for param_name, param_value in iteration_parameters.items() } - if "parameters" in catalog_view_parameters and iteration_parameters: + if "parameters" in catalog_parameters and iteration_parameters: formatted_iteration_params.update({"parameters": iteration_parameters}) - remaining_catalog_params = set(catalog_view_parameters) - set( - formatted_iteration_params - ) + remaining_catalog_params = set(catalog_parameters) - set(formatted_iteration_params) if remaining_catalog_params: - raise CatalogRendererError( - f"There is not enough given iteration parameters to render all the catalog parameters. Exposed Catalog parameters are {set(catalog_view_parameters)} and the actual given iteration parameters are {formatted_iteration_params}. {remaining_catalog_params} cannot be rendered" + LOGGER.warning( + f"There is not enough given iteration parameters to render all the catalog parameters. Exposed Catalog parameters are {set(catalog_parameters)} and the actual given iteration parameters are {formatted_iteration_params}. {remaining_catalog_params} cannot be rendered" ) # check remaining iteration parameters in case of having only parameters dataset - if "parameters" in catalog_view_parameters and len(catalog_view_parameters) == 1: - catalog_view_parameters_values = catalog_view_parameters.get( - "parameters" - ).load() + if "parameters" in catalog_parameters and len(catalog_parameters) == 1: + catalog_parameters_values = catalog_parameters.get("parameters").load() iteration_parameters_values = formatted_iteration_params.get("parameters", {}) remaining_iteration_params = set(iteration_parameters_values) - set( - catalog_view_parameters_values + catalog_parameters_values ) if remaining_iteration_params: LOGGER.warning( - f"There is remainig iteration parameters that are not used for rendering catalog parameters. Catalog parameters are {catalog_view_parameters_values} and the actual given iteration parameters are {iteration_parameters_values}. {remaining_iteration_params} are remaining unused" + f"There is remainig iteration parameters that are not used for rendering catalog parameters. Catalog parameters are {catalog_parameters_values} and the actual given iteration parameters are {iteration_parameters_values}. 
{remaining_iteration_params} are remaining unused" ) else: remaining_iteration_params = set(formatted_iteration_params) - set( - catalog_view_parameters + catalog_parameters ) if remaining_iteration_params: LOGGER.warning( - f"There is remainig iteration parameters that are not used for rendering catalog parameters. Catalog parameters are {set(catalog_view_parameters)} and the actual given iteration parameters are {formatted_iteration_params}. {remaining_iteration_params} are remaining unused" + f"There is remainig iteration parameters that are not used for rendering catalog parameters. Catalog parameters are {set(catalog_parameters)} and the actual given iteration parameters are {formatted_iteration_params}. {remaining_iteration_params} are remaining unused" ) rendered_datasets = {} - for dataset_name, dataset_value in catalog_view_parameters.items(): - rendered_dataset_value = copy.deepcopy(dataset_value) + for dataset_name, dataset_value in catalog_parameters.items(): + if formatted_iteration_params.get(dataset_name): + rendered_dataset_value = copy.deepcopy(dataset_value) - parameters = rendered_dataset_value.load() + parameters = rendered_dataset_value.load() - if isinstance(parameters, dict): - rendered_datasets[dataset_name] = MemoryDataSet( - _recursive_dict_update( - parameters, formatted_iteration_params.get(dataset_name) + if isinstance(parameters, dict): + rendered_datasets[dataset_name] = MemoryDataSet( + _recursive_dict_update( + parameters, formatted_iteration_params.get(dataset_name) + ) + ) + else: + rendered_datasets[dataset_name] = MemoryDataSet( + formatted_iteration_params[dataset_name] ) - ) else: - rendered_datasets[dataset_name] = MemoryDataSet( - formatted_iteration_params[dataset_name] - ) + rendered_datasets[dataset_name] = dataset_value return rendered_datasets def render_input_datasets( - catalog_view_inputs: Dict[str, Any], iteration_inputs: dict + catalog_inputs: Dict[str, Any], iteration_inputs: dict ) -> Dict[str, Any]: # type: ignore - remaining_catalog_inputs = set(catalog_view_inputs) - set(iteration_inputs) + remaining_catalog_inputs = set(catalog_inputs) - set(iteration_inputs) if remaining_catalog_inputs: raise CatalogRendererError( - f"There is not enough iteration inputs to render catalog inputs. Catalog inputs are {set(catalog_view_inputs)} and the actual given iteration inputs are {set(iteration_inputs)}. {remaining_catalog_inputs} are remaining" + f"There is not enough iteration inputs to render catalog inputs. Catalog inputs are {set(catalog_inputs)} and the actual given iteration inputs are {set(iteration_inputs)}. {remaining_catalog_inputs} are remaining" ) - remaining_iteration_inputs = set(iteration_inputs) - set(catalog_view_inputs) + remaining_iteration_inputs = set(iteration_inputs) - set(catalog_inputs) if remaining_iteration_inputs: LOGGER.warning( - f"These iteration inputs datasets {remaining_iteration_inputs} are not used in rendering catalog inputs datasets. Catalog inputs are {set(catalog_view_inputs)} and the actual given iteration inputs are {set(iteration_inputs)}." + f"These iteration inputs datasets {remaining_iteration_inputs} are not used in rendering catalog inputs datasets. Catalog inputs are {set(catalog_inputs)} and the actual given iteration inputs are {set(iteration_inputs)}." 
) + + rendered_datasets = {} + for dataset_name, dataset_value in catalog_inputs.items(): + LOGGER.info(f"Injecting '{dataset_name}' input into the catalog") + rendered_dataset_value = copy.deepcopy(dataset_value) + iteration_input_data = iteration_inputs[dataset_name] @@ -166,6 +165,9 @@ def recursively_render_template( {**{"dataset_entry": dataset_attributes}, **template_args} ) return config.dataset_entry + # return Template( + # dataset_attributes, variable_start_string="[[", variable_end_string="]]" + # ).render(template_args) elif isinstance(dataset_attributes, PurePath): config = OmegaConf.create( @@ -205,6 +207,16 @@ def recursively_get_dataset_template_params(datasets: dict) -> List[str]: return template_params +# env = Environment( +# block_start_string="[[%", +# block_end_string="%]]", +# variable_start_string="[[", +# variable_end_string="]]", +# comment_start_string="[[#", +# comment_end_string="#]]", +# ) + + def recursively_get_template_params( dataset_attributes: Union[str, list, dict, PurePath] ) -> List[str]: diff --git a/kedro_boot/framework/session/__init__.py b/kedro_boot/framework/session/__init__.py new file mode 100644 index 0000000..16c3e2c --- /dev/null +++ b/kedro_boot/framework/session/__init__.py @@ -0,0 +1 @@ +from .session import KedroBootSession # noqa: F401 diff --git a/kedro_boot/framework/session/runner.py b/kedro_boot/framework/session/runner.py new file mode 100644 index 0000000..963da3b --- /dev/null +++ b/kedro_boot/framework/session/runner.py @@ -0,0 +1,56 @@ +from typing import Any, Dict, List +from pluggy import PluginManager +from kedro.runner import AbstractRunner, SequentialRunner +from kedro.pipeline import Pipeline +from kedro.io import DataCatalog + + +class KedroBootRunner: + def __init__( + self, + hook_manager: PluginManager, + session_id: str, + runner: AbstractRunner = None, + ) -> None: + if runner and not isinstance(runner, AbstractRunner): + raise KedroBootRunnerError( + f"The runner parameter should be an AbstractRunner, {runner.__class__.__name__} was given instead" + ) + + self.runner = runner or SequentialRunner() + + self._session_id = session_id + self._hook_manager = hook_manager + + def run( + self, pipeline: Pipeline, catalog: DataCatalog, outputs_datasets: List[str] + ) -> Dict[str, Any]: + self.runner.run( + pipeline=pipeline, + catalog=catalog, + hook_manager=self._hook_manager, + session_id=self._session_id, + ) + + output_datasets = {} + # if multiple output datasets are declared, load them indexed by dataset name + if outputs_datasets and len(outputs_datasets) > 1: + output_datasets = { + dataset_name: catalog.load(dataset_name) + for dataset_name in outputs_datasets + } + elif outputs_datasets and len(outputs_datasets) == 1: + output_datasets = catalog.load(list(outputs_datasets)[0]) + # If no output datasets are declared, load all the MemoryDataset outputs + else: + output_datasets = { + dataset_name: catalog.load(dataset_name) + for dataset_name in pipeline.outputs() + if catalog._data_sets[dataset_name].__class__.__name__.lower() + == "memorydataset" + } + return output_datasets + + +class KedroBootRunnerError(Exception): + """Error raised in case of kedro boot runner error""" diff --git a/kedro_boot/framework/session/session.py b/kedro_boot/framework/session/session.py new file mode 100644 index 0000000..7dbc864 --- /dev/null +++ b/kedro_boot/framework/session/session.py @@ -0,0 +1,127 @@ +"""This 
module implements the Kedro Boot session: a user facing interface responsible for orchestrating the interaction between kedro and the apps.""" + +import logging +import uuid +from typing import Any, List, Optional + +from kedro.config import ConfigLoader +from kedro.io import DataCatalog +from pluggy import PluginManager +from kedro.pipeline.pipeline import Pipeline + +from kedro_boot.framework.compiler.specs import CompilationSpec + +from kedro_boot.framework.context import KedroBootContext +from .runner import KedroBootRunner + +LOGGER = logging.getLogger(__name__) + + +class KedroBootSession: + """``KedroBootSession`` is a user facing interface that exposes kedro's resources to the kedro boot apps. + At init time: it creates the ``KedroBootContext`` and a ``KedroBootRunner``. + At compilation time: it compiles the catalog through the context, making the catalog ready for the run iterations. + At iteration time: it renders a new catalog with the app data (through the context) and runs the pipeline. + """ + + def __init__( + self, + pipeline: Pipeline, + catalog: DataCatalog, + hook_manager: PluginManager, + session_id: str, + runtime_app_params: dict, + config_loader: ConfigLoader, + ) -> None: + """Init the kedro boot session. + + Args: + pipeline (Pipeline): kedro ``Pipeline`` object + catalog (DataCatalog): kedro ``DataCatalog`` object + hook_manager (PluginManager): kedro ``PluginManager`` object + session_id (str): kedro ``KedroSession`` session_id + runtime_app_params (dict): params given by an App specific CLI + config_loader (ConfigLoader): kedro ``ConfigLoader`` object + """ + self._context = KedroBootContext(pipeline=pipeline, catalog=catalog) + self._runner = KedroBootRunner(hook_manager=hook_manager, session_id=session_id) + + self.runtime_app_params = runtime_app_params + self.config_loader = config_loader + + self._is_catalog_compiled = False + + def compile(self, compilation_specs: List[CompilationSpec] = None) -> None: + """Prepare the catalog for iteration time. The goal is to achieve low latency by minimizing the operations needed during the run of an iteration. + Each compilation spec provides a perspective on the underlying pipeline, filtered by a particular namespace and organized by dataset categories according to their relevance to the external application. + The compilation is triggered automatically by kedro boot. To give the app control of the compilation point, set the AbstractKedroBootApp class attribute 'LAZY_COMPILE' to True. If the compilation is neither triggered by the kedro project nor the app, it will be triggered lazily during the first run iteration. + + + Args: + compilation_specs: optional list of ``CompilationSpec``; if not provided, they are inferred from the pipeline namespaces. + """ + if self._is_catalog_compiled: + LOGGER.warning("The session is already compiled") + else: + self._context.compile(compilation_specs=compilation_specs) + self._is_catalog_compiled = True + + def run( + self, + namespace: Optional[str] = None, + inputs: Optional[dict] = None, + parameters: Optional[dict] = None, + itertime_params: Optional[dict] = None, + run_id: Optional[str] = None, + ) -> Any: + """Perform a low-latency run of a pipeline's namespace using the provided inputs, parameters and itertime_params. + + Args: + namespace (str): pipeline's namespace. + inputs (dict): App input datasets that will be injected into the catalog. + parameters (dict): App parameter datasets that will be injected into the catalog. + itertime_params (dict): App itertime params that will resolve the itertime_params resolvers. 
+ run_id (str): run_id can be generated by the app, otherwise the session generate it at each iteration. + + Raises: + KedroBootSessionError: _description_ + + Returns: + Any: Run results + """ + # pipeline_view_name = name or DEFAULT_PIPELINE_VIEW_NAME + iteration_run_id = run_id or uuid.uuid4().hex + itertime_params = itertime_params or {} + iteration_template_params = {**itertime_params, **{"run_id": iteration_run_id}} + + LOGGER.info(f"Running iteration {iteration_run_id}") + # Coompile catalog lazily at first iteration, if it is not already compiled earlier by the app + if not self._is_catalog_compiled: + LOGGER.warning( + "Lazy Catalog compilation at first iteration run. Beware, since no compilation specs given, the compilation specs are infered from the pipeline." + ) + self.compile() + + pipeline, catalog = self._context.render( + namespace=namespace, + inputs=inputs, + parameters=parameters, + itertime_params=iteration_template_params, + ) + + iteration_outputs = self._runner.run( + pipeline=pipeline, + catalog=catalog, + outputs_datasets=self._context.get_outputs_datasets(namespace), + ) + + LOGGER.info(f"Iteration {iteration_run_id} completed") + + return iteration_outputs + + def get_credentials(self) -> dict: + return self.config_loader["credentials"] + + +class KedroBootSessionError(Exception): + """Error raised in case of kedro boot session error""" diff --git a/kedro_boot/pipeline/__init__.py b/kedro_boot/pipeline/__init__.py deleted file mode 100644 index d5eace6..0000000 --- a/kedro_boot/pipeline/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .factory import app_pipeline, create_dummy_pipeline # noqa: F401 -from .pipeline import DEFAULT_PIPELINE_VIEW_NAME, AppPipeline # noqa: F401 diff --git a/kedro_boot/pipeline/factory.py b/kedro_boot/pipeline/factory.py deleted file mode 100644 index 9bea66a..0000000 --- a/kedro_boot/pipeline/factory.py +++ /dev/null @@ -1,145 +0,0 @@ -""" A factory for creating AppPipeline instances.""" -import copy -import logging -from itertools import chain -from typing import Iterable, List, Optional, Union - -from kedro.pipeline import Pipeline -from kedro.pipeline.modular_pipeline import pipeline -from kedro.pipeline.node import Node, node - -from .pipeline import DEFAULT_PIPELINE_VIEW_NAME, AppPipeline, PipelineView -from .utils import find_duplicates - -LOGGER = logging.getLogger(__name__) - - -def app_pipeline( - pipe: Union[Iterable[Union[Pipeline, AppPipeline]], Pipeline, AppPipeline], - *, - name: Optional[str] = None, - inputs: Optional[Union[str, List[str]]] = None, - outputs: Optional[Union[str, List[str]]] = None, - parameters: Optional[Union[str, List[str]]] = None, - artifacts: Optional[Union[str, List[str]]] = None, - infer_artifacts: Optional[bool] = None, -) -> AppPipeline: - r"""Create a ``AppPipeline`` from a collection of ``Pipeline`` or ``AppPipeline``. - - Args: - pipe: The pipelines the ``AppPipeline`` will be made of. - inputs: A name or collection of input names to be exposed as connection points to kedro boot app. - The kedro boot app will inject data to thoses inputs at each run iteration, refering to them by their names without explicitly including namespace prefix (if any). - Must only refer to the pipeline's free inputs. - outputs: A name or collection of output names to be exposed as connection points to the kedro boot app. - The kedro boot app will load those datasets at each run iteration. - Can refer to both the pipeline's free outputs, as well as intermediate results that need to be exposed. 
- parameters: A name or collection of parameter names that are exposed to the kedro boot app. - The kedro boot app will inject new parameters into those parameters at each run iteration. - The parameters can be specified without the `params:` prefix. - artifacts: A name or collection of artifact names. - Artifacts are the datasets that will be materialized (loaded as MemoryDataset) at startup time, so they can be reused at each run iteration without the I/O cost. - Must only refer to the pipeline's free inputs. - infer_artifacts: If no artifacts given, should we infer them dynamically. infered artifacts = pipeline free inputs - (view inputs + view parameters + view templates) - - - Raises: - AppPipelineFactoryError: __description__ - - Returns: - A new ``AppPipeline`` object. - """ - - pipelines = copy.copy(pipe) - if not isinstance(pipelines, Iterable): - pipelines = [pipelines] - - views = list( - chain.from_iterable( - [ - pipeline.views - for pipeline in pipelines - if isinstance(pipeline, AppPipeline) - ] # type: ignore - ) - ) - - view_names = [view.name for view in views] - duplicated_views = find_duplicates(view_names) - if duplicated_views: - raise ValueError( - f"Cannot create an AppPipeline with duplicated view names. Those view names '{duplicated_views}' are present in more that one given app pipeline." - ) - - pipeline_view_name = name or DEFAULT_PIPELINE_VIEW_NAME - - formated_inputs = _format_dataset_names(inputs) - formated_outputs = _format_dataset_names(outputs) - formated_parameters = _format_dataset_names(parameters) - formated_artifacts = _format_dataset_names(artifacts) - - formated_infer_artifacts = False if infer_artifacts is None else infer_artifacts - - if artifacts and formated_infer_artifacts: - LOGGER.info("you've gived an artifacts set, infer_artifacts will be ignored") - formated_infer_artifacts = False - - _check_if_tag_exist(pipelines, pipeline_view_name) - modular_pipeline = pipeline( - pipe=pipelines, - inputs=(set(formated_inputs) | set(formated_artifacts)), - outputs=set(formated_outputs), - parameters=set(formated_parameters), - tags=pipeline_view_name, - ) - - views.append( - PipelineView( - name=pipeline_view_name, - inputs=formated_inputs, - outputs=formated_outputs, - parameters=formated_parameters, - artifacts=formated_artifacts, - infer_artifacts=formated_infer_artifacts, - ) - ) - - return AppPipeline([modular_pipeline], views=views) - - -def _check_if_tag_exist(pipelines: Iterable[Union[Node, Pipeline]], tag: str) -> None: - for pipe in pipelines: - if isinstance(pipe, Pipeline): - nodes = pipe.nodes - else: - nodes = [pipe] # pipe is a Node - - for pipe_node in nodes: - if tag in pipe_node.tags: - raise AppPipelineFactoryError( - f"Cannot create an app pipeline with a name '{tag}' as it's already used in the underlying pipeline's nodes. Please consider renaming the app pipeline or removing the tag name from the underlying pipeline's nodes." - ) - - -def _format_dataset_names(ds_name) -> List[str]: - if not ds_name: - return [] - if isinstance(ds_name, str): - return [ds_name] - if isinstance(ds_name, list): - return list(set(ds_name)) - - return list(ds_name) - - -# Can be used when a kedro boot app does not need kedro pipelines. 
-def create_dummy_pipeline(): - def dummy_func(dummy_input): - pass - - dummy_node = node(dummy_func, "dummy_input", None) - return Pipeline([dummy_node]) - - -class AppPipelineFactoryError(Exception): - """Error raised in AppPipeline factory operations""" diff --git a/kedro_boot/pipeline/pipeline.py b/kedro_boot/pipeline/pipeline.py deleted file mode 100644 index d69b892..0000000 --- a/kedro_boot/pipeline/pipeline.py +++ /dev/null @@ -1,152 +0,0 @@ -""""``AppPipeline`` make the the kedro catalog ready for interaction with the kedro boot app.""" - -from typing import Iterable, List, Optional, Union - -from kedro.pipeline import Pipeline -from kedro.pipeline.node import Node - -from kedro_boot.pipeline.utils import find_duplicates - -DEFAULT_PIPELINE_VIEW_NAME = "__default__" - - -class PipelineView: - """A ``PipelineView`` view define the exposition points to the Kedro Boot application. - It provides a perspective on the underlying pipeline, filtered by a particular tag and organized by dataset categories. - """ - - def __init__( - self, - name: str, - inputs: Optional[List[str]] = None, - outputs: Optional[List[str]] = None, - parameters: Optional[List[str]] = None, - artifacts: Optional[List[str]] = None, - infer_artifacts: Optional[bool] = None, - ) -> None: - """Init ``PipelineView`` with a name and a categorisation of the datasets that are exposed to the app. - - Args: - name (str): A view name. It should be unique per AppPipeline views. - inputs (List[str]): Inputs datasets that will be injected by the app data at iteration time. - outputs (List[str]): Outputs dataset that hold the iteration run results. - parameters (List[str]): Parameters that will be injected by the app data at iteration time. - artifacts (List[str]): Artifacts datasets that will be materialized (loaded as MemoryDataset) at startup time. - infer_artifacts (bool): If no artifacts given, should we infer them dynamically. infered artifacts = pipeline inputs - (inputs + parameters + templates) - """ - self.name = name - self.inputs = inputs or [] - self.outputs = outputs or [] - self.parameters = parameters or [] - self.artifacts = artifacts or [] - self.infer_artifacts = False if infer_artifacts is None else infer_artifacts - - def __str__(self) -> str: - return ( - f"PipelineView(" - f"name={self.name}, " - f"inputs={self.inputs}, " - f"outputs={self.outputs}, " - f"parameters={self.parameters}, " - f"artifacts={self.artifacts}, " - f"infer_artifacts={self.infer_artifacts})" - ) - - -class AppPipeline(Pipeline): - """A kedro Pipeline having a list of views defining the exposition points of the pipeline to the kedro boot app. - - Args: - Pipeline (Pipeline): Kedro Pipeline - """ - - def __init__( - self, nodes: Iterable[Union[Node, Pipeline]], *, views: List[PipelineView] - ) -> None: - """_summary_ - - Args: - nodes (Iterable[Node | Pipeline]): A collection of kedro Nodes or Pipelines - views (List[PipelineView]): A Collection of ``PipelineView`` defining the exposition points of the pipeline for the app. - - Raises: - AppPipelineError: _description_ - """ - - super().__init__(nodes) - - view_names = [view.name for view in views] - - duplicated_views = find_duplicates(view_names) - if duplicated_views: - raise AppPipelineError( - f"Cannot create an AppPipeline with duplicated view names. Those views '{duplicated_views}' are present in more that one app pipeline." 
- ) - - self.views = views - - def __add__(self, other): - if isinstance(other, AppPipeline): - views = self.views + other.views - view_names = [view.name for view in views] - duplicated_views = find_duplicates(view_names) - if duplicated_views: - raise ValueError( - f"Cannot Add two AppPipeline having duplicated view names. Those view names '{duplicated_views}' are present in both app pipelines." - ) - else: - views = self.views - - nodes = self.nodes + other.nodes - return AppPipeline(nodes, views=views) - - def __sub__(self, other): - raise NotImplementedError() - - def __and__(self, other): - raise NotImplementedError() - - def __or__(self, other): - raise NotImplementedError() - - # Implement filter, as it's used by the kedro session run method - def filter( - self, - tags: Iterable[str] = None, - from_nodes: Iterable[str] = None, - to_nodes: Iterable[str] = None, - node_names: Iterable[str] = None, - from_inputs: Iterable[str] = None, - to_outputs: Iterable[str] = None, - node_namespace: str = None, - ) -> Pipeline: # type: ignore - pipeline = super().filter( - tags, - from_nodes, - to_nodes, - node_names, - from_inputs, - to_outputs, - node_namespace, - ) # type: ignore - return AppPipeline(pipeline.nodes, views=self.views) - - def get_view(self, name: str) -> PipelineView: - view = next( - (view for view in self.views if view.name == name), - None, - ) - - if not view: - raise AppPipelineError( - f"The given view name '{name}' is not present in the app pipeline." - ) - - return view - - def get_physical_pipeline(self, name: str) -> Pipeline: - return self.only_nodes_with_tags(name) - - -class AppPipelineError(Exception): - """Error raised in AppPipeline operations""" diff --git a/kedro_boot/session.py b/kedro_boot/session.py deleted file mode 100644 index 93c396f..0000000 --- a/kedro_boot/session.py +++ /dev/null @@ -1,168 +0,0 @@ -"""This module implements Kedro boot session responsible for kedro boot app lifecycle.""" - -import logging -import uuid -from typing import Any, Optional, Set - -from kedro.config import OmegaConfigLoader -from kedro.io import DataCatalog -from kedro.runner import SequentialRunner -from pluggy import PluginManager - -from kedro_boot.catalog import AppCatalog -from kedro_boot.pipeline import DEFAULT_PIPELINE_VIEW_NAME, AppPipeline - -LOGGER = logging.getLogger(__name__) - - -class KedroBootSession: - """``KedroBootSession`` is the object that is responsible for managing kedro boot app lifecycle. - At init time : It wrap catalog as ``AppCatalog`` - At compilation time : It compile the ``AppCatalog`` making it ready for iteration time. - At iteration time: It render/merge ``AppCatalog`` with app data and run the ``AppPipeline`` - """ - - def __init__( - self, - pipeline: AppPipeline, - catalog: DataCatalog, - hook_manager: PluginManager, - session_id: str, - config_loader: OmegaConfigLoader, - ) -> None: - """Init the kedro boot session. 
- - Args: - pipeline (Pipeline): ``AppPipeline`` object - catalog (DataCatalog): kedro ``DataCatalog`` object - hook_manager (PluginManager): kedro ``PluginManager`` object - session_id (str): kedro ``KedroSession`` session_id - config_loader (OmegaConfigLoader): kedro ``OmegaConfigLoader`` object - """ - self._pipeline = pipeline - self._catalog = AppCatalog(catalog) - self._hook_manager = hook_manager - self._session_id = session_id - - self._is_catalog_compiled = None - - self.config_loader = config_loader - - def compile_catalog( - self, - ) -> None: - """Prepare ``AppCatalog`` for iteration time by creating catalog views using ``AppPipeline`` views. - A pipeline view provides a perspective on the underlying pipeline, filtered by a particular tag and organized by datasets categories according to their relevance to the external application. - The compilation is triggered automatically by the kedro boot. To give the app the control of the compilation point, start the project by ``kedro boot --lazy-compile``. If the compilation is neither triggered by the kedro project nor the app, it will be triggered lazily during the first run iteration. - - - Raises: - KedroBootSessionError: _description_ - """ - if self._is_catalog_compiled: - raise KedroBootSessionError("The AppCatalog is already compiled") - - self._catalog.compile(self._pipeline) - - self._is_catalog_compiled = True - - def run( - self, - name: Optional[str] = None, - inputs: Optional[dict] = None, - parameters: Optional[dict] = None, - template_params: Optional[dict] = None, - run_id: Optional[str] = None, - ) -> Any: - """run the ``AppPipeline`` view using the provided inputs, parameters and templates. - - Args: - name (str): name of the pipeline view. - inputs (dict): App inputs datasets that will be injected into the catalog. - parameters (dict): App parameters datasets that will be injected into the catalog. - template_params (dict): App templates params that will render the template expressions. - run_id (str): run_id can be generated by the app, otherwise the session generate it at each iteration. 
- - Raises: - KedroBootSessionError: _description_ - - Returns: - Any: Run results - """ - pipeline_view_name = name or DEFAULT_PIPELINE_VIEW_NAME - iteration_run_id = run_id or uuid.uuid4().hex - template_params = template_params or {} - iteration_template_params = {**template_params, **{"run_id": iteration_run_id}} - - LOGGER.info(f"Running iteration {iteration_run_id}") - # Coompile catalog lazily at first iteration, if it is not already compiled earlier by the app - if not self._is_catalog_compiled: - LOGGER.info("Lazy Catalog compilation at first iteration run ...") - self.compile_catalog() - - pipeline_view = self._pipeline.get_view(pipeline_view_name) - pipeline = self._pipeline.get_physical_pipeline(pipeline_view_name) - catalog = self._catalog.render( - name=pipeline_view_name, - inputs=inputs, - parameters=parameters, - template_params=iteration_template_params, - ) - - # TODO: expose the choice of the runner to the user through app_pipeline - runner = SequentialRunner() - - runner.run( - pipeline=pipeline, - catalog=catalog, - hook_manager=self._hook_manager, - session_id=self._session_id, - ) - - pipeline_outputs = pipeline.outputs() - pipeline_view_outputs = set(pipeline_view.outputs) - iteration_outputs = _get_iteration_outputs( - catalog, pipeline_outputs, pipeline_view_outputs - ) # type: ignore - - LOGGER.info(f"Iteration {iteration_run_id} completed") - - return iteration_outputs - - -def _get_iteration_outputs( - catalog: DataCatalog, pipeline_outputs: Set[str], pipeline_view_outputs: Set[str] -) -> dict: - """helper that load the run iteration outputs from the catalog. - - Args: - catalog (DataCatalog): kedro catalog - pipeline_outputs (set[str]): pipeline free outputs - pipeline_view_outputs (set[str]): outputs exposed to the app - - Returns: - dict: _description_ - """ - - output_datasets = {} - # if multiple outputs datasets, load the returned datasets indexed by pipeline view outputs - if pipeline_view_outputs and len(pipeline_view_outputs) > 1: - output_datasets = { - dataset_name: catalog.load(dataset_name) - for dataset_name in pipeline_view_outputs - } - elif pipeline_view_outputs and len(pipeline_view_outputs) == 1: - output_datasets = catalog.load(list(pipeline_view_outputs)[0]) - # If no pipeline view outputs, load all the memorydatasets outputs - else: - output_datasets = { - dataset_name: catalog.load(dataset_name) - for dataset_name in pipeline_outputs - if catalog._data_sets[dataset_name].__class__.__name__.lower() - == "memorydataset" - } - return output_datasets - - -class KedroBootSessionError(Exception): - """Error raised in case of kedro boot session error""" diff --git a/kedro_boot/pipeline/utils.py b/kedro_boot/utils.py similarity index 100% rename from kedro_boot/pipeline/utils.py rename to kedro_boot/utils.py diff --git a/setup.py b/setup.py index d350724..39cb8c9 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,9 @@ def _parse_requirements(path, encoding="utf-8"): "jupyter>=1.0.0,<2.0.0", ], }, - entry_points={"kedro.project_commands": ["boot = kedro_boot.booter:commands"]}, + entry_points={ + "kedro.project_commands": ["boot = kedro_boot.framework.cli.cli:commands"] + }, keywords="kedro-plugin, framework, data apps, pipelines, machine learning, data pipelines, data science, data engineering, model serving, mlops, dataops", classifiers=[ "Programming Language :: Python :: 3", diff --git a/tests/catalog/test_catalog.py b/tests/catalog/test_catalog.py deleted file mode 100644 index 4286a9a..0000000 --- a/tests/catalog/test_catalog.py +++ 
/dev/null @@ -1,161 +0,0 @@ -import pytest -from kedro_boot.catalog import AppCatalog -from kedro_boot.catalog.view import CatalogView -from kedro_boot.pipeline.factory import ( - app_pipeline, -) # Replace 'your_module' with the actual module name -from kedro.io import MemoryDataSet -from kedro.pipeline import Pipeline -from kedro.extras.datasets.pandas import CSVDataSet -from pathlib import PurePath -from kedro_boot.catalog.renderer import CatalogRendererError - -# parametrized pipeline views -parametrized_pipeline_views = [ - ( - {"name": "view1", "inputs": ["A"], "outputs": ["C"]}, - { - "name": "view1", - "inputs": {"A": MemoryDataSet(1)}, - "outputs": {"C": MemoryDataSet(3)}, - "parameters": {}, - "artifacts": {}, - "templates": { - "D": CSVDataSet("data/01_raw/test_data_${itertime_param:data_date}.csv") - }, - "unmanaged": {"B": MemoryDataSet(2)}, - }, - ), - ( - {"name": "view2", "inputs": ["A"], "artifacts": "B", "outputs": ["C"]}, - { - "name": "view2", - "inputs": {"A": MemoryDataSet(1)}, - "outputs": {"C": MemoryDataSet(3)}, - "parameters": {}, - "artifacts": {"B": MemoryDataSet(2)}, - "templates": { - "D": CSVDataSet("data/01_raw/test_data_${itertime_param:data_date}.csv") - }, - "unmanaged": {}, - }, - ), -] - - -@pytest.mark.parametrize( - "pipeline_view, expected_catalog_view", parametrized_pipeline_views -) -def test_compile( - mock_pipeline: Pipeline, - mock_app_catalog: AppCatalog, - pipeline_view: dict, - expected_catalog_view, -): - # Create an AppPipeline (you may need to mock this) and call 'compile' - - test_app_pipeline = app_pipeline(mock_pipeline, **pipeline_view) - mock_app_catalog.compile(test_app_pipeline) - - # Assert that catalog_views is not empty - assert mock_app_catalog.catalog_views - assert mock_app_catalog.catalog_views[0].name == expected_catalog_view["name"] - assert ( - mock_app_catalog.catalog_views[0].inputs.keys() - == expected_catalog_view["inputs"].keys() - ) - assert ( - mock_app_catalog.catalog_views[0].outputs.keys() - == expected_catalog_view["outputs"].keys() - ) - assert ( - mock_app_catalog.catalog_views[0].parameters.keys() - == expected_catalog_view["parameters"].keys() - ) - assert ( - mock_app_catalog.catalog_views[0].artifacts.keys() - == expected_catalog_view["artifacts"].keys() - ) - assert ( - mock_app_catalog.catalog_views[0].templates.keys() - == expected_catalog_view["templates"].keys() - ) - assert ( - mock_app_catalog.catalog_views[0].unmanaged.keys() - == expected_catalog_view["unmanaged"].keys() - ) - - -@pytest.mark.parametrize( - "pipeline_view, render_data, expected_rendered_catalog", - [ - ( - {"name": "view1", "inputs": ["A"]}, - { - "name": "view1", - "inputs": {"A": 10}, - "template_params": {"date_param": "01-01-1960"}, - }, - {"A": 10, "D": "data/01_raw/test_data_01-01-1960.csv"}, - ) - ], -) -def test_render_catalog( - mock_pipeline: Pipeline, - mock_app_catalog: AppCatalog, - pipeline_view: dict, - render_data: dict, - expected_rendered_catalog: dict, -): - test_app_pipeline = app_pipeline(mock_pipeline, **pipeline_view) - mock_app_catalog.compile(test_app_pipeline) - - # Call the 'render' method with required parameters - rendered_catalog = mock_app_catalog.render(**render_data) - - assert len(rendered_catalog.list()) == len(test_app_pipeline.data_sets()) - - assert rendered_catalog._data_sets["A"]._data == expected_rendered_catalog["A"] - assert rendered_catalog._data_sets["D"]._filepath == PurePath( - expected_rendered_catalog["D"] - ) - - # Assert that the rendered_catalog is not empty - assert 
rendered_catalog - - -def test_render_catalog_without_suffisent_inputs( - mock_pipeline: Pipeline, mock_app_catalog: AppCatalog -): - test_app_pipeline = app_pipeline(mock_pipeline, name="view", inputs=["A"]) - mock_app_catalog.compile(test_app_pipeline) - - # Call the 'render' method with required parameters - with pytest.raises(CatalogRendererError): - mock_app_catalog.render(name="view") - - -# Test the 'get_catalog_view' method -def test_get_catalog_view(mock_app_catalog: AppCatalog): - # Create a mock catalog view and add it to app_catalog - mock_catalog_view = CatalogView(name="mock_view") - mock_app_catalog.catalog_views.append(mock_catalog_view) - - # Call 'get_catalog_view' for the added view - retrieved_view = mock_app_catalog.get_catalog_view("mock_view") - - # Assert that the retrieved view is the same as the mock_catalog_view - assert retrieved_view == mock_catalog_view - - -# Test the 'get_view_names' method -def test_get_view_names(mock_app_catalog: AppCatalog): - # Create mock catalog views and add them to app_catalog - mock_app_catalog.catalog_views.append(CatalogView(name="view1")) - mock_app_catalog.catalog_views.append(CatalogView(name="view2")) - - # Call 'get_view_names' and check the returned list - view_names = mock_app_catalog.get_view_names() - - # Assert that the list of view names matches the added views - assert view_names == ["view1", "view2"] diff --git a/tests/conftest.py b/tests/conftest.py index 9b8f0dc..0ed8e50 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,43 +1,41 @@ import pytest -from kedro_boot.catalog import AppCatalog from kedro.pipeline.modular_pipeline import pipeline from kedro.pipeline import Pipeline, node -from kedro.io import MemoryDataSet, DataCatalog -from kedro.extras.datasets.pandas import CSVDataSet @pytest.fixture def mock_pipeline() -> Pipeline: def identity(x, y): - return x + return x * y def square(x): return x**2 def cube(x): - return x**3 + return x**3, {"results": x**3} nodes = [ - node(identity, ["A", "B"], "C", name="identity"), + node(identity, ["A", "params:B"], "C", name="identity"), node(square, "C", "D"), - node(cube, "D", None), + node(cube, "D", ["E", "F"]), ] return pipeline(nodes) -@pytest.fixture -def mock_catalog() -> DataCatalog: - dataset_a = MemoryDataSet(1) - dataset_b = MemoryDataSet(2) - dataset_c = MemoryDataSet(3) - dataset_d = CSVDataSet("data/01_raw/test_data_${oc.select:date_param}.csv") - catalog = DataCatalog( - {"A": dataset_a, "B": dataset_b, "C": dataset_c, "D": dataset_d} - ) - return catalog +# @pytest.fixture +# def mock_catalog() -> DataCatalog: +# dataset_a = MemoryDataSet(1) +# dataset_b = MemoryDataSet(2) +# dataset_c = MemoryDataSet(3) +# dataset_d = MemoryDataSet() +# dataset_e = CSVDataSet("test_data_${oc.select:date_param}.csv") +# catalog = DataCatalog( +# {"A": dataset_a, "B": dataset_b, "C": dataset_c, "D": dataset_d} +# ) +# return catalog -@pytest.fixture -def mock_app_catalog(mock_catalog): - return AppCatalog(mock_catalog) +# @pytest.fixture +# def mock_app_catalog(mock_catalog): +# return AppCatalog(mock_catalog) diff --git a/tests/framework/__init__.py b/tests/framework/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/framework/test_session.py b/tests/framework/test_session.py new file mode 100644 index 0000000..41bdd99 --- /dev/null +++ b/tests/framework/test_session.py @@ -0,0 +1,205 @@ +from typing import List +from kedro_boot.framework.session.session import KedroBootSession +import pytest +from kedro.pipeline import Pipeline +from 
kedro.pipeline.modular_pipeline import pipeline +from kedro.framework.hooks.manager import _NullPluginManager +from kedro.config import OmegaConfigLoader +from kedro.io import DataCatalog, MemoryDataset +from kedro.extras.datasets.json import JSONDataSet + + +parametrized_test_session_scenarios = [ + ( # Test a non-namespaced pipeline + [{"namespace": None}], + DataCatalog( + { + "A": MemoryDataset(2), + "params:B": MemoryDataset(1), + "C": MemoryDataset(), + "D": MemoryDataset(), + "E": MemoryDataset(), + "F": JSONDataSet("test_data_${oc.select:date_param,01_01_1960}.csv"), + } + ), + {}, + {"E": 64}, + ), + ( # Test pipeline namespace with all attributes are namespaced + [{"namespace": "n1"}], + DataCatalog( + { + "n1.A": MemoryDataset(2), + "params:n1.B": MemoryDataset(1), + "n1.C": MemoryDataset(), + "n1.D": MemoryDataset(), + "n1.E": MemoryDataset(), + "n1.F": JSONDataSet("test_data_${oc.select:date_param,01_01_1960}.csv"), + } + ), + {"namespace": "n1", "inputs": {"A": 1}, "parameters": {"B": 1}}, + {"n1.E": 1, "n1.F": {"results": 1}}, + ), + ( # Test pipeline namespace with non-namespaced inputs + [{"namespace": "n1", "inputs": "A"}], + DataCatalog( + { + "n1.A": MemoryDataset(2), + "params:n1.B": MemoryDataset(1), + "n1.C": MemoryDataset(), + "n1.D": MemoryDataset(), + "n1.E": MemoryDataset(), + "n1.F": JSONDataSet("test_data_${oc.select:date_param,01_01_1960}.csv"), + "A": MemoryDataset(2), + "params:B": MemoryDataset(1), + "C": MemoryDataset(), + "D": MemoryDataset(), + "E": MemoryDataset(), + "F": JSONDataSet("test_data_${oc.select:date_param,01_01_1960}.csv"), + } + ), + {"namespace": "n1", "parameters": {"B": 1}}, + {"n1.E": 64, "n1.F": {"results": 64}}, + ), + ( # Test pipeline namespace with non-namespaced inputs and outputs + [{"namespace": "n1", "inputs": "A", "outputs": "F"}], + DataCatalog( + { + "n1.A": MemoryDataset(2), + "params:n1.B": MemoryDataset(1), + "n1.C": MemoryDataset(), + "n1.D": MemoryDataset(), + "n1.E": MemoryDataset(), + "F": JSONDataSet("test_data_${oc.select:date_param,01_01_1960}.csv"), + "A": MemoryDataset(2), + } + ), + {"namespace": "n1", "parameters": {"B": 1}}, + 64, + ), + ( # Test pipeline containing None and not-None namespaces + [{"namespace": None}, {"namespace": "n1"}], + DataCatalog( + { + "n1.A": MemoryDataset(2), + "params:n1.B": MemoryDataset(1), + "n1.C": MemoryDataset(), + "n1.D": MemoryDataset(), + "n1.E": MemoryDataset(), + "n1.F": JSONDataSet("test_data_${oc.select:date_param,01_01_1960}.csv"), + "A": MemoryDataset(2), + "params:B": MemoryDataset(1), + "C": MemoryDataset(), + "D": MemoryDataset(), + "E": MemoryDataset(), + "F": JSONDataSet("test_data_${oc.select:date_param,01_01_1960}.csv"), + } + ), + {}, + {"E": 64}, + ), + ( # Test inputs injection + [{"namespace": "n2", "outputs": "F"}, {"namespace": "n1"}], + DataCatalog( + { + "n1.A": MemoryDataset(2), + "params:n1.B": MemoryDataset(1), + "n1.C": MemoryDataset(), + "n1.D": MemoryDataset(), + "n1.E": MemoryDataset(), + "n1.F": JSONDataSet("test_data_${oc.select:date_param,01_01_1960}.csv"), + "n2.A": MemoryDataset(2), + "params:n2.B": MemoryDataset(1), + "n2.C": MemoryDataset(), + "n2.D": MemoryDataset(), + "n2.E": MemoryDataset(), + "F": JSONDataSet("test_data_${oc.select:date_param,01_01_1960}.csv"), + } + ), + {"namespace": "n2", "inputs": {"A": 3}, "parameters": {"B": 1}}, + 729, + ), + ( # Test params injection + [{"namespace": "n2", "outputs": "F"}, {"namespace": "n1"}], + DataCatalog( + { + "n1.A": MemoryDataset(2), + "params:n1.B": MemoryDataset(1), + "n1.C": 
MemoryDataset(), + "n1.D": MemoryDataset(), + "n1.E": MemoryDataset(), + "n1.F": JSONDataSet("test_data_${oc.select:date_param,01_01_1960}.csv"), + "n2.A": MemoryDataset(2), + "params:n2.B": MemoryDataset(1), + "n2.C": MemoryDataset(), + "n2.D": MemoryDataset(), + "n2.E": MemoryDataset(), + "F": JSONDataSet("test_data_${oc.select:date_param,01_01_1960}.csv"), + } + ), + {"namespace": "n2", "inputs": {"A": 3}, "parameters": {"B": 2}}, + 46656, + ), + ( # Test branching inputs between namespaces + [ + {"namespace": "n2"}, + {"namespace": "n1", "inputs": {"A": "n2.A"}, "outputs": "F"}, + ], + DataCatalog( + { + "n1.A": MemoryDataset(2), + "params:n1.B": MemoryDataset(1), + "n1.C": MemoryDataset(), + "n1.D": MemoryDataset(), + "n1.E": MemoryDataset(), + "F": JSONDataSet("test_data_${oc.select:date_param,01_01_1960}.csv"), + }, + { + "n2.A": MemoryDataset(4), + "params:n2.B": MemoryDataset(1), + "n2.C": MemoryDataset(), + "n2.D": MemoryDataset(), + "n2.E": MemoryDataset(), + "n2.F": JSONDataSet("test_data_${oc.select:date_param,01_01_1960}.csv"), + }, + ), + {"namespace": "n1", "parameters": {"B": 1}}, + 4096, + ), +] + + +@pytest.mark.parametrize( + "namepsaces, catalog, render_data, expected_run_results", + parametrized_test_session_scenarios, +) +def test_session( + mock_pipeline: Pipeline, + namepsaces: List[dict], + catalog: dict, + render_data: dict, + expected_run_results: dict, +): + all_pipelines = pipeline([]) + for namepsace in namepsaces: + all_pipelines = all_pipelines + pipeline(mock_pipeline, **namepsace) + + session = KedroBootSession( + pipeline=all_pipelines, + catalog=catalog, + hook_manager=_NullPluginManager(), + session_id="test1234", + runtime_app_params={}, + config_loader=OmegaConfigLoader(""), + ) + + # session.compile_catalog() + + results = session.run(**render_data) + assert results + assert results == expected_run_results + + +# TODO: Test that cover warning + +# TODO: Tests that covers Exceptions diff --git a/tests/pipeline/test_factory.py b/tests/pipeline/test_factory.py deleted file mode 100644 index fa3a700..0000000 --- a/tests/pipeline/test_factory.py +++ /dev/null @@ -1,192 +0,0 @@ -from kedro_boot.pipeline.factory import ( - app_pipeline, - AppPipelineFactoryError, - create_dummy_pipeline, -) -from kedro.pipeline import Pipeline, node -import pytest - -from kedro_boot.pipeline.pipeline import AppPipeline - - -@pytest.mark.parametrize( - "pipeline_attributes, expected_pipeline_attributes", - [ - ( - { - "inputs": ["input1"], - "outputs": ["output2"], - "parameters": ["param1"], - "artifacts": ["art1"], - "infer_artifacts": True, - }, - { - "inputs": ["input1"], - "outputs": ["output2"], - "parameters": ["param1"], - "artifacts": ["art1"], - "infer_artifacts": False, - }, - ), - ( - { - "inputs": ["input1"], - "outputs": ["output2"], - "parameters": ["param1"], - "artifacts": ["art1"], - "infer_artifacts": False, - }, - { - "inputs": ["input1"], - "outputs": ["output2"], - "parameters": ["param1"], - "artifacts": ["art1"], - "infer_artifacts": False, - }, - ), - ( - { - "inputs": ["input1"], - "outputs": ["output2"], - "parameters": ["param1"], - "artifacts": None, - "infer_artifacts": False, - }, - { - "inputs": ["input1"], - "outputs": ["output2"], - "parameters": ["param1"], - "artifacts": [], - "infer_artifacts": False, - }, - ), - ( - { - "inputs": {"input1"}, - "outputs": {"output2"}, - "parameters": ["param1"], - "artifacts": None, - "infer_artifacts": None, - }, - { - "inputs": ["input1"], - "outputs": ["output2"], - "parameters": ["param1"], - 
"artifacts": [], - "infer_artifacts": False, - }, - ), - ( - { - "inputs": "input1", - "outputs": "output2", - "parameters": ["param1"], - "artifacts": None, - "infer_artifacts": True, - }, - { - "inputs": ["input1"], - "outputs": ["output2"], - "parameters": ["param1"], - "artifacts": [], - "infer_artifacts": True, - }, - ), - ( - { - "inputs": ["input1"], - "outputs": None, - "parameters": None, - "artifacts": None, - "infer_artifacts": None, - }, - { - "inputs": ["input1"], - "outputs": [], - "parameters": [], - "artifacts": [], - "infer_artifacts": False, - }, - ), - ( - { - "inputs": ["input1", "input2"], - "outputs": None, - "parameters": None, - "artifacts": None, - "infer_artifacts": None, - }, - { - "inputs": ["input1", "input2"], - "outputs": [], - "parameters": [], - "artifacts": [], - "infer_artifacts": False, - }, - ), - ], -) -def test_app_pipeline_with_different_arguments( - pipeline_attributes, expected_pipeline_attributes -): - def test_func1(a, b, c, d): # -> Any: - return a - - def test_func2(a): - return a - - pipeline1 = Pipeline( - [node(test_func1, ["input1", "input2", "art1", "params:param1"], "output1")] - ) - pipeline2 = Pipeline([node(test_func2, "output1", "output2")]) - test_app_pipeline = app_pipeline( - [pipeline1, pipeline2], - name="test", - inputs=pipeline_attributes["inputs"], - outputs=pipeline_attributes["outputs"], - parameters=pipeline_attributes["parameters"], - artifacts=pipeline_attributes["artifacts"], - infer_artifacts=pipeline_attributes["infer_artifacts"], - ) - - assert isinstance(test_app_pipeline, AppPipeline) - assert len(test_app_pipeline.only_nodes_with_tags("test").nodes) == len( - test_app_pipeline.nodes - ) - assert len(test_app_pipeline.views) == 1 - assert test_app_pipeline.views[0].name == "test" - assert set(test_app_pipeline.views[0].inputs) == set( - expected_pipeline_attributes["inputs"] - ) - assert set(test_app_pipeline.views[0].outputs) == set( - expected_pipeline_attributes["outputs"] - ) - assert set(test_app_pipeline.views[0].parameters) == set( - expected_pipeline_attributes["parameters"] - ) - assert set(test_app_pipeline.views[0].artifacts) == set( - expected_pipeline_attributes["artifacts"] - ) - assert ( - test_app_pipeline.views[0].infer_artifacts - == expected_pipeline_attributes["infer_artifacts"] - ) - - -def test_app_pipeline_with_duplicate_view_names(): - pipeline1 = app_pipeline([node(lambda x: x, "input1", "output1")], name="test") - pipeline2 = app_pipeline([node(lambda x: x, "input2", "output2")], name="test") - with pytest.raises(ValueError): - app_pipeline([pipeline1, pipeline2], name="test") - - -def test_app_pipeline_with_tag_already_used_in_pipeline(): - pipeline = Pipeline([node(lambda x: x, "input1", "output1", tags=["test"])]) - with pytest.raises(AppPipelineFactoryError): - app_pipeline(pipeline, name="test") - - -def test_create_dummy_pipeline(): - pipeline = create_dummy_pipeline() - assert isinstance(pipeline, Pipeline) - assert len(pipeline.nodes) == 1 diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py deleted file mode 100644 index 5dee25a..0000000 --- a/tests/pipeline/test_pipeline.py +++ /dev/null @@ -1,63 +0,0 @@ -from kedro.pipeline import Pipeline, node -from kedro_boot.pipeline.pipeline import AppPipeline, PipelineView, AppPipelineError -import pytest - - -def test_app_pipeline(): - def identity(x): - return x - - def square(x): - return x**2 - - def cube(x): - return x**3 - - nodes = [ - node(identity, "A", "B"), - node(square, "B", "C"), - node(cube, "C", 
"D"), - ] - - view1 = PipelineView(name="view1", inputs=["A"], outputs=["D"]) - view2 = PipelineView(name="view2", inputs=["A"], outputs=["C"]) - view3 = PipelineView(name="view3", inputs=["A"], outputs=["B", "C", "D"]) - - pipeline = AppPipeline(nodes=nodes, views=[view1, view2, view3]) - - assert isinstance(pipeline, Pipeline) - assert isinstance(pipeline, AppPipeline) - assert len(pipeline.views) == 3 - - assert pipeline.get_view("view1").inputs == ["A"] - assert pipeline.get_view("view1").outputs == ["D"] - - assert pipeline.get_view("view2").inputs == ["A"] - assert pipeline.get_view("view2").outputs == ["C"] - - assert pipeline.get_view("view3").inputs == ["A"] - assert pipeline.get_view("view3").outputs == ["B", "C", "D"] - - with pytest.raises(AppPipelineError): - pipeline.get_view("view4") - - pipeline2 = AppPipeline(nodes=nodes[:2], views=[view1, view2]) + AppPipeline( - nodes=nodes[2:], views=[view3] - ) - - assert pipeline2.views == [view1, view2, view3] - assert pipeline2.nodes == nodes - - pipeline3 = AppPipeline(nodes=nodes, views=[view1, view2]) - - with pytest.raises(ValueError): - pipeline + pipeline3 - - with pytest.raises(NotImplementedError): - pipeline - pipeline2 - - with pytest.raises(NotImplementedError): - pipeline & pipeline2 - - with pytest.raises(NotImplementedError): - pipeline | pipeline2 diff --git a/tests/test_session.py b/tests/test_session.py deleted file mode 100644 index 82f03a3..0000000 --- a/tests/test_session.py +++ /dev/null @@ -1,105 +0,0 @@ -from kedro_boot.session import KedroBootSession, KedroBootSessionError -import pytest -from kedro.pipeline import Pipeline -from kedro_boot.pipeline import app_pipeline -from kedro.framework.hooks.manager import _NullPluginManager -from kedro.config import OmegaConfigLoader -from kedro.io import DataCatalog - - -parametrized_test_session_scenarios = [ - ( - {"name": "view1", "inputs": ["A"], "outputs": ["C"]}, - {"name": "view1", "inputs": {"A": 10}}, - 10, - ), - ( - {"name": "view2", "inputs": ["A"]}, - {"name": "view2", "inputs": {"A": 10}}, - {"C": 10}, - ), -] - - -@pytest.mark.parametrize( - "pipeline_view, render_data, expected_results", parametrized_test_session_scenarios -) -def test_session( - mock_pipeline: Pipeline, - mock_catalog: DataCatalog, - pipeline_view: dict, - render_data: dict, - expected_results, -): - test_app_pipeline = app_pipeline( - mock_pipeline.only_nodes("identity"), **pipeline_view - ) - session = KedroBootSession( - pipeline=test_app_pipeline, - catalog=mock_catalog, - hook_manager=_NullPluginManager(), - session_id="", - config_loader=OmegaConfigLoader(""), - ) - - session.compile_catalog() - - results = session.run(**render_data) - - assert results - assert results == expected_results - - -@pytest.mark.parametrize( - "pipeline_view, render_data, expected_results", parametrized_test_session_scenarios -) -def test_session_lazy_compile( - mock_pipeline: Pipeline, - mock_catalog: DataCatalog, - pipeline_view: dict, - render_data: dict, - expected_results, -): - test_app_pipeline = app_pipeline( - mock_pipeline.only_nodes("identity"), **pipeline_view - ) - session = KedroBootSession( - pipeline=test_app_pipeline, - catalog=mock_catalog, - hook_manager=_NullPluginManager(), - session_id="", - config_loader=OmegaConfigLoader(""), - ) - - results = session.run(**render_data) - - assert session._is_catalog_compiled - assert results - assert results == expected_results - - -@pytest.mark.parametrize( - "pipeline_view, render_data, expected_results", parametrized_test_session_scenarios 
-) -def test_session_multi_compile( - mock_pipeline: Pipeline, - mock_catalog: DataCatalog, - pipeline_view: dict, - render_data: dict, - expected_results, -): - test_app_pipeline = app_pipeline( - mock_pipeline.only_nodes("identity"), **pipeline_view - ) - session = KedroBootSession( - pipeline=test_app_pipeline, - catalog=mock_catalog, - hook_manager=_NullPluginManager(), - session_id="", - config_loader=OmegaConfigLoader(""), - ) - - session.run(**render_data) - - with pytest.raises(KedroBootSessionError): - session.compile_catalog()
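Reviewer notes (not part of the patch). The render/namespace_datasets changes above rely on a namespacing convention for app-provided dataset names. The helper below is hypothetical (the real one lives in kedro_boot.utils and is not shown in this diff); it is only a minimal sketch of how plain names map onto namespaced catalog entries such as "n1.A" and "params:n1.B".

from typing import Optional

def namespace_dataset_name(dataset_name: str, namespace: Optional[str] = None) -> str:
    """Hypothetical illustration: prefix a dataset name with a pipeline namespace."""
    if not namespace:
        return dataset_name
    # Kedro stores parameters as "params:<name>", so the namespace goes after the prefix.
    if dataset_name.startswith("params:"):
        return f"params:{namespace}.{dataset_name[len('params:'):]}"
    return f"{namespace}.{dataset_name}"

assert namespace_dataset_name("A", "n1") == "n1.A"
assert namespace_dataset_name("params:B", "n1") == "params:n1.B"
assert namespace_dataset_name("A") == "A"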
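The catalog entries in the new tests use expressions such as ${itertime_param:data_date}. How kedro-boot registers that resolver is not shown in this diff; the snippet below is only a generic OmegaConf sketch of the idea, with an assumed resolver name and assumed per-iteration values.

from omegaconf import OmegaConf

itertime_values = {"data_date": "2024-06-30"}  # values an app could pass at iteration time

OmegaConf.register_new_resolver(
    "itertime_param",
    lambda name, default=None: itertime_values.get(name, default),
    replace=True,
)

config = OmegaConf.create(
    {"filepath": "data/01_raw/test_data_${itertime_param:data_date}.csv"}
)
print(config.filepath)  # data/01_raw/test_data_2024-06-30.csv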
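The renderer warning above mentions Jinja-style defaults of the form [[ expression | default('value') ]]. As a point of reference only (the commented-out Environment in renderer.py hints at this style, but it is not what the patch ships), here is how such delimiters behave with plain Jinja2.

from jinja2 import Environment

env = Environment(variable_start_string="[[", variable_end_string="]]")
template = env.from_string(
    "data/01_raw/test_data_[[ data_date | default('1960-01-01') ]].csv"
)

print(template.render())                        # data/01_raw/test_data_1960-01-01.csv
print(template.render(data_date="2024-06-30"))  # data/01_raw/test_data_2024-06-30.csv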
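materialize_artifacts wraps each artifact's loaded data in a MemoryDataSet with copy_mode="assign", so the object is loaded once at compile time and handed out by reference afterwards. A tiny self-contained illustration of that behaviour; the DataFrame stands in for a real dataset's load().

import pandas as pd
from kedro.io import MemoryDataSet

reference_df = pd.DataFrame({"id": [1, 2], "value": [0.1, 0.2]})  # stand-in for dataset.load()
materialized = MemoryDataSet(reference_df, copy_mode="assign")

# With copy_mode="assign" every load() returns the same object, so iterations skip
# both the original I/O and a per-load deep copy.
assert materialized.load() is materialized.load()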
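KedroBootRunner.run above decides what to return from the catalog after each iteration: several declared outputs give a dict, exactly one gives its bare value, and none gives every in-memory output. The standalone sketch below restates that rule with a plain dict standing in for DataCatalog, using the values from the new session tests.

from typing import Any, Dict, List

def select_outputs(
    loaded: Dict[str, Any], memory_outputs: List[str], outputs_datasets: List[str]
) -> Any:
    # Several declared outputs -> dict keyed by dataset name.
    if outputs_datasets and len(outputs_datasets) > 1:
        return {name: loaded[name] for name in outputs_datasets}
    # Exactly one declared output -> its bare value.
    if outputs_datasets:
        return loaded[outputs_datasets[0]]
    # Nothing declared -> every in-memory output of the pipeline.
    return {name: loaded[name] for name in memory_outputs}

loaded = {"n1.E": 64, "n1.F": {"results": 64}}
assert select_outputs(loaded, ["n1.E", "n1.F"], ["n1.E", "n1.F"]) == loaded
assert select_outputs(loaded, ["n1.E", "n1.F"], ["n1.E"]) == 64
assert select_outputs(loaded, ["n1.E", "n1.F"], []) == loaded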
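Finally, a hedged end-to-end sketch of driving KedroBootSession directly, mirroring the fixtures in tests/framework/test_session.py. The pipeline, catalog values and expected output shape are illustrative assumptions, not part of the patch.

from kedro.config import OmegaConfigLoader
from kedro.framework.hooks.manager import _NullPluginManager
from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline

from kedro_boot.framework.session import KedroBootSession


def multiply(x, y):
    return x * y


base_pipeline = pipeline(
    [node(multiply, ["A", "params:B"], "C", name="multiply")],
    namespace="n1",  # datasets become "n1.A", "params:n1.B", "n1.C"
)
catalog = DataCatalog(
    {"n1.A": MemoryDataset(2), "params:n1.B": MemoryDataset(3), "n1.C": MemoryDataset()}
)

session = KedroBootSession(
    pipeline=base_pipeline,
    catalog=catalog,
    hook_manager=_NullPluginManager(),
    session_id="example",
    runtime_app_params={},
    config_loader=OmegaConfigLoader(""),
)

# The first run() triggers lazy compilation; inputs and parameters are injected per iteration.
results = session.run(namespace="n1", inputs={"A": 4}, parameters={"B": 5})
print(results)  # expected to contain the value of "n1.C", i.e. 20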