Skip to content

Commit

Permalink
Fix Prefect deployment code (#1775)
Browse files Browse the repository at this point in the history
Signed-off-by: Wim <[email protected]>
  • Loading branch information
wim-qb authored Aug 19, 2022
1 parent c12167d commit f491420
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 48 deletions.
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
## Bug fixes and other changes
* Use default `False` value for rich logging `set_locals`, to make sure credentials and other sensitive data isn't shown in logs.
* Update documentation for `rich` logging.
* Updated Prefect deployment documentation to allow for reruns with saved versioned datasets.
* The Kedro IPython extension now surfaces errors when it cannot load a Kedro project.

## Upcoming deprecations for Kedro 0.19.0
Expand Down
206 changes: 158 additions & 48 deletions docs/source/deployment/prefect.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ In scope of this deployment, we are interested in [Prefect Server](https://docs.
```{note}
Prefect Server ships out-of-the-box with a fully featured user interface.
```

Please note that this deployment has been tested using kedro 0.17.6, 0.17.7 and 0.18.2 with prefect version 1.1.0.
The current implementation has not been tested with prefect 2.0.0.
## Prerequisites

To use Prefect Core and Prefect Server, ensure you have the following prerequisites in place:
Expand All @@ -30,80 +31,59 @@ To build a [Prefect flow](https://docs.prefect.io/core/concepts/flows.html) for
```python
# <project_root>/register_prefect_flow.py
from pathlib import Path
from typing import Any, Dict, List, Tuple, Union

import click

from prefect import Client, Flow, Task
from prefect.exceptions import ClientError

from kedro.framework.hooks.manager import _create_hook_manager
from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline.node import Node
from kedro.runner import run_node


class KedroTask(Task):
"""Kedro node as a Prefect task."""

def __init__(self, node: Node, catalog: DataCatalog) -> None:
self._node = node
self._catalog = catalog
super().__init__(name=node.name, tags=node.tags)

def run(self):
run_node(self._node, self._catalog)
from prefect import Client, Flow, Task
from prefect.exceptions import ClientError


@click.command()
@click.option("-p", "--pipeline", "pipeline_name", default=None)
@click.option("--env", "-e", type=str, default=None)
def build_and_register_flow(pipeline_name, env):
@click.option("--package_name", "package_name", default="kedro_prefect")
def prefect_deploy(pipeline_name, env, package_name):
"""Register a Kedro pipeline as a Prefect flow."""

# Project path and metadata required for session initialization task.
project_path = Path.cwd()
metadata = bootstrap_project(project_path)

session = KedroSession.create(project_path=project_path, env=env)
context = session.load_context()

catalog = context.catalog
pipeline_name = pipeline_name or "__default__"
pipeline = pipelines.get(pipeline_name)

unregistered_ds = pipeline.data_sets() - set(catalog.list())
for ds_name in unregistered_ds:
catalog.add(ds_name, MemoryDataSet())

flow = Flow(metadata.project_name)

tasks = {}
for node, parent_nodes in pipeline.node_dependencies.items():
if node._unique_key not in tasks:
node_task = KedroTask(node, catalog)
tasks[node._unique_key] = node_task
else:
node_task = tasks[node._unique_key]
# Use a function for task instantiation which avoids duplication of
# tasks
_, tasks = instantiate_task(node, tasks)

parent_tasks = []

for parent in parent_nodes:
if parent._unique_key not in tasks:
parent_task = KedroTask(parent, catalog)
tasks[parent._unique_key] = parent_task
else:
parent_task = tasks[parent._unique_key]

parent_task, tasks = instantiate_task(parent, tasks)
parent_tasks.append(parent_task)

flow.set_dependencies(task=node_task, upstream_tasks=parent_tasks)
tasks[node._unique_key]["parent_tasks"] = parent_tasks

client = Client()
try:
client.create_project(project_name=metadata.project_name)
except ClientError:
# `metadata.project_name` project already exists
pass
# Below task is used to instantiate a KedroSession within the scope of a
# Prefect flow
init_task = KedroInitTask(
pipeline_name=pipeline_name,
project_path=project_path,
package_name=package_name,
env=env,
)

with Flow(pipeline_name) as flow:
generate_flow(init_task, tasks)
instantiate_client(metadata.project_name)

# Register the flow with the server
flow.register(project_name=metadata.project_name)
Expand All @@ -113,8 +93,138 @@ def build_and_register_flow(pipeline_name, env):
flow.run_agent()


class KedroInitTask(Task):
"""Task to initialize KedroSession"""

def __init__(
self,
pipeline_name: str,
package_name: str,
project_path: Union[Path, str] = None,
env: str = None,
extra_params: Dict[str, Any] = None,
*args,
**kwargs,
):
self.project_path = Path(project_path or Path.cwd()).resolve()
self.extra_params = extra_params
self.pipeline_name = pipeline_name
self.env = env
super().__init__(name=f"{package_name}_init", *args, **kwargs)

def run(self) -> Dict[str, Union[DataCatalog, str]]:
"""
Initializes a Kedro session and returns the DataCatalog and
KedroSession
"""
# bootstrap project within task / flow scope
bootstrap_project(self.project_path)

session = KedroSession.create(
project_path=self.project_path,
env=self.env,
extra_params=self.extra_params, # noqa: E501
)
# Note that for logging inside a Prefect task self.logger is used.
self.logger.info("Session created with ID %s", session.session_id)
pipeline = pipelines.get(self.pipeline_name)
context = session.load_context()
catalog = context.catalog
unregistered_ds = pipeline.data_sets() - set(catalog.list()) # NOQA
for ds_name in unregistered_ds:
catalog.add(ds_name, MemoryDataSet())
return {"catalog": catalog, "sess_id": session.session_id}


class KedroTask(Task):
"""Kedro node as a Prefect task."""

def __init__(self, node: Node):
self._node = node
super().__init__(name=node.name, tags=node.tags)

def run(self, task_dict: Dict[str, Union[DataCatalog, str]]):
run_node(
self._node,
task_dict["catalog"],
_create_hook_manager(),
task_dict["sess_id"],
)


def instantiate_task(
node: Node,
tasks: Dict[str, Dict[str, Union[KedroTask, List[KedroTask]]]],
) -> Tuple[KedroTask, Dict[str, Dict[str, Union[KedroTask, List[KedroTask]]]]]:
"""
Function pulls node task from <tasks> dictionary. If node task not
available in <tasks> the function instantiates the tasks and adds
it to <tasks>. In this way we avoid duplicate instantiations of
the same node task.
Args:
node: Kedro node for which a Prefect task is being created.
tasks: dictionary mapping node names to a dictionary containing
node tasks and parent node tasks.
Returns: Prefect task for the passed node and task dictionary.
"""
if tasks.get(node._unique_key) is not None:
node_task = tasks[node._unique_key]["task"]
else:
node_task = KedroTask(node)
tasks[node._unique_key] = {"task": node_task}

# return tasks as it is mutated. We want to make this obvious to the user.
return node_task, tasks # type: ignore[return-value]


def generate_flow(
init_task: KedroInitTask,
tasks: Dict[str, Dict[str, Union[KedroTask, List[KedroTask]]]],
):
"""
Constructs a Prefect flow given a task dictionary. Task dictionary
maps Kedro node names to a dictionary containing a node task and its
parents.
Args:
init_task: Prefect initialisation tasks. Used to instantiate a Kedro
session within the scope of a Prefect flow.
tasks: dictionary mapping Kedro node names to a dictionary
containing a corresponding node task and its parents.
Returns: None
"""
child_task_dict = init_task
for task in tasks.values():
node_task = task["task"]
if len(task["parent_tasks"]) == 0:
# When a task has no parent only the session init task should
# precede it.
parent_tasks = [init_task]
else:
parent_tasks = task["parent_tasks"]
# Set upstream tasks and bind required kwargs.
# Note: Unpacking the return from init tasks will generate two
# sub-tasks in the prefect graph. To avoid this we pass the init
# return on unpacked.
node_task.bind(upstream_tasks=parent_tasks, task_dict=child_task_dict)


def instantiate_client(project_name: str):
"""Initiates Prefect client"""
client = Client()
try:
client.create_project(project_name=project_name)
except ClientError:
raise


if __name__ == "__main__":
build_and_register_flow()
prefect_deploy()
```
```
```{note}
Expand Down

0 comments on commit f491420

Please sign in to comment.