feat: kedro-airflow DAG kwarg configuration
Signed-off-by: Simon Brugman <[email protected]>
sbrugman committed Jul 14, 2023
1 parent 2cad02f commit 42b15a2
Showing 8 changed files with 473 additions and 76 deletions.
75 changes: 74 additions & 1 deletion kedro-airflow/README.md
@@ -46,7 +46,7 @@ Please visit the guide to [deploy Kedro as a Python package](https://kedro.readt

#### What if my DAG file is in a different directory to my project folder?

By default the generated DAG file is configured to live in the same directory as your project as per this [template](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-airflow/kedro_airflow/airflow_dag_template.j2#L44). If your DAG file is located in a different directory to your project, you will need to tweak this manually after running the `kedro airflow create` command.
By default, the generated DAG file is configured to live in the same directory as your project as per this [template](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-airflow/kedro_airflow/airflow_dag_template.j2#L44). If your DAG file is located in a different directory to your project, you will need to tweak this manually after running the `kedro airflow create` command.

#### What if I want to use a different Jinja2 template?

@@ -56,6 +56,79 @@ You can use the additional command line argument `--jinja-file` (alias `-j`) to
kedro airflow create --jinja-file=./custom/template.j2
```

#### How can I pass arguments to the Airflow DAGs dynamically?

`kedro-airflow` picks up configuration from `airflow.yml` in `conf/base` or `conf/local` of your Kedro project, or from any other configuration file matching the `airflow*` or `airflow/**` patterns.
The [parameters](https://docs.kedro.org/en/stable/configuration/parameters.html) are read by Kedro.
Arguments can be specified globally, or per pipeline:

```yaml
# Global parameters
default:
    start_date: [2023, 1, 1]
    max_active_runs: 3
    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    schedule_interval: "@once"
    catchup: false
    # Default settings applied to all tasks
    owner: "airflow"
    depends_on_past: false
    email_on_failure: false
    email_on_retry: false
    retries: 1
    retry_delay: 5

# Arguments specific to the pipeline (overrides the parameters above)
data_science:
    owner: "airflow-ds"
```

Arguments can also be passed via `--params` on the command line:

```bash
kedro airflow create --params "schedule_interval='@weekly'"
```

These variables are passed to the Jinja2 template.
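
For illustration, with the `airflow.yml` above, the DAG generated for the `data_science` pipeline from the default template would start roughly like this (the DAG id is a hypothetical slugified project name):

```python
from datetime import datetime, timedelta

from airflow import DAG

with DAG(
    dag_id="my-project",  # hypothetical slugified project name
    start_date=datetime(2023, 1, 1),
    max_active_runs=3,
    schedule_interval="@once",
    catchup=False,
    # Default settings applied to all tasks
    default_args=dict(
        owner="airflow-ds",  # pipeline-level override of the global "airflow"
        depends_on_past=False,
        email_on_failure=False,
        email_on_retry=False,
        retries=1,
        retry_delay=timedelta(minutes=5),
    ),
) as dag:
    ...
```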

#### What if I want to use a configuration pattern other than `airflow*` and `airflow/**`?

In order to configure the `OmegaConfigLoader`, update the `settings.py` file in your Kedro project.
For instance, if you would like to use the name `scheduler`, then change the file as follows:

```python
from kedro.config import OmegaConfigLoader

CONFIG_LOADER_CLASS = OmegaConfigLoader
CONFIG_LOADER_ARGS = {
    "config_patterns": {"airflow": ["scheduler*", "scheduler/**"]}
}
```

Follow [Kedro's official documentation](https://docs.kedro.org/en/stable/configuration/advanced_configuration.html#how-to-do-templating-with-the-omegaconfigloader) to see how to add templating, custom resolvers, etc.

#### What if I want to pass different arguments?

To pass arguments other than those specified in the default template, simply provide a custom template (see: _"What if I want to use a different Jinja2 template?"_).

The syntax for arguments is:
```
{{ argument_name }}
```

To make an argument optional, use:
```
{{ argument_name | default("default_value") }}
```

For examples, please have a look at the default template (`airflow_dag_template.j2`).
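
As a sketch, a custom template could accept a hypothetical `dag_tags` value (set in `airflow.yml` or via `--params`) and forward it to the `DAG` constructor as its `tags` argument:

```
tags={{ dag_tags | default(["kedro"]) }},
```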

#### How can I use Airflow runtime parameters?

It is possible to pass parameters when triggering an Airflow DAG from the user interface.
In order to use this feature, create a custom template using the [Params syntax](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/params.html).
See ["What if I want to use a different Jinja2 template?"](#what-if-i-want-to-use-a-different-jinja2-template) for instructions on using custom templates.

#### What if I want to use a different Airflow Operator?

Which Airflow Operator to use depends on the environment your project is running in.
11 changes: 10 additions & 1 deletion kedro-airflow/RELEASE.md
@@ -1,6 +1,15 @@
# Upcoming release 0.5.2
# Upcoming release 0.6.0
* Replace references to the `kedro.pipeline.Pipeline` object throughout the test suite with the `kedro.modular_pipeline.pipeline` factory.
* Migrate all project metadata to static `pyproject.toml`.
* Configure DAG kwargs via `airflow.yml`.
* The generated DAG file now contains the pipeline name.
* Included help for CLI arguments (see `kedro airflow create --help`).
* Added additional CLI argument `--params` to pass configuration to the Jinja2 template.

## Community contributions
Many thanks to the following Kedroids for contributing PRs to this release:

* [sbrugman](https://github.com/sbrugman)

# Release 0.5.1
* Added additional CLI argument `--jinja-file` to provide a path to a custom Jinja2 template.
8 changes: 4 additions & 4 deletions kedro-airflow/features/steps/cli_steps.py
@@ -87,7 +87,7 @@ def install_kedro(context, version):
    if version == "latest":
        cmd = [context.pip, "install", "-U", "kedro[pandas]"]
    else:
        cmd = [context.pip, "install", "kedro[pandas]=={}".format(version)]
        cmd = [context.pip, "install", f"kedro[pandas]=={version}"]
    res = run(cmd, env=context.env)

    if res.returncode != OK_EXIT_CODE:
@@ -121,7 +121,7 @@ def check_message_printed(context, msg):
    stdout = context.result.stdout
    assert msg in stdout, (
        "Expected the following message segment to be printed on stdout: "
        "{exp_msg},\nbut got {actual_msg}".format(exp_msg=msg, actual_msg=stdout)
        f"{msg},\nbut got {stdout}"
    )


@@ -187,6 +187,6 @@ def check_status_code(context):
    if context.result.returncode != OK_EXIT_CODE:
        print(context.result.stdout)
        print(context.result.stderr)
        assert False, "Expected exit code {}" " but got {}".format(
            OK_EXIT_CODE, context.result.returncode
        raise AssertionError(
            f"Expected exit code {OK_EXIT_CODE} but got {context.result.returncode}"
        )
61 changes: 29 additions & 32 deletions kedro-airflow/kedro_airflow/airflow_dag_template.j2
@@ -1,5 +1,6 @@
from datetime import datetime, timedelta
from pathlib import Path
from typing import Union

from airflow import DAG
from airflow.models import BaseOperator
@@ -10,14 +11,13 @@ from kedro.framework.project import configure_project


class KedroOperator(BaseOperator):

    @apply_defaults
    def __init__(
        self,
        package_name: str,
        pipeline_name: str,
        node_name: str,
        project_path: str,
        project_path: Union[str, Path],
        env: str,
        *args, **kwargs
    ) -> None:
@@ -35,46 +35,43 @@ class KedroOperator(BaseOperator):
            env=self.env) as session:
            session.run(self.pipeline_name, node_names=[self.node_name])


# Kedro settings required to run your pipeline
env = "{{ env }}"
pipeline_name = "{{ pipeline_name }}"
project_path = Path.cwd()
package_name = "{{ package_name }}"

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# Using a DAG context manager, you don't have to specify the dag property of each task
with DAG(
    "{{ dag_name | safe | slugify }}",
    start_date=datetime(2019, 1, 1),
    max_active_runs=3,
    schedule_interval=timedelta(minutes=30), # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    default_args=default_args,
    catchup=False # enable if you don't want historical dag runs to run
) as dag:

    tasks = {}
    {% for node in pipeline.nodes %}
    tasks["{{ node.name | safe | slugify }}"] = KedroOperator(
        task_id="{{ node.name | safe | slugify }}",
        package_name=package_name,
        pipeline_name=pipeline_name,
        node_name="{{ node.name | safe }}",
        project_path=project_path,
        env=env,
    dag_id="{{ dag_name | safe | slugify }}",
    start_date=datetime({{ start_date | default([2023, 1, 1]) | join(",")}}),
    max_active_runs={{ max_active_runs | default(3) }},
    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    schedule_interval="{{ schedule_interval | default('@once') }}",
    catchup={{ catchup | default(False) }},
    # Default settings applied to all tasks
    default_args=dict(
        owner="{{ owner | default('airflow') }}",
        depends_on_past={{ depends_on_past | default(False) }},
        email_on_failure={{ email_on_failure | default(False) }},
        email_on_retry={{ email_on_retry | default(False) }},
        retries={{ retries | default(1) }},
        retry_delay=timedelta(minutes={{ retry_delay | default(5) }})
    )
    {% endfor %}
) as dag:
    tasks = {
    {% for node in pipeline.nodes %} "{{ node.name | safe | slugify }}": KedroOperator(
        task_id="{{ node.name | safe | slugify }}",
        package_name=package_name,
        pipeline_name=pipeline_name,
        node_name="{{ node.name | safe }}",
        project_path=project_path,
        env=env,
    ),
    {% endfor %} }

    {% for parent_node, child_nodes in dependencies.items() -%}
    {% for child in child_nodes %}
    tasks["{{ parent_node.name | safe | slugify }}"] >> tasks["{{ child.name | safe | slugify }}"]
    {% for child in child_nodes %} tasks["{{ parent_node.name | safe | slugify }}"] >> tasks["{{ child.name | safe | slugify }}"]
    {% endfor %}
    {%- endfor %}
76 changes: 70 additions & 6 deletions kedro-airflow/kedro_airflow/plugin.py
@@ -1,15 +1,25 @@
""" Kedro plugin for running a project with Airflow """
from __future__ import annotations

from collections import defaultdict
from pathlib import Path
from typing import Any

import click
import jinja2
from click import secho
from kedro.config import MissingConfigException
from kedro.framework.cli.project import PARAMS_ARG_HELP
from kedro.framework.cli.utils import ENV_HELP, KedroCliError, _split_params
from kedro.framework.context import KedroContext
from kedro.framework.project import pipelines
from kedro.framework.startup import ProjectMetadata
from kedro.framework.session import KedroSession
from kedro.framework.startup import ProjectMetadata, bootstrap_project
from slugify import slugify

PIPELINE_ARG_HELP = """Name of the registered pipeline to convert.
If not set, the '__default__' pipeline is used."""


@click.group(name="Kedro-Airflow")
def commands(): # pylint: disable=missing-function-docstring
@@ -22,15 +32,44 @@ def airflow_commands():
    pass


def _load_config(context: KedroContext, pipeline_name: str) -> dict[str, Any]:
    # Set the default pattern for `airflow` if not provided in `settings.py`
    if "airflow" not in context.config_loader.config_patterns.keys():
        context.config_loader.config_patterns.update(  # pragma: no cover
            {"airflow": ["airflow*", "airflow/**"]}
        )

    assert "airflow" in context.config_loader.config_patterns.keys()

    # Load the config
    try:
        config_airflow = context.config_loader["airflow"]
    except MissingConfigException:
        # File does not exist
        return {}

    dag_config = {}
    # Load the default config if specified
    if "default" in config_airflow:
        dag_config.update(config_airflow["default"])
    # Update with pipeline-specific config if present
    if pipeline_name in config_airflow:
        dag_config.update(config_airflow[pipeline_name])
    return dag_config


@airflow_commands.command()
@click.option("-p", "--pipeline", "pipeline_name", default="__default__")
@click.option("-e", "--env", default="local")
@click.option(
    "-p", "--pipeline", "pipeline_name", default="__default__", help=PIPELINE_ARG_HELP
)
@click.option("-e", "--env", default="local", help=ENV_HELP)
@click.option(
    "-t",
    "--target-dir",
    "target_path",
    type=click.Path(writable=True, resolve_path=True, file_okay=False),
    default="./airflow_dags/",
    help="The directory path to store the generated Airflow dags",
)
@click.option(
    "-j",
@@ -39,6 +78,14 @@ def airflow_commands():
        exists=True, readable=True, resolve_path=True, file_okay=True, dir_okay=False
    ),
    default=Path(__file__).parent / "airflow_dag_template.j2",
    help="The template file for the generated Airflow dags",
)
@click.option(
    "--params",
    type=click.UNPROCESSED,
    default="",
    help=PARAMS_ARG_HELP,
    callback=_split_params,
)
@click.pass_obj
def create(
@@ -47,23 +94,39 @@ def create(
    env,
    target_path,
    jinja_file,
    params,
):  # pylint: disable=too-many-locals,too-many-arguments
    """Create an Airflow DAG for a project"""
    project_path = Path.cwd().resolve()
    bootstrap_project(project_path)
    with KedroSession.create(project_path=project_path, env=env) as session:
        context = session.load_context()
        dag_config = _load_config(context, pipeline_name)

    # Update with params if provided
    dag_config.update(params)

    jinja_file = Path(jinja_file).resolve()
    loader = jinja2.FileSystemLoader(jinja_file.parent)
    jinja_env = jinja2.Environment(autoescape=True, loader=loader, lstrip_blocks=True)
    jinja_env.filters["slugify"] = slugify
    template = jinja_env.get_template(jinja_file.name)

    package_name = metadata.package_name
    dag_filename = f"{package_name}_dag.py"
    dag_filename = (
        f"{package_name}_dag.py"
        if pipeline_name == "__default__"
        else f"{package_name}_{pipeline_name}_dag.py"
    )

    target_path = Path(target_path)
    target_path = target_path / dag_filename

    target_path.parent.mkdir(parents=True, exist_ok=True)

    pipeline = pipelines.get(pipeline_name)
    if pipeline is None:
        raise KedroCliError(f"Pipeline {pipeline_name} not found.")

    dependencies = defaultdict(list)
    for node, parent_nodes in pipeline.node_dependencies.items():
@@ -77,14 +140,16 @@ def create(
        pipeline_name=pipeline_name,
        package_name=package_name,
        pipeline=pipeline,
        **dag_config,
    ).dump(str(target_path))

    secho("")
    secho("An Airflow DAG has been generated in:", fg="green")
    secho(str(target_path))
    secho("This file should be copied to your Airflow DAG folder.", fg="yellow")
    secho(
        "The Airflow configuration can be customized by editing this file.", fg="green"
        "The Airflow configuration can be customized by editing this file.",
        fg="green",
    )
    secho("")
    secho(
@@ -101,4 +166,3 @@ def create(
        "And all local paths in both the data catalog and log config must be absolute paths.",
        fg="yellow",
    )
    secho("")
2 changes: 2 additions & 0 deletions kedro-airflow/test_requirements.txt
@@ -2,12 +2,14 @@ apache-airflow<3.0
bandit>=1.6.2, <2.0
behave
black~=22.0
cookiecutter>=2.1.1, <3.0
flake8
pre-commit>=1.17.0, <2.0
pylint>=2.5.2, <3.0
pytest
pytest-cov
pytest-mock
pytest-xdist
pyyaml
trufflehog>=2.1.0, <3.0
wheel