From 926679336111dd4954e930548a75f054bf5ce590 Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Fri, 16 Jun 2023 14:39:05 +0200 Subject: [PATCH 01/15] feat: option to group in-memory nodes Signed-off-by: Simon Brugman --- kedro-airflow/README.md | 13 ++ kedro-airflow/RELEASE.md | 1 + .../kedro_airflow/airflow_dag_template.j2 | 19 +-- kedro-airflow/kedro_airflow/grouping.py | 90 ++++++++++++ kedro-airflow/kedro_airflow/plugin.py | 26 +++- kedro-airflow/tests/test_node_grouping.py | 128 ++++++++++++++++++ kedro-airflow/tests/test_plugin.py | 2 + 7 files changed, 266 insertions(+), 13 deletions(-) create mode 100644 kedro-airflow/kedro_airflow/grouping.py create mode 100644 kedro-airflow/tests/test_node_grouping.py diff --git a/kedro-airflow/README.md b/kedro-airflow/README.md index 4e1286005..eb31d6c73 100644 --- a/kedro-airflow/README.md +++ b/kedro-airflow/README.md @@ -154,6 +154,19 @@ See ["What if I want to use a different Jinja2 template?"](#what-if-i-want-to-us The [rich offering](https://airflow.apache.org/docs/apache-airflow-providers/operators-and-hooks-ref/index.html) of operators means that the `kedro-airflow` plugin is providing templates for specific operators. The default template provided by `kedro-airflow` uses the `BaseOperator`. +### Can I group nodes together? + +When running Kedro nodes using Airflow, MemoryDataSets are often not shared across operators. +This will cause the DAG run to fail. + +MemoryDataSets may be used to provide logical separation between nodes in Kedro, without the overhead of needing to write to disk (and in the case of distributed running needing multiple executors). + +Nodes that are connected through MemoryDataSets are grouped together via the `--group-in-memory` flag. +This preserves the option to have logical separation in Kedro, with little computational overhead. + +It is possible to use [task groups](https://docs.astronomer.io/learn/task-groups) by changing the template. +See ["What if I want to use a different Jinja2 template?"](#what-if-i-want-to-use-a-different-jinja2-template) for instructions on using custom templates. + ## Can I contribute? Yes! Want to help build Kedro-Airflow? Check out our guide to [contributing](https://github.com/kedro-org/kedro-plugins/blob/main/kedro-airflow/CONTRIBUTING.md). diff --git a/kedro-airflow/RELEASE.md b/kedro-airflow/RELEASE.md index b4d2eb7d0..1c94a7179 100755 --- a/kedro-airflow/RELEASE.md +++ b/kedro-airflow/RELEASE.md @@ -1,4 +1,5 @@ # Upcoming Release +* Option to group MemoryDataSets in the same Airflow task (breaking change for custom template via `--jinja-file`). 
# Release 0.8.0 * Added support for Kedro 0.19.x diff --git a/kedro-airflow/kedro_airflow/airflow_dag_template.j2 b/kedro-airflow/kedro_airflow/airflow_dag_template.j2 index b961420d5..d1d8b8237 100644 --- a/kedro-airflow/kedro_airflow/airflow_dag_template.j2 +++ b/kedro-airflow/kedro_airflow/airflow_dag_template.j2 @@ -1,4 +1,5 @@ from __future__ import annotations + from datetime import datetime, timedelta from pathlib import Path @@ -16,7 +17,7 @@ class KedroOperator(BaseOperator): self, package_name: str, pipeline_name: str, - node_name: str, + node_name: str | list[str], project_path: str | Path, env: str, *args, **kwargs @@ -30,10 +31,10 @@ class KedroOperator(BaseOperator): def execute(self, context): configure_project(self.package_name) - with KedroSession.create(project_path=self.project_path, - env=self.env) as session: - session.run(self.pipeline_name, node_names=[self.node_name]) - + with KedroSession.create(self.project_path, env=self.env) as session: + if isinstance(self.node_name, str): + self.node_name = [self.node_name] + session.run(self.pipeline_name, node_names=self.node_name) # Kedro settings required to run your pipeline env = "{{ env }}" @@ -60,17 +61,17 @@ with DAG( ) ) as dag: tasks = { - {% for node in pipeline.nodes %} "{{ node.name | safe | slugify }}": KedroOperator( - task_id="{{ node.name | safe | slugify }}", + {% for node_name, node_list in nodes.items() %} "{{ node_name | safe | slugify }}": KedroOperator( + task_id="{{ node_name | safe | slugify }}", package_name=package_name, pipeline_name=pipeline_name, - node_name="{{ node.name | safe }}", + node_name={% if node_list | length > 1 %}[{% endif %}{% for node in node_list %}"{{ node.name | safe | slugify }}"{% if not loop.last %}, {% endif %}{% endfor %}{% if node_list | length > 1 %}]{% endif %}, project_path=project_path, env=env, ), {% endfor %} } {% for parent_node, child_nodes in dependencies.items() -%} - {% for child in child_nodes %} tasks["{{ parent_node.name | safe | slugify }}"] >> tasks["{{ child.name | safe | slugify }}"] + {% for child in child_nodes %} tasks["{{ parent_node | safe | slugify }}"] >> tasks["{{ child | safe | slugify }}"] {% endfor %} {%- endfor %} diff --git a/kedro-airflow/kedro_airflow/grouping.py b/kedro-airflow/kedro_airflow/grouping.py new file mode 100644 index 000000000..c30ccca13 --- /dev/null +++ b/kedro-airflow/kedro_airflow/grouping.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +from collections import defaultdict + +from kedro.io import DataCatalog, MemoryDataSet +from kedro.pipeline.node import Node +from kedro.pipeline.pipeline import Pipeline + + +def _is_memory_dataset(catalog, dataset_name: str) -> bool: + if dataset_name == "parameters" or dataset_name.startswith("params:"): + return False + + dataset = catalog._data_sets.get(dataset_name, None) + return dataset is not None and isinstance(dataset, MemoryDataSet) + + +def get_memory_datasets(catalog: DataCatalog, pipeline: Pipeline) -> set[str]: + """Gather all datasets in the pipeline that are of type MemoryDataSet, excluding 'parameters'.""" + return { + dataset_name + for dataset_name in pipeline.data_sets() + if _is_memory_dataset(catalog, dataset_name) + } + + +def node_sequence_name(node_sequence: list[Node]) -> str: + return "_".join([node.name for node in node_sequence]) + + +def group_memory_nodes(catalog: DataCatalog, pipeline: Pipeline): + # get all memory datasets in the pipeline + ds = get_memory_datasets(catalog, pipeline) + + # Node sequences + node_sequences = [] + + # Mapping from 
dataset name -> node sequence index + sequence_map = {} + for node in pipeline.nodes: + if all(o not in ds for o in node.inputs + node.outputs): + # standalone node + node_sequences.append([node]) + else: + if all(i not in ds for i in node.inputs): + # start of a sequence; create a new sequence and store the id + node_sequences.append([node]) + sequence_id = len(node_sequences) - 1 + else: + # continuation of a sequence; retrieve sequence_id + sequence_id = None + for i in node.inputs: + if i in ds: + assert sequence_id is None or sequence_id == sequence_map[i] + sequence_id = sequence_map[i] + + # Append to map + node_sequences[sequence_id].append(node) + + # map outputs to sequence_id + for o in node.outputs: + if o in ds: + sequence_map[o] = sequence_id + + # Named node sequences + nodes = { + node_sequence_name(node_sequence): node_sequence + for node_sequence in node_sequences + } + + # Inverted mapping + node_mapping = { + node.name: sequence_name + for sequence_name, node_sequence in nodes.items() + for node in node_sequence + } + + # Grouped dependencies + dependencies = defaultdict(list) + for node, parent_nodes in pipeline.node_dependencies.items(): + for parent in parent_nodes: + parent_name = node_mapping[parent.name] + node_name = node_mapping[node.name] + if parent_name != node_name and ( + parent_name not in dependencies + or node_name not in dependencies[parent_name] + ): + dependencies[parent_name].append(node_name) + + return nodes, dependencies diff --git a/kedro-airflow/kedro_airflow/plugin.py b/kedro-airflow/kedro_airflow/plugin.py index 70f9e41c2..a5c0bf706 100644 --- a/kedro-airflow/kedro_airflow/plugin.py +++ b/kedro-airflow/kedro_airflow/plugin.py @@ -17,6 +17,8 @@ from kedro.framework.startup import ProjectMetadata, bootstrap_project from slugify import slugify +from kedro_airflow.grouping import group_memory_nodes + PIPELINE_ARG_HELP = """Name of the registered pipeline to convert. If not set, the '__default__' pipeline is used. This argument supports passing multiple values using `--pipeline [p1] --pipeline [p2]`. 
@@ -100,6 +102,14 @@ def _get_pipeline_config(config_airflow: dict, params: dict, pipeline_name: str) default=Path(__file__).parent / "airflow_dag_template.j2", help="The template file for the generated Airflow dags", ) +@click.option( + "-g", + "--group-in-memory", + is_flag=True, + default=False, + help="Group nodes with at least one MemoryDataSet as input/output together, " + "as they do not persist between Airflow operators.", +) @click.option( "--params", type=click.UNPROCESSED, @@ -114,6 +124,7 @@ def create( # noqa: PLR0913 env, target_path, jinja_file, + group_in_memory, params, convert_all: bool, ): @@ -165,13 +176,20 @@ def create( # noqa: PLR0913 else f"{package_name}_{name}_dag.py" ) - dependencies = defaultdict(list) - for node, parent_nodes in pipeline.node_dependencies.items(): - for parent in parent_nodes: - dependencies[parent].append(node) + # group memory nodes + if group_in_memory: + nodes, dependencies = group_memory_nodes(context.catalog, pipeline) + else: + nodes = {node.name: [node] for node in pipeline.nodes} + + dependencies = defaultdict(list) + for node, parent_nodes in pipeline.node_dependencies.items(): + for parent in parent_nodes: + dependencies[parent.name].append(node.name) template.stream( dag_name=package_name, + nodes=nodes, dependencies=dependencies, env=env, pipeline_name=name, diff --git a/kedro-airflow/tests/test_node_grouping.py b/kedro-airflow/tests/test_node_grouping.py new file mode 100644 index 000000000..63b15d64f --- /dev/null +++ b/kedro-airflow/tests/test_node_grouping.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +from typing import Any + +import pytest +from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet +from kedro.pipeline import node +from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline + +from kedro_airflow.grouping import _is_memory_dataset, group_memory_nodes + + +class TestDataSet(AbstractDataSet): + def _save(self, data) -> None: + pass + + def _describe(self) -> dict[str, Any]: + return {} + + def _load(self): + return [] + + +@pytest.mark.parametrize( + "memory_nodes,expected_nodes,expected_dependencies", + [ + ( + ["ds3", "ds6"], + [["f1"], ["f2", "f3", "f4", "f6", "f7"], ["f5"]], + {"f1": ["f2_f3_f4_f6_f7"], "f2_f3_f4_f6_f7": ["f5"]}, + ), + ( + ["ds3"], + [["f1"], ["f2", "f3", "f4", "f7"], ["f5"], ["f6"]], + {"f1": ["f2_f3_f4_f7"], "f2_f3_f4_f7": ["f5", "f6"]}, + ), + ( + [], + [["f1"], ["f2"], ["f3"], ["f4"], ["f5"], ["f6"], ["f7"]], + {"f1": ["f2"], "f2": ["f3", "f4", "f5", "f7"], "f4": ["f6", "f7"]}, + ), + ], +) +def test_group_memory_nodes( + memory_nodes: list[str], + expected_nodes: list[list[str]], + expected_dependencies: dict[str, list[str]], +): + """Check the grouping of memory nodes.""" + nodes = [f"ds{i}" for i in range(1, 10)] + assert all(node_name in nodes for node_name in memory_nodes) + + mock_catalog = DataCatalog() + for dataset_name in nodes: + if dataset_name in memory_nodes: + dataset = MemoryDataSet() + else: + dataset = TestDataSet() + mock_catalog.add(dataset_name, dataset) + + def identity_one_to_one(x): + return x + + mock_pipeline = modular_pipeline( + [ + node( + func=identity_one_to_one, + inputs="ds1", + outputs="ds2", + name="f1", + ), + node( + func=lambda x: (x, x), + inputs="ds2", + outputs=["ds3", "ds4"], + name="f2", + ), + node( + func=identity_one_to_one, + inputs="ds3", + outputs="ds5", + name="f3", + ), + node( + func=identity_one_to_one, + inputs="ds3", + outputs="ds6", + name="f4", + ), + node( + func=identity_one_to_one, + 
inputs="ds4", + outputs="ds8", + name="f5", + ), + node( + func=identity_one_to_one, + inputs="ds6", + outputs="ds7", + name="f6", + ), + node( + func=lambda x, y: x, + inputs=["ds3", "ds6"], + outputs="ds9", + name="f7", + ), + ], + ) + + nodes, dependencies = group_memory_nodes(mock_catalog, mock_pipeline) + sequence = [ + [node_.name for node_ in node_sequence] for node_sequence in nodes.values() + ] + assert sequence == expected_nodes + assert dict(dependencies) == expected_dependencies + + +def test_is_memory_dataset(): + catalog = DataCatalog() + catalog.add("parameters", {"hello": "world"}) + catalog.add("params:hello", "world") + catalog.add("my_dataset", MemoryDataSet(True)) + catalog.add("test_dataset", TestDataSet()) + assert not _is_memory_dataset(catalog, "parameters") + assert not _is_memory_dataset(catalog, "params:hello") + assert _is_memory_dataset(catalog, "my_dataset") + assert not _is_memory_dataset(catalog, "test_dataset") diff --git a/kedro-airflow/tests/test_plugin.py b/kedro-airflow/tests/test_plugin.py index 40899e693..0969a601b 100644 --- a/kedro-airflow/tests/test_plugin.py +++ b/kedro-airflow/tests/test_plugin.py @@ -16,6 +16,8 @@ ("hello_world", "__default__", ["airflow", "create"]), # Test execution with alternate pipeline name ("hello_world", "ds", ["airflow", "create", "--pipeline", "ds"]), + # Test with grouping + ("hello_world", "__default__", ["airflow", "create", "--group-in-memory"]), ], ) def test_create_airflow_dag(dag_name, pipeline_name, command, cli_runner, metadata): From 1af3f25dac5bde767d082b81f1d646a181bbebcf Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Wed, 20 Dec 2023 22:19:42 +0100 Subject: [PATCH 02/15] fix: MemoryDataset Signed-off-by: Simon Brugman --- kedro-airflow/kedro_airflow/grouping.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kedro-airflow/kedro_airflow/grouping.py b/kedro-airflow/kedro_airflow/grouping.py index c30ccca13..7e8287ed3 100644 --- a/kedro-airflow/kedro_airflow/grouping.py +++ b/kedro-airflow/kedro_airflow/grouping.py @@ -2,7 +2,7 @@ from collections import defaultdict -from kedro.io import DataCatalog, MemoryDataSet +from kedro.io import DataCatalog, MemoryDataset from kedro.pipeline.node import Node from kedro.pipeline.pipeline import Pipeline @@ -12,7 +12,7 @@ def _is_memory_dataset(catalog, dataset_name: str) -> bool: return False dataset = catalog._data_sets.get(dataset_name, None) - return dataset is not None and isinstance(dataset, MemoryDataSet) + return dataset is not None and isinstance(dataset, MemoryDataset) def get_memory_datasets(catalog: DataCatalog, pipeline: Pipeline) -> set[str]: From 8046f3d6ae54cf96d7ca18ab701bdebba1c03b90 Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Tue, 9 Jan 2024 14:11:50 +0100 Subject: [PATCH 03/15] Update kedro-airflow/README.md Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Signed-off-by: Simon Brugman --- kedro-airflow/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-airflow/README.md b/kedro-airflow/README.md index eb31d6c73..7db907f90 100644 --- a/kedro-airflow/README.md +++ b/kedro-airflow/README.md @@ -156,7 +156,7 @@ The default template provided by `kedro-airflow` uses the `BaseOperator`. ### Can I group nodes together? -When running Kedro nodes using Airflow, MemoryDataSets are often not shared across operators. +When running Kedro nodes using Airflow, MemoryDatasets are often not shared across operators. This will cause the DAG run to fail. 
MemoryDataSets may be used to provide logical separation between nodes in Kedro, without the overhead of needing to write to disk (and in the case of distributed running needing multiple executors). From 334462e1a3b66c84ebb98c66063e0d418250810a Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Tue, 9 Jan 2024 14:11:58 +0100 Subject: [PATCH 04/15] Update kedro-airflow/README.md Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Signed-off-by: Simon Brugman --- kedro-airflow/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-airflow/README.md b/kedro-airflow/README.md index 7db907f90..8b6e37ee1 100644 --- a/kedro-airflow/README.md +++ b/kedro-airflow/README.md @@ -159,7 +159,7 @@ The default template provided by `kedro-airflow` uses the `BaseOperator`. When running Kedro nodes using Airflow, MemoryDatasets are often not shared across operators. This will cause the DAG run to fail. -MemoryDataSets may be used to provide logical separation between nodes in Kedro, without the overhead of needing to write to disk (and in the case of distributed running needing multiple executors). +MemoryDatasets may be used to provide logical separation between nodes in Kedro, without the overhead of needing to write to disk (and in the case of distributed running needing multiple executors). Nodes that are connected through MemoryDataSets are grouped together via the `--group-in-memory` flag. This preserves the option to have logical separation in Kedro, with little computational overhead. From dc15e0259dc2e07d4f9717daa2a7a4b2382a5e47 Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Tue, 9 Jan 2024 14:12:09 +0100 Subject: [PATCH 05/15] Update kedro-airflow/README.md Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Signed-off-by: Simon Brugman --- kedro-airflow/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-airflow/README.md b/kedro-airflow/README.md index 8b6e37ee1..cf405aa7a 100644 --- a/kedro-airflow/README.md +++ b/kedro-airflow/README.md @@ -161,7 +161,7 @@ This will cause the DAG run to fail. MemoryDatasets may be used to provide logical separation between nodes in Kedro, without the overhead of needing to write to disk (and in the case of distributed running needing multiple executors). -Nodes that are connected through MemoryDataSets are grouped together via the `--group-in-memory` flag. +Nodes that are connected through MemoryDatasets are grouped together via the `--group-in-memory` flag. This preserves the option to have logical separation in Kedro, with little computational overhead. It is possible to use [task groups](https://docs.astronomer.io/learn/task-groups) by changing the template. From 48282887d9edea8d8f255c1d9541c170263d8193 Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Tue, 9 Jan 2024 14:12:18 +0100 Subject: [PATCH 06/15] Update kedro-airflow/RELEASE.md Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Signed-off-by: Simon Brugman --- kedro-airflow/RELEASE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-airflow/RELEASE.md b/kedro-airflow/RELEASE.md index 1c94a7179..fe8a79668 100755 --- a/kedro-airflow/RELEASE.md +++ b/kedro-airflow/RELEASE.md @@ -1,5 +1,5 @@ # Upcoming Release -* Option to group MemoryDataSets in the same Airflow task (breaking change for custom template via `--jinja-file`). +* Option to group MemoryDatasets in the same Airflow task (breaking change for custom template via `--jinja-file`). 
# Release 0.8.0 * Added support for Kedro 0.19.x From d57370a0afad15b9715d187d55999e5c49c36d73 Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Tue, 9 Jan 2024 14:12:25 +0100 Subject: [PATCH 07/15] Update kedro-airflow/kedro_airflow/grouping.py Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Signed-off-by: Simon Brugman --- kedro-airflow/kedro_airflow/grouping.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-airflow/kedro_airflow/grouping.py b/kedro-airflow/kedro_airflow/grouping.py index 7e8287ed3..8323e60e8 100644 --- a/kedro-airflow/kedro_airflow/grouping.py +++ b/kedro-airflow/kedro_airflow/grouping.py @@ -16,7 +16,7 @@ def _is_memory_dataset(catalog, dataset_name: str) -> bool: def get_memory_datasets(catalog: DataCatalog, pipeline: Pipeline) -> set[str]: - """Gather all datasets in the pipeline that are of type MemoryDataSet, excluding 'parameters'.""" + """Gather all datasets in the pipeline that are of type MemoryDataset, excluding 'parameters'.""" return { dataset_name for dataset_name in pipeline.data_sets() From 4850140e78cc353c0fe56f234a8f986eef7baa05 Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Tue, 9 Jan 2024 14:12:34 +0100 Subject: [PATCH 08/15] Update kedro-airflow/kedro_airflow/plugin.py Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Signed-off-by: Simon Brugman --- kedro-airflow/kedro_airflow/plugin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-airflow/kedro_airflow/plugin.py b/kedro-airflow/kedro_airflow/plugin.py index a5c0bf706..1a164d4d1 100644 --- a/kedro-airflow/kedro_airflow/plugin.py +++ b/kedro-airflow/kedro_airflow/plugin.py @@ -107,7 +107,7 @@ def _get_pipeline_config(config_airflow: dict, params: dict, pipeline_name: str) "--group-in-memory", is_flag=True, default=False, - help="Group nodes with at least one MemoryDataSet as input/output together, " + help="Group nodes with at least one MemoryDataset as input/output together, " "as they do not persist between Airflow operators.", ) @click.option( From be5178b7e97ab432238b3a2b3a4a69ee211e74da Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Tue, 9 Jan 2024 14:12:42 +0100 Subject: [PATCH 09/15] Update kedro-airflow/tests/test_node_grouping.py Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Signed-off-by: Simon Brugman --- kedro-airflow/tests/test_node_grouping.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-airflow/tests/test_node_grouping.py b/kedro-airflow/tests/test_node_grouping.py index 63b15d64f..334f8ea66 100644 --- a/kedro-airflow/tests/test_node_grouping.py +++ b/kedro-airflow/tests/test_node_grouping.py @@ -10,7 +10,7 @@ from kedro_airflow.grouping import _is_memory_dataset, group_memory_nodes -class TestDataSet(AbstractDataSet): +class TestDataset(AbstractDataset): def _save(self, data) -> None: pass From 1311a967e18707cf220365d54f1a8b6f983e6d3f Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Tue, 9 Jan 2024 14:12:49 +0100 Subject: [PATCH 10/15] Update kedro-airflow/tests/test_node_grouping.py Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Signed-off-by: Simon Brugman --- kedro-airflow/tests/test_node_grouping.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-airflow/tests/test_node_grouping.py b/kedro-airflow/tests/test_node_grouping.py index 334f8ea66..46d04806c 100644 --- a/kedro-airflow/tests/test_node_grouping.py +++ b/kedro-airflow/tests/test_node_grouping.py @@ -3,7 
+3,7 @@ from typing import Any import pytest -from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet +from kedro.io import AbstractDataset, DataCatalog, MemoryDataset from kedro.pipeline import node from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline From aaf62d422cd2873bf82598856f2634fa0310c8d2 Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Mon, 29 Jan 2024 12:58:47 +0100 Subject: [PATCH 11/15] Update kedro-airflow/kedro_airflow/grouping.py Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Signed-off-by: Simon Brugman --- kedro-airflow/kedro_airflow/grouping.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-airflow/kedro_airflow/grouping.py b/kedro-airflow/kedro_airflow/grouping.py index 8323e60e8..77eee4728 100644 --- a/kedro-airflow/kedro_airflow/grouping.py +++ b/kedro-airflow/kedro_airflow/grouping.py @@ -30,7 +30,7 @@ def node_sequence_name(node_sequence: list[Node]) -> str: def group_memory_nodes(catalog: DataCatalog, pipeline: Pipeline): # get all memory datasets in the pipeline - ds = get_memory_datasets(catalog, pipeline) + memory_datasets = get_memory_datasets(catalog, pipeline) # Node sequences node_sequences = [] From c5ed1e01683031e4c184cf26d029940d9107087a Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Mon, 29 Jan 2024 12:58:59 +0100 Subject: [PATCH 12/15] Update kedro-airflow/kedro_airflow/grouping.py Co-authored-by: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com> Signed-off-by: Simon Brugman --- kedro-airflow/kedro_airflow/grouping.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-airflow/kedro_airflow/grouping.py b/kedro-airflow/kedro_airflow/grouping.py index 77eee4728..5816fa0ed 100644 --- a/kedro-airflow/kedro_airflow/grouping.py +++ b/kedro-airflow/kedro_airflow/grouping.py @@ -11,7 +11,7 @@ def _is_memory_dataset(catalog, dataset_name: str) -> bool: if dataset_name == "parameters" or dataset_name.startswith("params:"): return False - dataset = catalog._data_sets.get(dataset_name, None) + dataset = catalog._datasets.get(dataset_name, None) return dataset is not None and isinstance(dataset, MemoryDataset) From 473c96f1de7bce09b488ec43923ae1a69fb820ca Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Mon, 29 Jan 2024 13:08:57 +0100 Subject: [PATCH 13/15] fix: tests Signed-off-by: Simon Brugman --- kedro-airflow/tests/test_node_grouping.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kedro-airflow/tests/test_node_grouping.py b/kedro-airflow/tests/test_node_grouping.py index 46d04806c..97b59824c 100644 --- a/kedro-airflow/tests/test_node_grouping.py +++ b/kedro-airflow/tests/test_node_grouping.py @@ -53,9 +53,9 @@ def test_group_memory_nodes( mock_catalog = DataCatalog() for dataset_name in nodes: if dataset_name in memory_nodes: - dataset = MemoryDataSet() + dataset = MemoryDataset() else: - dataset = TestDataSet() + dataset = TestDataset() mock_catalog.add(dataset_name, dataset) def identity_one_to_one(x): @@ -120,8 +120,8 @@ def test_is_memory_dataset(): catalog = DataCatalog() catalog.add("parameters", {"hello": "world"}) catalog.add("params:hello", "world") - catalog.add("my_dataset", MemoryDataSet(True)) - catalog.add("test_dataset", TestDataSet()) + catalog.add("my_dataset", MemoryDataset(True)) + catalog.add("test_dataset", TestDataset()) assert not _is_memory_dataset(catalog, "parameters") assert not _is_memory_dataset(catalog, "params:hello") assert _is_memory_dataset(catalog, "my_dataset") From 
7b5ce2e1ab905b4ffedc66a5170fe64366539fb6 Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Mon, 29 Jan 2024 13:14:53 +0100 Subject: [PATCH 14/15] Bump minimum kedro version Signed-off-by: Simon Brugman --- kedro-airflow/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-airflow/pyproject.toml b/kedro-airflow/pyproject.toml index 309252162..2efd3611e 100644 --- a/kedro-airflow/pyproject.toml +++ b/kedro-airflow/pyproject.toml @@ -11,7 +11,7 @@ description = "Kedro-Airflow makes it easy to deploy Kedro projects to Airflow" requires-python = ">=3.8" license = {text = "Apache Software License (Apache 2.0)"} dependencies = [ - "kedro>=0.17.5", + "kedro>=0.19.0", "python-slugify>=4.0", "semver>=2.10", # Needs to be at least 2.10.0 to make use of `VersionInfo.match`. ] From d20d52f4ac8bcfe2930d697c001847b242d42686 Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Mon, 29 Jan 2024 13:26:40 +0100 Subject: [PATCH 15/15] fixes Signed-off-by: Simon Brugman --- kedro-airflow/kedro_airflow/grouping.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/kedro-airflow/kedro_airflow/grouping.py b/kedro-airflow/kedro_airflow/grouping.py index 5816fa0ed..581c84f41 100644 --- a/kedro-airflow/kedro_airflow/grouping.py +++ b/kedro-airflow/kedro_airflow/grouping.py @@ -19,7 +19,7 @@ def get_memory_datasets(catalog: DataCatalog, pipeline: Pipeline) -> set[str]: """Gather all datasets in the pipeline that are of type MemoryDataset, excluding 'parameters'.""" return { dataset_name - for dataset_name in pipeline.data_sets() + for dataset_name in pipeline.datasets() if _is_memory_dataset(catalog, dataset_name) } @@ -29,6 +29,13 @@ def node_sequence_name(node_sequence: list[Node]) -> str: def group_memory_nodes(catalog: DataCatalog, pipeline: Pipeline): + """ + Nodes that are connected through MemoryDatasets cannot be distributed across + multiple machines, e.g. be in different Kubernetes pods. This function + groups nodes that are connected through MemoryDatasets in the pipeline + together. Essentially, this computes connected components over the graph of + nodes connected by MemoryDatasets. + """ # get all memory datasets in the pipeline memory_datasets = get_memory_datasets(catalog, pipeline) @@ -38,11 +45,11 @@ def group_memory_nodes(catalog: DataCatalog, pipeline: Pipeline): # Mapping from dataset name -> node sequence index sequence_map = {} for node in pipeline.nodes: - if all(o not in ds for o in node.inputs + node.outputs): + if all(o not in memory_datasets for o in node.inputs + node.outputs): # standalone node node_sequences.append([node]) else: - if all(i not in ds for i in node.inputs): + if all(i not in memory_datasets for i in node.inputs): # start of a sequence; create a new sequence and store the id node_sequences.append([node]) sequence_id = len(node_sequences) - 1 @@ -50,7 +57,7 @@ def group_memory_nodes(catalog: DataCatalog, pipeline: Pipeline): # continuation of a sequence; retrieve sequence_id sequence_id = None for i in node.inputs: - if i in ds: + if i in memory_datasets: assert sequence_id is None or sequence_id == sequence_map[i] sequence_id = sequence_map[i] @@ -59,7 +66,7 @@ def group_memory_nodes(catalog: DataCatalog, pipeline: Pipeline): # map outputs to sequence_id for o in node.outputs: - if o in ds: + if o in memory_datasets: sequence_map[o] = sequence_id # Named node sequences
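
For illustration only (not part of the patch series): a minimal sketch of what the grouping added above does at the Python level, assuming kedro>=0.19.0 as pinned in PATCH 14 and the `kedro_airflow.grouping` module as it stands after PATCH 15. The dataset names (`raw`, `mem`, `out`), the lambda nodes, and the node names `f1`/`f2` are hypothetical; only `mem` is registered as a `MemoryDataset`, so the two nodes sharing it collapse into one group.

from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline

from kedro_airflow.grouping import group_memory_nodes

# Only "mem" is registered as a MemoryDataset; "raw" and "out" are not in
# the catalog at all, so _is_memory_dataset() treats them as persisted.
catalog = DataCatalog()
catalog.add("mem", MemoryDataset())

pipe = modular_pipeline(
    [
        node(func=lambda x: x, inputs="raw", outputs="mem", name="f1"),
        node(func=lambda x: x, inputs="mem", outputs="out", name="f2"),
    ]
)

nodes, dependencies = group_memory_nodes(catalog, pipe)
assert list(nodes) == ["f1_f2"]  # f1 and f2 form one sequence, named via node_sequence_name()
assert dict(dependencies) == {}  # the f1 -> f2 edge is now internal to the group

With `kedro airflow create --group-in-memory`, this grouping runs before the Jinja template is rendered, so `f1` and `f2` above would execute inside a single `KedroOperator` task rather than as two Airflow tasks connected by a `MemoryDataset` that would not survive across operators.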