Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve resume pipeline suggestion for SequentialRunner #1795

Merged
merged 38 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
426824a
Add _find_first_persistent_ancestors and stubs for supporting functions.
jmholzer Aug 10, 2022
f90daf7
Add body to _enumerate_parents.
jmholzer Aug 10, 2022
e1cb2e3
Add function to check persistence of node outputs.
jmholzer Aug 11, 2022
18a6105
Modify _suggest_resume_scenario to use _find_first_persistent_ancestors
jmholzer Aug 11, 2022
7753486
Pass catalog to self._suggest_resume_scenario
jmholzer Aug 11, 2022
a402aa7
Track and return all ancestor nodes that must be re-run during DFS.
jmholzer Aug 15, 2022
699a9f5
Integrate DFS with original _suggest_resume_scenario.
jmholzer Aug 15, 2022
a49a6f7
Implement backwards-DFS strategy on all boundary nodes.
jmholzer Aug 16, 2022
7955d0d
Switch to multi-node start BFS approach to finding persistent ancestors.
jmholzer Aug 17, 2022
68764f7
Add a useful error message if no nodes ran.
jmholzer Aug 18, 2022
74c60f7
Add docstrings to new functions.
jmholzer Aug 18, 2022
958fb91
Add catalog argument to self._suggest_resume_scenario
jmholzer Aug 18, 2022
d61a19b
Modify exception_fn to allow it to take multiple arguments
jmholzer Aug 18, 2022
8724923
Add test for AbstractRunner._suggest_resume_scenario
jmholzer Aug 18, 2022
f57c431
Add docstring for _suggest_resume_scenario
jmholzer Aug 18, 2022
9fda4c0
Improve formatting
jmholzer Aug 18, 2022
3a79059
Move new functions out of AbstractRunner
jmholzer Aug 18, 2022
13063dd
Remove bare except
jmholzer Aug 18, 2022
f29bbf5
Fix broad except clause
jmholzer Aug 18, 2022
01d5ab0
Access datasets __dict__ using vars()
jmholzer Aug 18, 2022
1dae5e7
Sort imports
jmholzer Aug 18, 2022
d572896
Improve resume message
jmholzer Aug 18, 2022
af405ed
Add a space to resume suggestion message
jmholzer Aug 19, 2022
d1b6693
Merge branch 'main' into feat/improve-resume-scenario-suggestion
jmholzer Aug 19, 2022
836d583
Modify DFS logic to eliminate possible queue duplicates
jmholzer Aug 26, 2022
7c7e795
Modify catalog.datasets to catalog._data_sets w/ disabled linter warning
jmholzer Aug 26, 2022
069a6fe
Move all pytest fixtures to conftest.py
jmholzer Aug 26, 2022
e4a6525
Modify all instances of Pipeline to pipeline
jmholzer Aug 26, 2022
32aa7a2
Fix typo in the name of TestSequentialRunnerBranchedPipeline
jmholzer Aug 26, 2022
e92a756
Remove spurious assert in save of persistent_dataset_catalog
jmholzer Aug 26, 2022
ffb4074
Replace instantiations of Pipeline with pipeline
jmholzer Aug 31, 2022
1bf5f1d
Modify test_suggest_resume_scenario fixture to use node names
jmholzer Aug 31, 2022
c2481db
Add disable=unused-argument to _save
jmholzer Aug 31, 2022
35ad81f
Merge branch 'main' into feat/improve-resume-scenario-suggestion
jmholzer Aug 31, 2022
99e0bb2
Remove resume suggestion for ParallelRunner
jmholzer Sep 2, 2022
85f7609
Merge branch 'main' into feat/improve-resume-scenario-suggestion
jmholzer Sep 2, 2022
a74fef6
Remove spurious try / except
jmholzer Sep 2, 2022
74d0cec
Merge branch 'feat/improve-resume-scenario-suggestion' of github.com:…
jmholzer Sep 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions kedro/runner/parallel_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,11 +332,7 @@ def _run( # pylint: disable=too-many-locals,useless-suppression
break # pragma: no cover
done, futures = wait(futures, return_when=FIRST_COMPLETED)
for future in done:
try:
node = future.result()
except Exception:
self._suggest_resume_scenario(pipeline, done_nodes)
raise
node = future.result()
done_nodes.add(node)

# Decrement load counts, and release any datasets we
Expand Down
119 changes: 106 additions & 13 deletions kedro/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@

import logging
from abc import ABC, abstractmethod
from collections import deque
from concurrent.futures import (
ALL_COMPLETED,
Future,
ThreadPoolExecutor,
as_completed,
wait,
)
from typing import Any, Dict, Iterable
from typing import Any, Dict, Iterable, List, Set

from pluggy import PluginManager

from kedro.framework.hooks.manager import _NullPluginManager
from kedro.io import AbstractDataSet, DataCatalog
from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.pipeline.node import Node

Expand Down Expand Up @@ -157,31 +158,123 @@ def create_default_data_set(self, ds_name: str) -> AbstractDataSet:
Returns:
An instance of an implementation of ``AbstractDataSet`` to be
used for all unregistered datasets.

"""
pass

def _suggest_resume_scenario(
self, pipeline: Pipeline, done_nodes: Iterable[Node]
self,
pipeline: Pipeline,
done_nodes: Iterable[Node],
catalog: DataCatalog,
) -> None:
"""
Suggest a command to the user to resume a run after it fails.
The run should be started from the point closest to the failure
for which persisted input exists.

Args:
pipeline: the ``Pipeline`` of the run.
done_nodes: the ``Node``s that executed successfully.
catalog: the ``DataCatalog`` of the run.

"""
remaining_nodes = set(pipeline.nodes) - set(done_nodes)

postfix = ""
if done_nodes:
node_names = (n.name for n in remaining_nodes)
resume_p = pipeline.only_nodes(*node_names)

start_p = resume_p.only_nodes_with_inputs(*resume_p.inputs())
start_node_names = (n.name for n in start_p.nodes)

# find the nearest persistent ancestors of the nodes in start_p
start_p_persistent_ancestors = _find_persistent_ancestors(
pipeline, start_p.nodes, catalog
)

start_node_names = (n.name for n in start_p_persistent_ancestors)
postfix += f" --from-nodes \"{','.join(start_node_names)}\""

self._logger.warning(
"There are %d nodes that have not run.\n"
"You can resume the pipeline run by adding the following "
"argument to your previous command:\n%s",
len(remaining_nodes),
postfix,
)
if not postfix:
self._logger.warning(
"No nodes ran. Repeat the previous command to attempt a new run."
merelcht marked this conversation as resolved.
Show resolved Hide resolved
)
else:
self._logger.warning(
"There are %d nodes that have not run.\n"
"You can resume the pipeline run from the nearest nodes with "
"persisted inputs by adding the following "
"argument to your previous command:\n%s",
len(remaining_nodes),
postfix,
)


def _find_persistent_ancestors(
pipeline: Pipeline, children: Iterable[Node], catalog: DataCatalog
) -> Set[Node]:
"""Breadth-first search approach to finding the complete set of
persistent ancestors of an iterable of ``Node``s. Persistent
ancestors exclusively have persisted ``Dataset``s as inputs.

Args:
pipeline: the ``Pipeline`` to find ancestors in.
children: the iterable containing ``Node``s to find ancestors of.
catalog: the ``DataCatalog`` of the run.

Returns:
A set containing first persistent ancestors of the given
``Node``s.

"""
ancestor_nodes_to_run = set()
queue, visited = deque(children), set(children)
while queue:
current_node = queue.popleft()
if _has_persistent_inputs(current_node, catalog):
ancestor_nodes_to_run.add(current_node)
continue
for parent in _enumerate_parents(pipeline, current_node):
if parent in visited:
continue
visited.add(parent)
queue.append(parent)
return ancestor_nodes_to_run


def _enumerate_parents(pipeline: Pipeline, child: Node) -> List[Node]:
"""For a given ``Node``, returns a list containing the direct parents
of that ``Node`` in the given ``Pipeline``.

Args:
pipeline: the ``Pipeline`` to search for direct parents in.
child: the ``Node`` to find parents of.

Returns:
A list of all ``Node``s that are direct parents of ``child``.

"""
parent_pipeline = pipeline.only_nodes_with_outputs(*child.inputs)
return parent_pipeline.nodes


def _has_persistent_inputs(node: Node, catalog: DataCatalog) -> bool:
"""Check if a ``Node`` exclusively has persisted Datasets as inputs.
If at least one input is a ``MemoryDataSet``, return False.

Args:
node: the ``Node`` to check the inputs of.
catalog: the ``DataCatalog`` of the run.

Returns:
True if the ``Node`` being checked exclusively has inputs that
are not ``MemoryDataSet``, else False.

"""
for node_input in node.inputs:
# pylint: disable=protected-access
if isinstance(catalog._data_sets[node_input], MemoryDataSet):
return False
return True


def run_node(
Expand Down
2 changes: 1 addition & 1 deletion kedro/runner/sequential_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def _run(
run_node(node, catalog, hook_manager, self._is_async, session_id)
done_nodes.add(node)
except Exception:
self._suggest_resume_scenario(pipeline, done_nodes)
self._suggest_resume_scenario(pipeline, done_nodes, catalog)
raise

# decrement load counts and release any data sets we've finished with
Expand Down
2 changes: 1 addition & 1 deletion kedro/runner/thread_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def _run( # pylint: disable=too-many-locals,useless-suppression
try:
node = future.result()
except Exception:
self._suggest_resume_scenario(pipeline, done_nodes)
self._suggest_resume_scenario(pipeline, done_nodes, catalog)
raise
done_nodes.add(node)
self._logger.info("Completed node: %s", node.name)
Expand Down
95 changes: 87 additions & 8 deletions tests/runner/conftest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from random import random

import pandas as pd
import pytest

from kedro.io import DataCatalog
from kedro.pipeline import Pipeline, node
from kedro.io import DataCatalog, LambdaDataSet, MemoryDataSet
from kedro.pipeline import node, pipeline


def source():
Expand All @@ -22,7 +23,7 @@ def fan_in(*args):
return args


def exception_fn(arg):
def exception_fn(*args):
raise Exception("test exception")


Expand All @@ -35,14 +36,58 @@ def return_not_serialisable(arg): # pylint: disable=unused-argument
return lambda x: x


def multi_input_list_output(arg1, arg2):
return [arg1, arg2]


@pytest.fixture
def conflicting_feed_dict(pandas_df_feed_dict):
ds1 = MemoryDataSet({"data": 0})
ds3 = pandas_df_feed_dict["ds3"]
return {"ds1": ds1, "ds3": ds3}


@pytest.fixture
def pandas_df_feed_dict():
pandas_df = pd.DataFrame({"Name": ["Alex", "Bob"], "Age": [15, 25]})
return {"ds3": pandas_df}


@pytest.fixture
def catalog():
return DataCatalog()


@pytest.fixture
def memory_catalog():
ds1 = MemoryDataSet({"data": 42})
ds2 = MemoryDataSet([1, 2, 3, 4, 5])
return DataCatalog({"ds1": ds1, "ds2": ds2})


@pytest.fixture
def persistent_dataset_catalog():
def _load():
return 0

# pylint: disable=unused-argument
def _save(arg):
pass

persistent_dataset = LambdaDataSet(load=_load, save=_save)
return DataCatalog(
{
"ds0_A": persistent_dataset,
"ds0_B": persistent_dataset,
"ds2_A": persistent_dataset,
"ds2_B": persistent_dataset,
}
)


@pytest.fixture
def fan_out_fan_in():
return Pipeline(
return pipeline(
[
node(identity, "A", "B"),
node(identity, "B", "C"),
Expand All @@ -56,7 +101,7 @@ def fan_out_fan_in():
@pytest.fixture
def branchless_no_input_pipeline():
"""The pipeline runs in the order A->B->C->D->E."""
return Pipeline(
return pipeline(
[
node(identity, "D", "E", name="node1"),
node(identity, "C", "D", name="node2"),
Expand All @@ -69,7 +114,7 @@ def branchless_no_input_pipeline():

@pytest.fixture
def branchless_pipeline():
return Pipeline(
return pipeline(
[
node(identity, "ds1", "ds2", name="node1"),
node(identity, "ds2", "ds3", name="node2"),
Expand All @@ -79,11 +124,45 @@ def branchless_pipeline():

@pytest.fixture
def saving_result_pipeline():
return Pipeline([node(identity, "ds", "dsX")])
return pipeline([node(identity, "ds", "dsX")])


@pytest.fixture
def saving_none_pipeline():
return Pipeline(
return pipeline(
[node(random, None, "A"), node(return_none, "A", "B"), node(identity, "B", "C")]
)


@pytest.fixture
def unfinished_outputs_pipeline():
return pipeline(
[
node(identity, dict(arg="ds4"), "ds8", name="node1"),
node(sink, "ds7", None, name="node2"),
node(multi_input_list_output, ["ds3", "ds4"], ["ds6", "ds7"], name="node3"),
node(identity, "ds2", "ds5", name="node4"),
node(identity, "ds1", "ds4", name="node5"),
]
) # Outputs: ['ds8', 'ds5', 'ds6'] == ['ds1', 'ds2', 'ds3']


@pytest.fixture
def two_branches_crossed_pipeline():
"""A ``Pipeline`` with an X-shape (two branches with one common node)"""
return pipeline(
[
node(identity, "ds0_A", "ds1_A", name="node1_A"),
node(identity, "ds0_B", "ds1_B", name="node1_B"),
node(
multi_input_list_output,
["ds1_A", "ds1_B"],
["ds2_A", "ds2_B"],
name="node2",
),
node(identity, "ds2_A", "ds3_A", name="node3_A"),
node(identity, "ds2_B", "ds3_B", name="node3_B"),
node(identity, "ds3_A", "ds4_A", name="node4_A"),
node(identity, "ds3_B", "ds4_B", name="node4_B"),
]
)
Loading