From 426824adb4ebe8a2d6b1df9a5539ef9cbafc2cac Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Wed, 10 Aug 2022 14:56:35 +0100 Subject: [PATCH 01/34] Add _find_first_persistent_ancestors and stubs for supporting functions. Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 79ffd10d48..700d7feac6 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -183,6 +183,34 @@ def _suggest_resume_scenario( postfix, ) + def _find_first_persistent_ancestors( + self, pipeline: Pipeline, start: Node + ) -> Iterable[Node]: + """ + Depth-first search approach to finding the first ancestors + """ + stack, result = [start], set() + while stack: + current_node = stack.pop() + if self._output_is_persistent(current_node): + result.add(current_node) + continue + for parent in self._enumerate_parents(current_node, pipeline): + stack.append(parent) + + def _enumerate_parents( + self, pipeline: Pipeline, child: Node + ) -> Iterable[Node]: + """ + Returns the parents of a given node in a pipeline. + """ + pass + + def _output_is_persistent(node: Node) -> bool: + """ + Return true if the node has persistent output, false if not. + """ + pass def run_node( node: Node, From f90daf7f544d1a02cad8b7e5c3832532b301c168 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Wed, 10 Aug 2022 17:57:21 +0100 Subject: [PATCH 02/34] Add body to _enumerate_parents. Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 700d7feac6..3af4a2f994 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -11,7 +11,7 @@ as_completed, wait, ) -from typing import Any, Dict, Iterable +from typing import Any, Dict, Iterable, List, Set from pluggy import PluginManager @@ -184,29 +184,31 @@ def _suggest_resume_scenario( ) def _find_first_persistent_ancestors( - self, pipeline: Pipeline, start: Node - ) -> Iterable[Node]: + self, pipeline: Pipeline, start: Node, catalog: DataCatalog + ) -> Set[Node]: """ Depth-first search approach to finding the first ancestors """ - stack, result = [start], set() + stack, first_persistent_ancestors = [start], set() while stack: current_node = stack.pop() - if self._output_is_persistent(current_node): - result.add(current_node) + if self._output_is_persistent(current_node, catalog): + first_persistent_ancestors.add(current_node) continue for parent in self._enumerate_parents(current_node, pipeline): stack.append(parent) + return first_persistent_ancestors def _enumerate_parents( self, pipeline: Pipeline, child: Node - ) -> Iterable[Node]: + ) -> List[Node]: """ Returns the parents of a given node in a pipeline. """ - pass + parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs) + return parent_pipeline.nodes - def _output_is_persistent(node: Node) -> bool: + def _output_is_persistent(node: Node, catalog: DataCatalog) -> bool: """ Return true if the node has persistent output, false if not. """ From e1cb2e300a6021485ff4747c7f1c7e46d57624b4 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 11 Aug 2022 15:34:09 +0100 Subject: [PATCH 03/34] Add function to check persistence of node outputs. 
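The intent of the check this patch introduces, in isolation: a node can serve as a resume point only if everything it reads is backed by a persisted dataset, so a single MemoryDataSet among its inputs disqualifies it. A rough, self-contained sketch of that test, mirroring the protected-access catalog lookup the series eventually settles on (illustrative only, not the exact code in the diff below; it assumes every node input is registered in the catalog):

    from kedro.io import DataCatalog, MemoryDataSet
    from kedro.pipeline.node import Node

    def has_persistent_inputs(node: Node, catalog: DataCatalog) -> bool:
        # A node is a safe resume point only if none of its inputs
        # live purely in memory.
        for name in node.inputs:
            # pylint: disable=protected-access
            if isinstance(catalog._data_sets[name], MemoryDataSet):
                return False
        return True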
Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 3af4a2f994..e2eb505efd 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -16,7 +16,7 @@ from pluggy import PluginManager from kedro.framework.hooks.manager import _NullPluginManager -from kedro.io import AbstractDataSet, DataCatalog +from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet from kedro.pipeline import Pipeline from kedro.pipeline.node import Node @@ -192,7 +192,7 @@ def _find_first_persistent_ancestors( stack, first_persistent_ancestors = [start], set() while stack: current_node = stack.pop() - if self._output_is_persistent(current_node, catalog): + if self._has_persistent_inputs(current_node, catalog): first_persistent_ancestors.add(current_node) continue for parent in self._enumerate_parents(current_node, pipeline): @@ -208,11 +208,16 @@ def _enumerate_parents( parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs) return parent_pipeline.nodes - def _output_is_persistent(node: Node, catalog: DataCatalog) -> bool: + def _has_persistent_inputs( + self, node: Node, catalog: DataCatalog + ) -> bool: """ Return true if the node has persistent output, false if not. """ - pass + for node_input in node.inputs: + if type(catalog.DataSets[node_input]) == MemoryDataSet: + return False + return True def run_node( node: Node, From 18a61056f6e92ab5e3861f9124c229b39dcea0ef Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 11 Aug 2022 18:35:38 +0100 Subject: [PATCH 04/34] Modify _suggest_resume_scenario to use _find_first_persistent_ancestors Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index e2eb505efd..f670275bfe 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -162,9 +162,9 @@ def create_default_data_set(self, ds_name: str) -> AbstractDataSet: pass def _suggest_resume_scenario( - self, pipeline: Pipeline, done_nodes: Iterable[Node] + self, pipeline: Pipeline, done_nodes: Iterable[Node], start: Node, catalog: DataCatalog ) -> None: - remaining_nodes = set(pipeline.nodes) - set(done_nodes) + """remaining_nodes = set(pipeline.nodes) - set(done_nodes) postfix = "" if done_nodes: @@ -181,6 +181,15 @@ def _suggest_resume_scenario( "argument to your previous command:\n%s", len(remaining_nodes), postfix, + )""" + first_persistent_ancestors = self._find_first_persistent_ancestors( + pipeline, + start, + catalog + ) + postfix = str(first_persistent_ancestors) + self._logger.warning( + f"first good node(s) is / are: {postfix}", ) def _find_first_persistent_ancestors( @@ -195,7 +204,7 @@ def _find_first_persistent_ancestors( if self._has_persistent_inputs(current_node, catalog): first_persistent_ancestors.add(current_node) continue - for parent in self._enumerate_parents(current_node, pipeline): + for parent in self._enumerate_parents(pipeline, current_node): stack.append(parent) return first_persistent_ancestors @@ -215,7 +224,7 @@ def _has_persistent_inputs( Return true if the node has persistent output, false if not. 
""" for node_input in node.inputs: - if type(catalog.DataSets[node_input]) == MemoryDataSet: + if type(catalog._data_sets[node_input]) == MemoryDataSet: return False return True From 7753486955e62193e918051fae1511d85fd2330b Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 11 Aug 2022 18:36:14 +0100 Subject: [PATCH 05/34] Pass catalog to self._suggest_resume_scenario Signed-off-by: Jannic Holzer --- kedro/runner/sequential_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py index dcad9324fe..f3e1374f44 100644 --- a/kedro/runner/sequential_runner.py +++ b/kedro/runner/sequential_runner.py @@ -70,7 +70,7 @@ def _run( run_node(node, catalog, hook_manager, self._is_async, session_id) done_nodes.add(node) except Exception: - self._suggest_resume_scenario(pipeline, done_nodes) + self._suggest_resume_scenario(pipeline, done_nodes, node, catalog) raise # decrement load counts and release any data sets we've finished with From a402aa73f5ae04aac461f5a4ec7faf31c02a3457 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Mon, 15 Aug 2022 11:44:10 +0100 Subject: [PATCH 06/34] Track and return all ancestor nodes that must be re-run during DFS. Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index f670275bfe..1267ae8a25 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -192,21 +192,21 @@ def _suggest_resume_scenario( f"first good node(s) is / are: {postfix}", ) - def _find_first_persistent_ancestors( + def _find_ancestor_nodes_to_run( self, pipeline: Pipeline, start: Node, catalog: DataCatalog ) -> Set[Node]: """ Depth-first search approach to finding the first ancestors """ - stack, first_persistent_ancestors = [start], set() + stack, ancestor_nodes_to_run = [start], set() while stack: current_node = stack.pop() + ancestor_nodes_to_run.add(current_node) if self._has_persistent_inputs(current_node, catalog): - first_persistent_ancestors.add(current_node) continue for parent in self._enumerate_parents(pipeline, current_node): stack.append(parent) - return first_persistent_ancestors + return ancestor_nodes_to_run def _enumerate_parents( self, pipeline: Pipeline, child: Node @@ -218,7 +218,7 @@ def _enumerate_parents( return parent_pipeline.nodes def _has_persistent_inputs( - self, node: Node, catalog: DataCatalog + self, node: Node, catalog: DataCatalog ) -> bool: """ Return true if the node has persistent output, false if not. From 699a9f5308768c57965a8e5badd3d671b452e487 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Mon, 15 Aug 2022 12:05:13 +0100 Subject: [PATCH 07/34] Integrate DFS with original _suggest_resume_scenario. 
Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 1267ae8a25..4c9f662e98 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -162,9 +162,14 @@ def create_default_data_set(self, ds_name: str) -> AbstractDataSet: pass def _suggest_resume_scenario( - self, pipeline: Pipeline, done_nodes: Iterable[Node], start: Node, catalog: DataCatalog + self, + pipeline: Pipeline, + done_nodes: Iterable[Node], + start: Node, + catalog: DataCatalog, ) -> None: - """remaining_nodes = set(pipeline.nodes) - set(done_nodes) + ancestors_to_run = self._find_ancestor_nodes_to_run(pipeline, start, catalog) + remaining_nodes = (set(pipeline.nodes) - set(done_nodes)) | ancestors_to_run postfix = "" if done_nodes: @@ -181,15 +186,6 @@ def _suggest_resume_scenario( "argument to your previous command:\n%s", len(remaining_nodes), postfix, - )""" - first_persistent_ancestors = self._find_first_persistent_ancestors( - pipeline, - start, - catalog - ) - postfix = str(first_persistent_ancestors) - self._logger.warning( - f"first good node(s) is / are: {postfix}", ) def _find_ancestor_nodes_to_run( @@ -208,18 +204,14 @@ def _find_ancestor_nodes_to_run( stack.append(parent) return ancestor_nodes_to_run - def _enumerate_parents( - self, pipeline: Pipeline, child: Node - ) -> List[Node]: + def _enumerate_parents(self, pipeline: Pipeline, child: Node) -> List[Node]: """ Returns the parents of a given node in a pipeline. """ parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs) return parent_pipeline.nodes - def _has_persistent_inputs( - self, node: Node, catalog: DataCatalog - ) -> bool: + def _has_persistent_inputs(self, node: Node, catalog: DataCatalog) -> bool: """ Return true if the node has persistent output, false if not. """ @@ -228,6 +220,7 @@ def _has_persistent_inputs( return False return True + def run_node( node: Node, catalog: DataCatalog, From a49a6f7d82fa2a5675a3d6d6ea9a79417e9d1f2b Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Tue, 16 Aug 2022 13:22:01 +0100 Subject: [PATCH 08/34] Implement backwards-DFS strategy on all boundary nodes. 
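The "boundary nodes" this subject refers to are the entry points of the not-yet-run part of the pipeline: the nodes of the remaining sub-pipeline that consume only that sub-pipeline's free inputs. A compressed sketch of how they can be derived with the same Pipeline API calls used in the diff below (function and variable names here are illustrative):

    from typing import Iterable, List

    from kedro.pipeline import Pipeline
    from kedro.pipeline.node import Node

    def find_boundary_nodes(pipeline: Pipeline, remaining_nodes: Iterable[Node]) -> List[Node]:
        # The sub-pipeline that still has to run...
        resume_p = pipeline.only_nodes(*(n.name for n in remaining_nodes))
        # ...and, within it, the nodes fed only by its free inputs;
        # these are the starting points for the backwards search.
        return resume_p.only_nodes_with_inputs(*resume_p.inputs()).nodes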
Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 4c9f662e98..46e608737d 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -168,16 +168,20 @@ def _suggest_resume_scenario( start: Node, catalog: DataCatalog, ) -> None: - ancestors_to_run = self._find_ancestor_nodes_to_run(pipeline, start, catalog) - remaining_nodes = (set(pipeline.nodes) - set(done_nodes)) | ancestors_to_run + remaining_nodes = set(pipeline.nodes) - set(done_nodes) postfix = "" if done_nodes: node_names = (n.name for n in remaining_nodes) resume_p = pipeline.only_nodes(*node_names) - start_p = resume_p.only_nodes_with_inputs(*resume_p.inputs()) - start_node_names = (n.name for n in start_p.nodes) + + # find the nearest persistent ancestors of the nodes in start_p + start_p_persistent_ancestors = self._find_persistent_ancestors( + pipeline, start_p.nodes, catalog + ) + + start_node_names = (n.name for n in start_p_persistent_ancestors) postfix += f" --from-nodes \"{','.join(start_node_names)}\"" self._logger.warning( @@ -188,20 +192,22 @@ def _suggest_resume_scenario( postfix, ) - def _find_ancestor_nodes_to_run( - self, pipeline: Pipeline, start: Node, catalog: DataCatalog + def _find_persistent_ancestors( + self, pipeline: Pipeline, boundary_nodes: Iterable[Node], catalog: DataCatalog ) -> Set[Node]: """ Depth-first search approach to finding the first ancestors """ - stack, ancestor_nodes_to_run = [start], set() - while stack: - current_node = stack.pop() - ancestor_nodes_to_run.add(current_node) - if self._has_persistent_inputs(current_node, catalog): - continue - for parent in self._enumerate_parents(pipeline, current_node): - stack.append(parent) + ancestor_nodes_to_run = set() + for boundary_node in boundary_nodes: + stack = [boundary_node] + while stack: + current_node = stack.pop() + if self._has_persistent_inputs(current_node, catalog): + ancestor_nodes_to_run.add(current_node) + continue + for parent in self._enumerate_parents(pipeline, current_node): + stack.append(parent) return ancestor_nodes_to_run def _enumerate_parents(self, pipeline: Pipeline, child: Node) -> List[Node]: From 7955d0d56d787e9bce74cfa8cddbd29b9ab18a39 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Wed, 17 Aug 2022 11:15:18 +0100 Subject: [PATCH 09/34] Switch to multi-node start BFS approach to finding persistent ancestors. 
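Stripped of the Kedro specifics, the traversal this patch moves to is an ordinary multi-source breadth-first search: seed the queue with every boundary node, stop expanding a branch as soon as a node with fully persisted inputs is reached, and keep a visited set so ancestors shared between branches are processed once. A generic sketch of that shape (the goal predicate and parent function are stand-ins for the helpers in the diff below):

    from collections import deque
    from typing import Callable, Iterable, List, Set, TypeVar

    T = TypeVar("T")

    def multi_source_bfs(
        starts: Iterable[T],
        is_goal: Callable[[T], bool],
        parents: Callable[[T], List[T]],
    ) -> Set[T]:
        goals: Set[T] = set()
        queue = deque(starts)
        visited = set(queue)
        while queue:
            current = queue.popleft()
            if is_goal(current):
                goals.add(current)
                continue  # do not walk past a suitable resume point
            for parent in parents(current):
                if parent not in visited:
                    visited.add(parent)
                    queue.append(parent)
        return goals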
Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 46e608737d..f7f5d51107 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -20,6 +20,7 @@ from kedro.pipeline import Pipeline from kedro.pipeline.node import Node +from collections import deque class AbstractRunner(ABC): """``AbstractRunner`` is the base class for all ``Pipeline`` runner @@ -165,7 +166,6 @@ def _suggest_resume_scenario( self, pipeline: Pipeline, done_nodes: Iterable[Node], - start: Node, catalog: DataCatalog, ) -> None: remaining_nodes = set(pipeline.nodes) - set(done_nodes) @@ -196,18 +196,19 @@ def _find_persistent_ancestors( self, pipeline: Pipeline, boundary_nodes: Iterable[Node], catalog: DataCatalog ) -> Set[Node]: """ - Depth-first search approach to finding the first ancestors + Breadth-first search approach to finding the first ancestors """ ancestor_nodes_to_run = set() - for boundary_node in boundary_nodes: - stack = [boundary_node] - while stack: - current_node = stack.pop() - if self._has_persistent_inputs(current_node, catalog): - ancestor_nodes_to_run.add(current_node) - continue - for parent in self._enumerate_parents(pipeline, current_node): - stack.append(parent) + queue, visited = deque(boundary_nodes), set() + while queue: + current_node = queue.popleft() + visited.add(current_node) + if self._has_persistent_inputs(current_node, catalog): + ancestor_nodes_to_run.add(current_node) + continue + for parent in self._enumerate_parents(pipeline, current_node): + if parent not in visited: + queue.append(parent) return ancestor_nodes_to_run def _enumerate_parents(self, pipeline: Pipeline, child: Node) -> List[Node]: From 68764f7a996903a8fb186da3503e077acdb09493 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 18 Aug 2022 11:39:22 +0100 Subject: [PATCH 10/34] Add a useful error message if no nodes ran. Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index f7f5d51107..3752f77a0d 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -184,13 +184,19 @@ def _suggest_resume_scenario( start_node_names = (n.name for n in start_p_persistent_ancestors) postfix += f" --from-nodes \"{','.join(start_node_names)}\"" - self._logger.warning( - "There are %d nodes that have not run.\n" - "You can resume the pipeline run by adding the following " - "argument to your previous command:\n%s", - len(remaining_nodes), - postfix, - ) + + if not postfix: + self._logger.warning( + "No nodes ran. Repeat the previous command to attempt a new run." + ) + else: + self._logger.warning( + "There are %d nodes that have not run.\n" + "You can resume the pipeline run by adding the following " + "argument to your previous command:\n%s", + len(remaining_nodes), + postfix, + ) def _find_persistent_ancestors( self, pipeline: Pipeline, boundary_nodes: Iterable[Node], catalog: DataCatalog From 74c60f70605ace66be273fdf4acf6d7be2926f5f Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 18 Aug 2022 14:17:53 +0100 Subject: [PATCH 11/34] Add docstrings to new functions. 
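By this point in the series the user-facing behaviour is in place: when a run fails, the runner warns how many nodes did not run and suggests restarting from the nearest nodes whose inputs are all persisted. For example, with the X-shaped test pipeline added later in this series, a failure in node2 would suggest resuming from node1_A and node1_B, so the rerun would look roughly like (node names are illustrative, taken from that test fixture):

    kedro run --from-nodes "node1_A,node1_B"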
Signed-off-by: Jannic Holzer --- kedro/runner/parallel_runner.py | 2 +- kedro/runner/runner.py | 44 ++++++++++++++++++++++++++------- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index ee744d5871..89dd4ce41c 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -335,7 +335,7 @@ def _run( # pylint: disable=too-many-locals,useless-suppression try: node = future.result() except Exception: - self._suggest_resume_scenario(pipeline, done_nodes) + self._suggest_resume_scenario(pipeline, done_nodes, catalog) raise done_nodes.add(node) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 3752f77a0d..79f3f511d2 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -158,7 +158,6 @@ def create_default_data_set(self, ds_name: str) -> AbstractDataSet: Returns: An instance of an implementation of ``AbstractDataSet`` to be used for all unregistered datasets. - """ pass @@ -199,13 +198,23 @@ def _suggest_resume_scenario( ) def _find_persistent_ancestors( - self, pipeline: Pipeline, boundary_nodes: Iterable[Node], catalog: DataCatalog + self, pipeline: Pipeline, children: Iterable[Node], catalog: DataCatalog ) -> Set[Node]: - """ - Breadth-first search approach to finding the first ancestors + """Breadth-first search approach to finding the complete set of + persistent ancestors of an iterable of ``Node``s. Persistent + ancestors exclusively have persisted ``Dataset``s as inputs. + + Args: + pipeline: the ``Pipeline`` to find ancestors in. + children: the iterable containing ``Node``s to find ancestors of. + catalog: the ``DataCatalog`` of the run. + + Returns: + A set containing first persistent ancestors of the given + ``Node``s. """ ancestor_nodes_to_run = set() - queue, visited = deque(boundary_nodes), set() + queue, visited = deque(children), set() while queue: current_node = queue.popleft() visited.add(current_node) @@ -217,16 +226,33 @@ def _find_persistent_ancestors( queue.append(parent) return ancestor_nodes_to_run + @staticmethod def _enumerate_parents(self, pipeline: Pipeline, child: Node) -> List[Node]: - """ - Returns the parents of a given node in a pipeline. + """For a given ``Node``, returns a list containing the direct parents + of that ``Node`` in the given ``Pipeline``. + + Args: + pipeline: the ``Pipeline`` to search for direct parents in. + child: the ``Node`` to find parents of. + + Returns: + A list of all ``Node``s that are direct parents of ``child``. """ parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs) return parent_pipeline.nodes + @staticmethod def _has_persistent_inputs(self, node: Node, catalog: DataCatalog) -> bool: - """ - Return true if the node has persistent output, false if not. + """Check if a ``Node`` exclusively has persisted Datasets as inputs. + If at least one input is a ``MemoryDataSet``, return False. + + Args: + node: the ``Node`` to check the inputs of. + catalog: the ``DataCatalog`` of the run. + + Returns: + True if the ``Node`` being checked exclusively has inputs that + are not ``MemoryDataSet``, else False. 
""" for node_input in node.inputs: if type(catalog._data_sets[node_input]) == MemoryDataSet: From 958fb914672c7335e9fb76a3f340295fb2be9f79 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 18 Aug 2022 14:19:22 +0100 Subject: [PATCH 12/34] Add catalog argument to self._suggest_resume_scenario Signed-off-by: Jannic Holzer --- kedro/runner/sequential_runner.py | 2 +- kedro/runner/thread_runner.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kedro/runner/sequential_runner.py b/kedro/runner/sequential_runner.py index f3e1374f44..6a9becb868 100644 --- a/kedro/runner/sequential_runner.py +++ b/kedro/runner/sequential_runner.py @@ -70,7 +70,7 @@ def _run( run_node(node, catalog, hook_manager, self._is_async, session_id) done_nodes.add(node) except Exception: - self._suggest_resume_scenario(pipeline, done_nodes, node, catalog) + self._suggest_resume_scenario(pipeline, done_nodes, catalog) raise # decrement load counts and release any data sets we've finished with diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py index 57136f67e0..6e34484860 100644 --- a/kedro/runner/thread_runner.py +++ b/kedro/runner/thread_runner.py @@ -131,7 +131,7 @@ def _run( # pylint: disable=too-many-locals,useless-suppression try: node = future.result() except Exception: - self._suggest_resume_scenario(pipeline, done_nodes) + self._suggest_resume_scenario(pipeline, done_nodes, catalog) raise done_nodes.add(node) self._logger.info("Completed node: %s", node.name) From d61a19bd5b660220144f0ddf080737a7a787fa55 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 18 Aug 2022 14:20:33 +0100 Subject: [PATCH 13/34] Modify exception_fn to allow it to take multiple arguments Signed-off-by: Jannic Holzer --- tests/runner/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py index ddde22059a..ac4e9d8e1e 100644 --- a/tests/runner/conftest.py +++ b/tests/runner/conftest.py @@ -22,7 +22,7 @@ def fan_in(*args): return args -def exception_fn(arg): +def exception_fn(*args): raise Exception("test exception") From 872492331a2f40bc16253fe71e32d57fc3ebc836 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 18 Aug 2022 14:22:00 +0100 Subject: [PATCH 14/34] Add test for AbstractRunner._suggest_resume_scenario Signed-off-by: Jannic Holzer --- tests/runner/test_sequential_runner.py | 78 +++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 2 deletions(-) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 6bcc01a139..faa580b536 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -1,7 +1,8 @@ -from typing import Any, Dict +from typing import Any, Dict, List import pandas as pd import pytest +import re from kedro.framework.hooks import _create_hook_manager from kedro.io import ( @@ -13,7 +14,7 @@ ) from kedro.pipeline import Pipeline, node from kedro.runner import SequentialRunner -from tests.runner.conftest import identity, sink, source +from tests.runner.conftest import identity, sink, source, exception_fn @pytest.fixture @@ -278,3 +279,76 @@ def test_confirms(self, mocker, pipeline, is_async): catalog = DataCatalog(data_sets={"ds1": fake_dataset_instance}) SequentialRunner(is_async=is_async).run(pipeline, catalog) fake_dataset_instance.confirm.assert_called_once_with() + + +@pytest.fixture +def resume_scenario_pipeline(): + return Pipeline( + [ + node(identity, "ds0_A", "ds1_A", name="node1_A"), + 
node(identity, "ds0_B", "ds1_B", name="node1_B"), + node( + multi_input_list_output, + ["ds1_A", "ds1_B"], + ["ds2_A", "ds2_B"], + name="node2", + ), + node(identity, "ds2_A", "ds3_A", name="node3_A"), + node(identity, "ds2_B", "ds3_B", name="node3_B"), + node(identity, "ds3_A", "ds4_A", name="node4_A"), + node(identity, "ds3_B", "ds4_B", name="node4_B"), + ] + ) + + +@pytest.fixture +def resume_scenario_catalog(): + def _load(): + return 0 + + def _save(arg): + assert arg == 0 + + persistent_dataset = LambdaDataSet(load=_load, save=_save) + return DataCatalog( + { + "ds0_A": persistent_dataset, + "ds0_B": persistent_dataset, + "ds2_A": persistent_dataset, + "ds2_B": persistent_dataset, + } + ) + + +@pytest.mark.parametrize( + "failing_node_indexes,expected_pattern", + [ + ([0], r"No nodes ran."), + ([2], r"(node1_A,node1_B|node1_B,node1_A)"), + ([3], r"(node3_A,node3_B|node3_B,node3_A)"), + ([5], r"(node3_A,node3_B|node3_B,node3_A)"), + ([3,5], r"(node3_A,node3_B|node3_B,node3_A)"), + ([2,5], r"(node1_A,node1_B|node1_B,node1_A)"), + ], +) +class TestSuggestResumeScenario: + def test_suggest_resume_scenario( + self, + caplog, + resume_scenario_catalog, + resume_scenario_pipeline, + failing_node_indexes, + expected_pattern, + ): + for idx in failing_node_indexes: + failing_node = resume_scenario_pipeline.nodes[idx] + resume_scenario_pipeline -= Pipeline([failing_node]) + resume_scenario_pipeline += Pipeline([failing_node._copy(func=exception_fn)]) + try: + SequentialRunner().run( + resume_scenario_pipeline, + resume_scenario_catalog, + hook_manager=_create_hook_manager(), + ) + except: + assert re.search(expected_pattern, caplog.text) From f57c43147082b71276bd234bc3709382e5264c2e Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 18 Aug 2022 14:45:12 +0100 Subject: [PATCH 15/34] Add docstring for _suggest_resume_scenario Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 79f3f511d2..c5f1c2d471 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -22,6 +22,7 @@ from collections import deque + class AbstractRunner(ABC): """``AbstractRunner`` is the base class for all ``Pipeline`` runner implementations. @@ -167,6 +168,16 @@ def _suggest_resume_scenario( done_nodes: Iterable[Node], catalog: DataCatalog, ) -> None: + """ + Suggest a command to the user to resume a run after it fails. + The run should be started from the point closest to the failure + for which persisted input exists. + + Args: + pipeline: the ``Pipeline`` of the run. + done_nodes: the ``Node``s that executed successfully. + catalog: the ``DataCatalog`` of the run. + """ remaining_nodes = set(pipeline.nodes) - set(done_nodes) postfix = "" @@ -183,7 +194,6 @@ def _suggest_resume_scenario( start_node_names = (n.name for n in start_p_persistent_ancestors) postfix += f" --from-nodes \"{','.join(start_node_names)}\"" - if not postfix: self._logger.warning( "No nodes ran. Repeat the previous command to attempt a new run." 
From 9fda4c0acfca18f2bb3418eda7782b71e8dacda5 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 18 Aug 2022 15:03:41 +0100 Subject: [PATCH 16/34] Improve formatting Signed-off-by: Jannic Holzer --- tests/runner/test_sequential_runner.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index faa580b536..ee8643e110 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -327,8 +327,8 @@ def _save(arg): ([2], r"(node1_A,node1_B|node1_B,node1_A)"), ([3], r"(node3_A,node3_B|node3_B,node3_A)"), ([5], r"(node3_A,node3_B|node3_B,node3_A)"), - ([3,5], r"(node3_A,node3_B|node3_B,node3_A)"), - ([2,5], r"(node1_A,node1_B|node1_B,node1_A)"), + ([3, 5], r"(node3_A,node3_B|node3_B,node3_A)"), + ([2, 5], r"(node1_A,node1_B|node1_B,node1_A)"), ], ) class TestSuggestResumeScenario: @@ -343,7 +343,9 @@ def test_suggest_resume_scenario( for idx in failing_node_indexes: failing_node = resume_scenario_pipeline.nodes[idx] resume_scenario_pipeline -= Pipeline([failing_node]) - resume_scenario_pipeline += Pipeline([failing_node._copy(func=exception_fn)]) + resume_scenario_pipeline += Pipeline( + [failing_node._copy(func=exception_fn)] + ) try: SequentialRunner().run( resume_scenario_pipeline, From 3a790591498f5ffb9ba182ae1746d549e2f9fc12 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 18 Aug 2022 15:37:01 +0100 Subject: [PATCH 17/34] Move new functions out of AbstractRunner Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 111 +++++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 55 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index c5f1c2d471..276078f2e2 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -187,7 +187,7 @@ def _suggest_resume_scenario( start_p = resume_p.only_nodes_with_inputs(*resume_p.inputs()) # find the nearest persistent ancestors of the nodes in start_p - start_p_persistent_ancestors = self._find_persistent_ancestors( + start_p_persistent_ancestors = _find_persistent_ancestors( pipeline, start_p.nodes, catalog ) @@ -207,67 +207,68 @@ def _suggest_resume_scenario( postfix, ) - def _find_persistent_ancestors( - self, pipeline: Pipeline, children: Iterable[Node], catalog: DataCatalog - ) -> Set[Node]: - """Breadth-first search approach to finding the complete set of - persistent ancestors of an iterable of ``Node``s. Persistent - ancestors exclusively have persisted ``Dataset``s as inputs. - Args: - pipeline: the ``Pipeline`` to find ancestors in. - children: the iterable containing ``Node``s to find ancestors of. - catalog: the ``DataCatalog`` of the run. +def _find_persistent_ancestors( + pipeline: Pipeline, children: Iterable[Node], catalog: DataCatalog +) -> Set[Node]: + """Breadth-first search approach to finding the complete set of + persistent ancestors of an iterable of ``Node``s. Persistent + ancestors exclusively have persisted ``Dataset``s as inputs. - Returns: - A set containing first persistent ancestors of the given - ``Node``s. 
- """ - ancestor_nodes_to_run = set() - queue, visited = deque(children), set() - while queue: - current_node = queue.popleft() - visited.add(current_node) - if self._has_persistent_inputs(current_node, catalog): - ancestor_nodes_to_run.add(current_node) - continue - for parent in self._enumerate_parents(pipeline, current_node): - if parent not in visited: - queue.append(parent) - return ancestor_nodes_to_run - - @staticmethod - def _enumerate_parents(self, pipeline: Pipeline, child: Node) -> List[Node]: - """For a given ``Node``, returns a list containing the direct parents - of that ``Node`` in the given ``Pipeline``. + Args: + pipeline: the ``Pipeline`` to find ancestors in. + children: the iterable containing ``Node``s to find ancestors of. + catalog: the ``DataCatalog`` of the run. - Args: - pipeline: the ``Pipeline`` to search for direct parents in. - child: the ``Node`` to find parents of. + Returns: + A set containing first persistent ancestors of the given + ``Node``s. + """ + ancestor_nodes_to_run = set() + queue, visited = deque(children), set() + while queue: + current_node = queue.popleft() + visited.add(current_node) + if _has_persistent_inputs(current_node, catalog): + ancestor_nodes_to_run.add(current_node) + continue + for parent in _enumerate_parents(pipeline, current_node): + if parent not in visited: + queue.append(parent) + return ancestor_nodes_to_run + + +def _enumerate_parents(pipeline: Pipeline, child: Node) -> List[Node]: + """For a given ``Node``, returns a list containing the direct parents + of that ``Node`` in the given ``Pipeline``. - Returns: - A list of all ``Node``s that are direct parents of ``child``. - """ - parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs) - return parent_pipeline.nodes + Args: + pipeline: the ``Pipeline`` to search for direct parents in. + child: the ``Node`` to find parents of. - @staticmethod - def _has_persistent_inputs(self, node: Node, catalog: DataCatalog) -> bool: - """Check if a ``Node`` exclusively has persisted Datasets as inputs. - If at least one input is a ``MemoryDataSet``, return False. + Returns: + A list of all ``Node``s that are direct parents of ``child``. + """ + parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs) + return parent_pipeline.nodes - Args: - node: the ``Node`` to check the inputs of. - catalog: the ``DataCatalog`` of the run. - Returns: - True if the ``Node`` being checked exclusively has inputs that - are not ``MemoryDataSet``, else False. - """ - for node_input in node.inputs: - if type(catalog._data_sets[node_input]) == MemoryDataSet: - return False - return True +def _has_persistent_inputs(node: Node, catalog: DataCatalog) -> bool: + """Check if a ``Node`` exclusively has persisted Datasets as inputs. + If at least one input is a ``MemoryDataSet``, return False. + + Args: + node: the ``Node`` to check the inputs of. + catalog: the ``DataCatalog`` of the run. + + Returns: + True if the ``Node`` being checked exclusively has inputs that + are not ``MemoryDataSet``, else False. 
+ """ + for node_input in node.inputs: + if type(catalog._data_sets[node_input]) == MemoryDataSet: + return False + return True def run_node( From 13063dd034f472bdd8d3bcf0aed4a7335d8fa02b Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 18 Aug 2022 15:40:20 +0100 Subject: [PATCH 18/34] Remove bare except Signed-off-by: Jannic Holzer --- tests/runner/test_sequential_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index ee8643e110..d4efbe40f4 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List +from typing import Any, Dict import pandas as pd import pytest @@ -352,5 +352,5 @@ def test_suggest_resume_scenario( resume_scenario_catalog, hook_manager=_create_hook_manager(), ) - except: + except Exception: assert re.search(expected_pattern, caplog.text) From f29bbf5392956f5e40cf24a1c4d30971036fcfa5 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 18 Aug 2022 16:06:49 +0100 Subject: [PATCH 19/34] Fix broad except clause Signed-off-by: Jannic Holzer --- tests/runner/test_sequential_runner.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index d4efbe40f4..80edec0f23 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -346,11 +346,10 @@ def test_suggest_resume_scenario( resume_scenario_pipeline += Pipeline( [failing_node._copy(func=exception_fn)] ) - try: + with pytest.raises(Exception): SequentialRunner().run( resume_scenario_pipeline, resume_scenario_catalog, hook_manager=_create_hook_manager(), ) - except Exception: - assert re.search(expected_pattern, caplog.text) + assert re.search(expected_pattern, caplog.text) From 01d5ab0501d12e964fbbe1656ee2ab80a5c997ab Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 18 Aug 2022 21:26:39 +0100 Subject: [PATCH 20/34] Access datasets __dict__ using vars() Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 276078f2e2..edeefa083c 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -177,6 +177,7 @@ def _suggest_resume_scenario( pipeline: the ``Pipeline`` of the run. done_nodes: the ``Node``s that executed successfully. catalog: the ``DataCatalog`` of the run. + """ remaining_nodes = set(pipeline.nodes) - set(done_nodes) @@ -223,6 +224,7 @@ def _find_persistent_ancestors( Returns: A set containing first persistent ancestors of the given ``Node``s. + """ ancestor_nodes_to_run = set() queue, visited = deque(children), set() @@ -248,6 +250,7 @@ def _enumerate_parents(pipeline: Pipeline, child: Node) -> List[Node]: Returns: A list of all ``Node``s that are direct parents of ``child``. + """ parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs) return parent_pipeline.nodes @@ -264,9 +267,10 @@ def _has_persistent_inputs(node: Node, catalog: DataCatalog) -> bool: Returns: True if the ``Node`` being checked exclusively has inputs that are not ``MemoryDataSet``, else False. 
+ """ for node_input in node.inputs: - if type(catalog._data_sets[node_input]) == MemoryDataSet: + if isinstance(vars(catalog.datasets)[node_input], MemoryDataSet): return False return True From 1dae5e707aa8f385541b43c3320a1c463c515431 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 18 Aug 2022 21:38:26 +0100 Subject: [PATCH 21/34] Sort imports Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 3 +-- tests/runner/test_sequential_runner.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index edeefa083c..8283ac2541 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -4,6 +4,7 @@ import logging from abc import ABC, abstractmethod +from collections import deque from concurrent.futures import ( ALL_COMPLETED, Future, @@ -20,8 +21,6 @@ from kedro.pipeline import Pipeline from kedro.pipeline.node import Node -from collections import deque - class AbstractRunner(ABC): """``AbstractRunner`` is the base class for all ``Pipeline`` runner diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 80edec0f23..4684f7c0ad 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -1,8 +1,8 @@ +import re from typing import Any, Dict import pandas as pd import pytest -import re from kedro.framework.hooks import _create_hook_manager from kedro.io import ( @@ -14,7 +14,7 @@ ) from kedro.pipeline import Pipeline, node from kedro.runner import SequentialRunner -from tests.runner.conftest import identity, sink, source, exception_fn +from tests.runner.conftest import exception_fn, identity, sink, source @pytest.fixture From d5728966dac19dd1dee6e7ae8532d94456fc25c7 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Thu, 18 Aug 2022 22:22:41 +0100 Subject: [PATCH 22/34] Improve resume message Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 8283ac2541..c2cd66578a 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -201,7 +201,8 @@ def _suggest_resume_scenario( else: self._logger.warning( "There are %d nodes that have not run.\n" - "You can resume the pipeline run by adding the following " + "You can resume the pipeline run from the nearest nodes with" + "persisted inputs by adding the following " "argument to your previous command:\n%s", len(remaining_nodes), postfix, From af405ed60fe1698d464a4a54bed35685ea4dcc4d Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Fri, 19 Aug 2022 10:51:47 +0100 Subject: [PATCH 23/34] Add a space to resume suggestion message Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index c2cd66578a..ca5e6faea3 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -201,7 +201,7 @@ def _suggest_resume_scenario( else: self._logger.warning( "There are %d nodes that have not run.\n" - "You can resume the pipeline run from the nearest nodes with" + "You can resume the pipeline run from the nearest nodes with " "persisted inputs by adding the following " "argument to your previous command:\n%s", len(remaining_nodes), From 836d58364712e900dc794823e1eb8875beaf0ac5 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Fri, 26 Aug 2022 11:54:31 +0100 Subject: [PATCH 24/34] Modify DFS logic to eliminate possible queue duplicates Signed-off-by: Jannic 
Holzer --- kedro/runner/runner.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index ca5e6faea3..be0a6374c9 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -227,16 +227,17 @@ def _find_persistent_ancestors( """ ancestor_nodes_to_run = set() - queue, visited = deque(children), set() + queue, visited = deque(children), set(children) while queue: current_node = queue.popleft() - visited.add(current_node) if _has_persistent_inputs(current_node, catalog): ancestor_nodes_to_run.add(current_node) continue for parent in _enumerate_parents(pipeline, current_node): - if parent not in visited: - queue.append(parent) + if parent in visited: + continue + visited.add(parent) + queue.append(parent) return ancestor_nodes_to_run From 7c7e79546c2739d034d03444a00dc0fdb6304414 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Fri, 26 Aug 2022 12:17:34 +0100 Subject: [PATCH 25/34] Modify catalog.datasets to catalog._data_sets w/ disabled linter warning Signed-off-by: Jannic Holzer --- kedro/runner/runner.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index be0a6374c9..70f6b127e8 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -271,7 +271,8 @@ def _has_persistent_inputs(node: Node, catalog: DataCatalog) -> bool: """ for node_input in node.inputs: - if isinstance(vars(catalog.datasets)[node_input], MemoryDataSet): + # pylint: disable=protected-access + if isinstance(catalog._data_sets[node_input], MemoryDataSet): return False return True From 069a6feede3bf53d7688aeb1a1511d58b9c3d3e9 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Fri, 26 Aug 2022 16:31:33 +0100 Subject: [PATCH 26/34] Move all pytest fixtures to conftest.py Signed-off-by: Jannic Holzer --- tests/runner/conftest.py | 80 ++++++++++++++++++++- tests/runner/test_sequential_runner.py | 98 +++----------------------- 2 files changed, 87 insertions(+), 91 deletions(-) diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py index ac4e9d8e1e..ba7d26b233 100644 --- a/tests/runner/conftest.py +++ b/tests/runner/conftest.py @@ -1,8 +1,9 @@ from random import random +import pandas as pd import pytest -from kedro.io import DataCatalog +from kedro.io import DataCatalog, LambdaDataSet, MemoryDataSet from kedro.pipeline import Pipeline, node @@ -35,11 +36,54 @@ def return_not_serialisable(arg): # pylint: disable=unused-argument return lambda x: x +def multi_input_list_output(arg1, arg2): + return [arg1, arg2] + + +@pytest.fixture +def conflicting_feed_dict(pandas_df_feed_dict): + ds1 = MemoryDataSet({"data": 0}) + ds3 = pandas_df_feed_dict["ds3"] + return {"ds1": ds1, "ds3": ds3} + + +@pytest.fixture +def pandas_df_feed_dict(): + pandas_df = pd.DataFrame({"Name": ["Alex", "Bob"], "Age": [15, 25]}) + return {"ds3": pandas_df} + + @pytest.fixture def catalog(): return DataCatalog() +@pytest.fixture +def memory_catalog(): + ds1 = MemoryDataSet({"data": 42}) + ds2 = MemoryDataSet([1, 2, 3, 4, 5]) + return DataCatalog({"ds1": ds1, "ds2": ds2}) + + +@pytest.fixture +def persistent_dataset_catalog(): + def _load(): + return 0 + + def _save(arg): + assert arg == 0 + + persistent_dataset = LambdaDataSet(load=_load, save=_save) + return DataCatalog( + { + "ds0_A": persistent_dataset, + "ds0_B": persistent_dataset, + "ds2_A": persistent_dataset, + "ds2_B": persistent_dataset, + } + ) + + @pytest.fixture def fan_out_fan_in(): return Pipeline( @@ -87,3 +131,37 @@ def 
saving_none_pipeline(): return Pipeline( [node(random, None, "A"), node(return_none, "A", "B"), node(identity, "B", "C")] ) + + +@pytest.fixture +def unfinished_outputs_pipeline(): + return Pipeline( + [ + node(identity, dict(arg="ds4"), "ds8", name="node1"), + node(sink, "ds7", None, name="node2"), + node(multi_input_list_output, ["ds3", "ds4"], ["ds6", "ds7"], name="node3"), + node(identity, "ds2", "ds5", name="node4"), + node(identity, "ds1", "ds4", name="node5"), + ] + ) # Outputs: ['ds8', 'ds5', 'ds6'] == ['ds1', 'ds2', 'ds3'] + + +@pytest.fixture +def two_branches_crossed_pipeline(): + """A pipeline with an X-shape (two branches with one common node)""" + return Pipeline( + [ + node(identity, "ds0_A", "ds1_A", name="node1_A"), + node(identity, "ds0_B", "ds1_B", name="node1_B"), + node( + multi_input_list_output, + ["ds1_A", "ds1_B"], + ["ds2_A", "ds2_B"], + name="node2", + ), + node(identity, "ds2_A", "ds3_A", name="node3_A"), + node(identity, "ds2_B", "ds3_B", name="node3_B"), + node(identity, "ds3_A", "ds4_A", name="node4_A"), + node(identity, "ds3_B", "ds4_B", name="node4_B"), + ] + ) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 4684f7c0ad..356b96238c 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -5,42 +5,12 @@ import pytest from kedro.framework.hooks import _create_hook_manager -from kedro.io import ( - AbstractDataSet, - DataCatalog, - DataSetError, - LambdaDataSet, - MemoryDataSet, -) +from kedro.io import AbstractDataSet, DataCatalog, DataSetError, LambdaDataSet from kedro.pipeline import Pipeline, node from kedro.runner import SequentialRunner from tests.runner.conftest import exception_fn, identity, sink, source -@pytest.fixture -def memory_catalog(): - ds1 = MemoryDataSet({"data": 42}) - ds2 = MemoryDataSet([1, 2, 3, 4, 5]) - return DataCatalog({"ds1": ds1, "ds2": ds2}) - - -@pytest.fixture -def pandas_df_feed_dict(): - pandas_df = pd.DataFrame({"Name": ["Alex", "Bob"], "Age": [15, 25]}) - return {"ds3": pandas_df} - - -@pytest.fixture -def conflicting_feed_dict(pandas_df_feed_dict): - ds1 = MemoryDataSet({"data": 0}) - ds3 = pandas_df_feed_dict["ds3"] - return {"ds1": ds1, "ds3": ds3} - - -def multi_input_list_output(arg1, arg2): - return [arg1, arg2] - - class TestValidSequentialRunner: def test_run_with_plugin_manager(self, fan_out_fan_in, catalog): catalog.add_feed_dict(dict(A=42)) @@ -106,19 +76,6 @@ def _save(arg): assert output == {} -@pytest.fixture -def unfinished_outputs_pipeline(): - return Pipeline( - [ - node(identity, dict(arg="ds4"), "ds8", name="node1"), - node(sink, "ds7", None, name="node2"), - node(multi_input_list_output, ["ds3", "ds4"], ["ds6", "ds7"], name="node3"), - node(identity, "ds2", "ds5", name="node4"), - node(identity, "ds1", "ds4", name="node5"), - ] - ) # Outputs: ['ds8', 'ds5', 'ds6'] == ['ds1', 'ds2', 'ds3'] - - @pytest.mark.parametrize("is_async", [False, True]) class TestSeqentialRunnerBranchedPipeline: def test_input_seq( @@ -281,45 +238,6 @@ def test_confirms(self, mocker, pipeline, is_async): fake_dataset_instance.confirm.assert_called_once_with() -@pytest.fixture -def resume_scenario_pipeline(): - return Pipeline( - [ - node(identity, "ds0_A", "ds1_A", name="node1_A"), - node(identity, "ds0_B", "ds1_B", name="node1_B"), - node( - multi_input_list_output, - ["ds1_A", "ds1_B"], - ["ds2_A", "ds2_B"], - name="node2", - ), - node(identity, "ds2_A", "ds3_A", name="node3_A"), - node(identity, "ds2_B", "ds3_B", name="node3_B"), - 
node(identity, "ds3_A", "ds4_A", name="node4_A"), - node(identity, "ds3_B", "ds4_B", name="node4_B"), - ] - ) - - -@pytest.fixture -def resume_scenario_catalog(): - def _load(): - return 0 - - def _save(arg): - assert arg == 0 - - persistent_dataset = LambdaDataSet(load=_load, save=_save) - return DataCatalog( - { - "ds0_A": persistent_dataset, - "ds0_B": persistent_dataset, - "ds2_A": persistent_dataset, - "ds2_B": persistent_dataset, - } - ) - - @pytest.mark.parametrize( "failing_node_indexes,expected_pattern", [ @@ -335,21 +253,21 @@ class TestSuggestResumeScenario: def test_suggest_resume_scenario( self, caplog, - resume_scenario_catalog, - resume_scenario_pipeline, + two_branches_crossed_pipeline, + persistent_dataset_catalog, failing_node_indexes, expected_pattern, ): for idx in failing_node_indexes: - failing_node = resume_scenario_pipeline.nodes[idx] - resume_scenario_pipeline -= Pipeline([failing_node]) - resume_scenario_pipeline += Pipeline( + failing_node = two_branches_crossed_pipeline.nodes[idx] + two_branches_crossed_pipeline -= Pipeline([failing_node]) + two_branches_crossed_pipeline += Pipeline( [failing_node._copy(func=exception_fn)] ) with pytest.raises(Exception): SequentialRunner().run( - resume_scenario_pipeline, - resume_scenario_catalog, + two_branches_crossed_pipeline, + persistent_dataset_catalog, hook_manager=_create_hook_manager(), ) assert re.search(expected_pattern, caplog.text) From e4a65257d878d8aa9776923c2c6d58ad25417610 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Fri, 26 Aug 2022 17:49:25 +0100 Subject: [PATCH 27/34] Modify all instances of Pipeline to pipeline Signed-off-by: Jannic Holzer --- tests/runner/conftest.py | 2 +- tests/runner/test_sequential_runner.py | 32 +++++++++++++------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py index ba7d26b233..28d7d2574b 100644 --- a/tests/runner/conftest.py +++ b/tests/runner/conftest.py @@ -148,7 +148,7 @@ def unfinished_outputs_pipeline(): @pytest.fixture def two_branches_crossed_pipeline(): - """A pipeline with an X-shape (two branches with one common node)""" + """A ``Pipeline`` with an X-shape (two branches with one common node)""" return Pipeline( [ node(identity, "ds0_A", "ds1_A", name="node1_A"), diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 356b96238c..f105465bf7 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -6,7 +6,7 @@ from kedro.framework.hooks import _create_hook_manager from kedro.io import AbstractDataSet, DataCatalog, DataSetError, LambdaDataSet -from kedro.pipeline import Pipeline, node +from kedro.pipeline import node, pipeline from kedro.runner import SequentialRunner from tests.runner.conftest import exception_fn, identity, sink, source @@ -147,7 +147,7 @@ def _describe(self) -> Dict[str, Any]: class TestSequentialRunnerRelease: def test_dont_release_inputs_and_outputs(self, is_async): log = [] - pipeline = Pipeline( + test_pipeline = pipeline( [node(identity, "in", "middle"), node(identity, "middle", "out")] ) catalog = DataCatalog( @@ -157,14 +157,14 @@ def test_dont_release_inputs_and_outputs(self, is_async): "out": LoggingDataSet(log, "out"), } ) - SequentialRunner(is_async=is_async).run(pipeline, catalog) + SequentialRunner(is_async=is_async).run(test_pipeline, catalog) # we don't want to see release in or out in here assert log == [("load", "in"), ("load", "middle"), ("release", "middle")] def 
test_release_at_earliest_opportunity(self, is_async): log = [] - pipeline = Pipeline( + test_pipeline = pipeline( [ node(source, None, "first"), node(identity, "first", "second"), @@ -177,7 +177,7 @@ def test_release_at_earliest_opportunity(self, is_async): "second": LoggingDataSet(log, "second"), } ) - SequentialRunner(is_async=is_async).run(pipeline, catalog) + SequentialRunner(is_async=is_async).run(test_pipeline, catalog) # we want to see "release first" before "load second" assert log == [ @@ -189,7 +189,7 @@ def test_release_at_earliest_opportunity(self, is_async): def test_count_multiple_loads(self, is_async): log = [] - pipeline = Pipeline( + test_pipeline = pipeline( [ node(source, None, "dataset"), node(sink, "dataset", None, name="bob"), @@ -197,14 +197,14 @@ def test_count_multiple_loads(self, is_async): ] ) catalog = DataCatalog({"dataset": LoggingDataSet(log, "dataset")}) - SequentialRunner(is_async=is_async).run(pipeline, catalog) + SequentialRunner(is_async=is_async).run(test_pipeline, catalog) # we want to the release after both the loads assert log == [("load", "dataset"), ("load", "dataset"), ("release", "dataset")] def test_release_transcoded(self, is_async): log = [] - pipeline = Pipeline( + test_pipeline = pipeline( [node(source, None, "ds@save"), node(sink, "ds@load", None)] ) catalog = DataCatalog( @@ -214,16 +214,16 @@ def test_release_transcoded(self, is_async): } ) - SequentialRunner(is_async=is_async).run(pipeline, catalog) + SequentialRunner(is_async=is_async).run(test_pipeline, catalog) # we want to see both datasets being released assert log == [("release", "save"), ("load", "load"), ("release", "load")] @pytest.mark.parametrize( - "pipeline", + "test_pipeline", [ - Pipeline([node(identity, "ds1", "ds2", confirms="ds1")]), - Pipeline( + pipeline([node(identity, "ds1", "ds2", confirms="ds1")]), + pipeline( [ node(identity, "ds1", "ds2"), node(identity, "ds2", None, confirms="ds1"), @@ -231,10 +231,10 @@ def test_release_transcoded(self, is_async): ), ], ) - def test_confirms(self, mocker, pipeline, is_async): + def test_confirms(self, mocker, test_pipeline, is_async): fake_dataset_instance = mocker.Mock() catalog = DataCatalog(data_sets={"ds1": fake_dataset_instance}) - SequentialRunner(is_async=is_async).run(pipeline, catalog) + SequentialRunner(is_async=is_async).run(test_pipeline, catalog) fake_dataset_instance.confirm.assert_called_once_with() @@ -260,8 +260,8 @@ def test_suggest_resume_scenario( ): for idx in failing_node_indexes: failing_node = two_branches_crossed_pipeline.nodes[idx] - two_branches_crossed_pipeline -= Pipeline([failing_node]) - two_branches_crossed_pipeline += Pipeline( + two_branches_crossed_pipeline -= pipeline([failing_node]) + two_branches_crossed_pipeline += pipeline( [failing_node._copy(func=exception_fn)] ) with pytest.raises(Exception): From 32aa7a2d5eb23fa3421e82fe74fe0a25a4e4c997 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Fri, 26 Aug 2022 17:54:06 +0100 Subject: [PATCH 28/34] Fix typo in the name of TestSequentialRunnerBranchedPipeline Signed-off-by: Jannic Holzer --- tests/runner/test_sequential_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index f105465bf7..6587a208c9 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -77,7 +77,7 @@ def _save(arg): @pytest.mark.parametrize("is_async", [False, True]) -class TestSeqentialRunnerBranchedPipeline: +class 
TestSequentialRunnerBranchedPipeline: def test_input_seq( self, is_async, From e92a756fde94bbcac085c816859c449e694e518b Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Fri, 26 Aug 2022 22:29:49 +0100 Subject: [PATCH 29/34] Remove spurious assert in save of persistent_dataset_catalog Signed-off-by: Jannic Holzer --- tests/runner/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py index 28d7d2574b..1bf22c449c 100644 --- a/tests/runner/conftest.py +++ b/tests/runner/conftest.py @@ -71,7 +71,7 @@ def _load(): return 0 def _save(arg): - assert arg == 0 + pass persistent_dataset = LambdaDataSet(load=_load, save=_save) return DataCatalog( From ffb4074c33b1a4cc9db7dd0b2e2c8130dffd7379 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Wed, 31 Aug 2022 11:38:43 +0100 Subject: [PATCH 30/34] Replace instantiations of Pipeline with pipeline Signed-off-by: Jannic Holzer --- tests/runner/conftest.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py index 1bf22c449c..d09d7a342b 100644 --- a/tests/runner/conftest.py +++ b/tests/runner/conftest.py @@ -4,7 +4,7 @@ import pytest from kedro.io import DataCatalog, LambdaDataSet, MemoryDataSet -from kedro.pipeline import Pipeline, node +from kedro.pipeline import pipeline, node def source(): @@ -86,7 +86,7 @@ def _save(arg): @pytest.fixture def fan_out_fan_in(): - return Pipeline( + return pipeline( [ node(identity, "A", "B"), node(identity, "B", "C"), @@ -100,7 +100,7 @@ def fan_out_fan_in(): @pytest.fixture def branchless_no_input_pipeline(): """The pipeline runs in the order A->B->C->D->E.""" - return Pipeline( + return pipeline( [ node(identity, "D", "E", name="node1"), node(identity, "C", "D", name="node2"), @@ -113,7 +113,7 @@ def branchless_no_input_pipeline(): @pytest.fixture def branchless_pipeline(): - return Pipeline( + return pipeline( [ node(identity, "ds1", "ds2", name="node1"), node(identity, "ds2", "ds3", name="node2"), @@ -123,19 +123,19 @@ def branchless_pipeline(): @pytest.fixture def saving_result_pipeline(): - return Pipeline([node(identity, "ds", "dsX")]) + return pipeline([node(identity, "ds", "dsX")]) @pytest.fixture def saving_none_pipeline(): - return Pipeline( + return pipeline( [node(random, None, "A"), node(return_none, "A", "B"), node(identity, "B", "C")] ) @pytest.fixture def unfinished_outputs_pipeline(): - return Pipeline( + return pipeline( [ node(identity, dict(arg="ds4"), "ds8", name="node1"), node(sink, "ds7", None, name="node2"), @@ -149,7 +149,7 @@ def unfinished_outputs_pipeline(): @pytest.fixture def two_branches_crossed_pipeline(): """A ``Pipeline`` with an X-shape (two branches with one common node)""" - return Pipeline( + return pipeline( [ node(identity, "ds0_A", "ds1_A", name="node1_A"), node(identity, "ds0_B", "ds1_B", name="node1_B"), From 1bf5f1de48ff9634b2c207212db0afb23f537b06 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Wed, 31 Aug 2022 11:39:19 +0100 Subject: [PATCH 31/34] Modify test_suggest_resume_scenario fixture to use node names Signed-off-by: Jannic Holzer --- tests/runner/test_sequential_runner.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 6587a208c9..0b014ec0f0 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -239,14 +239,14 @@ def 
test_confirms(self, mocker, test_pipeline, is_async): @pytest.mark.parametrize( - "failing_node_indexes,expected_pattern", + "failing_node_names,expected_pattern", [ - ([0], r"No nodes ran."), - ([2], r"(node1_A,node1_B|node1_B,node1_A)"), - ([3], r"(node3_A,node3_B|node3_B,node3_A)"), - ([5], r"(node3_A,node3_B|node3_B,node3_A)"), - ([3, 5], r"(node3_A,node3_B|node3_B,node3_A)"), - ([2, 5], r"(node1_A,node1_B|node1_B,node1_A)"), + (["node1_A"], r"No nodes ran."), + (["node2"], r"(node1_A,node1_B|node1_B,node1_A)"), + (["node3_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node3_A", "node4_A"], r"(node3_A,node3_B|node3_B,node3_A)"), + (["node2", "node4_A"], r"(node1_A,node1_B|node1_B,node1_A)"), ], ) class TestSuggestResumeScenario: @@ -255,14 +255,14 @@ def test_suggest_resume_scenario( caplog, two_branches_crossed_pipeline, persistent_dataset_catalog, - failing_node_indexes, + failing_node_names, expected_pattern, ): - for idx in failing_node_indexes: - failing_node = two_branches_crossed_pipeline.nodes[idx] - two_branches_crossed_pipeline -= pipeline([failing_node]) + nodes = {n.name: n for n in two_branches_crossed_pipeline.nodes} + for name in failing_node_names: + two_branches_crossed_pipeline -= pipeline([nodes[name]]) two_branches_crossed_pipeline += pipeline( - [failing_node._copy(func=exception_fn)] + [nodes[name]._copy(func=exception_fn)] ) with pytest.raises(Exception): SequentialRunner().run( From c2481dbc8e06bb27c02e18e2b6583956d464cd24 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Wed, 31 Aug 2022 11:48:41 +0100 Subject: [PATCH 32/34] Add disable=unused-argument to _save Signed-off-by: Jannic Holzer --- tests/runner/conftest.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py index d09d7a342b..26d46516de 100644 --- a/tests/runner/conftest.py +++ b/tests/runner/conftest.py @@ -4,7 +4,7 @@ import pytest from kedro.io import DataCatalog, LambdaDataSet, MemoryDataSet -from kedro.pipeline import pipeline, node +from kedro.pipeline import node, pipeline def source(): @@ -70,6 +70,7 @@ def persistent_dataset_catalog(): def _load(): return 0 + # pylint: disable=unused-argument def _save(arg): pass From 99e0bb261d596f6bddbdec1ddf13e353f867a952 Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Fri, 2 Sep 2022 14:56:37 +0100 Subject: [PATCH 33/34] Remove resume suggestion for ParallelRunner Signed-off-by: Jannic Holzer --- kedro/runner/parallel_runner.py | 1 - 1 file changed, 1 deletion(-) diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index 89dd4ce41c..13013a0179 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -335,7 +335,6 @@ def _run( # pylint: disable=too-many-locals,useless-suppression try: node = future.result() except Exception: - self._suggest_resume_scenario(pipeline, done_nodes, catalog) raise done_nodes.add(node) From a74fef63cb5e1757e954c9da0f429e03b7d98e5f Mon Sep 17 00:00:00 2001 From: Jannic Holzer Date: Fri, 2 Sep 2022 15:23:07 +0100 Subject: [PATCH 34/34] Remove spurious try / except Signed-off-by: Jannic Holzer --- kedro/runner/parallel_runner.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index 13013a0179..f19846b71b 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -332,10 +332,7 @@ def _run( # pylint: 
disable=too-many-locals,useless-suppression break # pragma: no cover done, futures = wait(futures, return_when=FIRST_COMPLETED) for future in done: - try: - node = future.result() - except Exception: - raise + node = future.result() done_nodes.add(node) # Decrement load counts, and release any datasets we