Add pluggable DependencyResolvers #3111

Merged 26 commits on May 23, 2024
Changes from 24 commits
Commits
fd9f668
Add deliberately breaking test for deep-future-resolution
benclifford Feb 29, 2024
020a37f
Replace explicit future check with traverse calls
benclifford Feb 29, 2024
1889bf9
Add example for tuples
benclifford Feb 29, 2024
3241189
fiddling round unlocks a new codepath that is not tested, and breaks,…
benclifford Feb 29, 2024
9fabe49
Merge branch 'master' into benc-plugin-future-resolution
benclifford Feb 29, 2024
e382e39
Adding onto #3111: shallow resolution of lists, sets, and tuples (#3118)
Andrew-S-Rosen Mar 4, 2024
c0827be
Add support for dicts in future-resolution-plugin (#3111) (#3289)
Andrew-S-Rosen Mar 25, 2024
99ec556
Use `dict` instead of `Mapping` in #3111 (#3291)
Andrew-S-Rosen Mar 26, 2024
01f51b2
Merge remote-tracking branch 'origin/master' into benc-plugin-future-…
benclifford Apr 29, 2024
d979b08
Allow dependency resolver to be configured per-DFK
benclifford Apr 29, 2024
fc45588
Docs
benclifford Apr 29, 2024
7a64b9c
Docs
benclifford Apr 29, 2024
b7fbd43
Merge remote-tracking branch 'origin/master' into benc-plugin-future-…
benclifford Apr 29, 2024
48bfb58
Add a (broken/not implemented) test of very deep traversal
benclifford Apr 29, 2024
282f609
Implement deep list traversal
benclifford Apr 29, 2024
77caf05
test tuples and lists deep resolver
benclifford Apr 29, 2024
93e4b9a
traverse dicts
benclifford Apr 29, 2024
7ac5b72
Remove some TODOs
benclifford Apr 29, 2024
c1d7206
Merge branch 'master' into benc-plugin-future-resolution
benclifford May 17, 2024
3f54c17
Remove now-misplaced comment
benclifford May 21, 2024
0234d45
Fix typo in debug log
benclifford May 21, 2024
f781148
Merge remote-tracking branch 'origin/master' into benc-plugin-future-…
benclifford May 21, 2024
ffc6b4f
Remove type expansion that was addressed differently in #3445
benclifford May 21, 2024
59628e0
Fix typo in docstring
benclifford May 21, 2024
b703b91
Merge remote-tracking branch 'origin/master' into benc-plugin-future-…
benclifford May 23, 2024
961bb6d
pronoun typo fix in docstring
benclifford May 23, 2024
3 changes: 3 additions & 0 deletions docs/reference.rst
@@ -14,6 +14,9 @@ Core
     parsl.dataflow.futures.AppFuture
     parsl.dataflow.dflow.DataFlowKernelLoader
     parsl.monitoring.MonitoringHub
+    parsl.dataflow.dependency_resolvers.DependencyResolver
+    parsl.dataflow.dependency_resolvers.DEEP_DEPENDENCY_RESOLVER
+    parsl.dataflow.dependency_resolvers.SHALLOW_DEPENDENCY_RESOLVER

 Configuration
 =============
16 changes: 16 additions & 0 deletions docs/userguide/plugins.rst
@@ -86,3 +86,19 @@ from invoking a Parsl app. This includes as the return value of a

 A specific example of this is integrating Globus Compute tasks into a Parsl
 task graph. See :ref:`label-join-globus-compute`
+
+Dependency resolution
+---------------------
+
+When Parsl examines the arguments to an app, it uses a `DependencyResolver`.
+The default `DependencyResolver` will cause Parsl to wait for
+``concurrent.futures.Future`` instances (including `AppFuture` and
+`DataFuture`), and pass through other arguments without waiting.
+
+This behaviour is pluggable: Parsl comes with another dependency resolver,
+`DEEP_DEPENDENCY_RESOLVER`, which knows about futures contained within structures
+such as tuples, lists, sets and dicts.
+
+This plugin interface might be used to interface other task-like or future-like
+objects to the Parsl dependency mechanism, by describing how they can be
+interpreted as a Future.
Comment on lines +93 to +104
Collaborator

This text is accurate, and points to the right place, but as a "documentation consumer," I find myself without a proper mental model for what this looks like. Would an example implementation be an undue burden to place here? Or at the end of one of the links?

Collaborator Author

I'm working on a presentation for this for next Tuesday so I'll try to use the preparation for that as a way to get my head around more introductory material.
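
As a rough illustration of the mental model asked about in this thread, a custom resolver might look like the sketch below. `RemoteTask`, `gather`, `unwrap` and `REMOTE_TASK_RESOLVER` are hypothetical names invented for the example. The `DependencyResolver` dataclass is re-declared locally, mirroring the one added in this PR, only so that the snippet runs without Parsl installed; in real use it would be imported from `parsl.dataflow.dependency_resolvers`.

```python
from concurrent.futures import Future
from dataclasses import dataclass
from functools import singledispatch
from typing import Callable


# Local stand-in mirroring the DependencyResolver dataclass from this PR,
# so that the sketch is runnable without Parsl.
@dataclass
class DependencyResolver:
    traverse_to_gather: Callable
    traverse_to_unwrap: Callable


class RemoteTask:
    """Hypothetical future-like object from some other task system."""

    def __init__(self) -> None:
        self.future: Future = Future()


@singledispatch
def gather(o):
    # Ordinary values expose no futures to wait for.
    return []


@singledispatch
def unwrap(o):
    # Ordinary values are passed to the app unchanged.
    return o


@gather.register
def _(fut: Future):
    return [fut]


@unwrap.register
def _(fut: Future):
    return fut.result()


@gather.register
def _(task: RemoteTask):
    # Describe how a RemoteTask can be interpreted as a Future.
    return [task.future]


@unwrap.register
def _(task: RemoteTask):
    # Once the future is done, the app receives the plain result.
    return task.future.result()


REMOTE_TASK_RESOLVER = DependencyResolver(traverse_to_gather=gather,
                                          traverse_to_unwrap=unwrap)
```

Going by the `parsl/config.py` changes in this PR, such a resolver would then be selected with `Config(dependency_resolver=REMOTE_TASK_RESOLVER)`.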

8 changes: 7 additions & 1 deletion parsl/config.py
@@ -5,6 +5,7 @@
 from typing_extensions import Literal

 from parsl.utils import RepresentationMixin
+from parsl.dataflow.dependency_resolvers import DependencyResolver
 from parsl.executors.base import ParslExecutor
 from parsl.executors.threads import ThreadPoolExecutor
 from parsl.errors import ConfigurationError
@@ -35,6 +36,8 @@ class Config(RepresentationMixin, UsageInformation):
     checkpoint_period : str, optional
         Time interval (in "HH:MM:SS") at which to checkpoint completed tasks. Only has an effect if
         ``checkpoint_mode='periodic'``.
+    dependency_resolver: plugin point for custom dependency resolvers. Default: only resolve Futures,
+        using the `SHALLOW_DEPENDENCY_RESOLVER`.
     garbage_collect : bool. optional.
         Delete task records from DFK when tasks have completed. Default: True
     internal_tasks_max_threads : int, optional
@@ -88,6 +91,7 @@ def __init__(self,
                                         Literal['dfk_exit'],
                                         Literal['manual']] = None,
                  checkpoint_period: Optional[str] = None,
+                 dependency_resolver: Optional[DependencyResolver] = None,
                  garbage_collect: bool = True,
                  internal_tasks_max_threads: int = 10,
                  retries: int = 0,
@@ -123,6 +127,7 @@ def __init__(self,
         if checkpoint_mode == 'periodic' and checkpoint_period is None:
             checkpoint_period = "00:30:00"
         self.checkpoint_period = checkpoint_period
+        self.dependency_resolver = dependency_resolver
         self.garbage_collect = garbage_collect
         self.internal_tasks_max_threads = internal_tasks_max_threads
         self.retries = retries
@@ -152,4 +157,5 @@ def _validate_executors(self) -> None:
                 ', '.join(['label={}'.format(repr(d)) for d in duplicates])))

     def get_usage_information(self):
-        return {"executors_len": len(self.executors)}
+        return {"executors_len": len(self.executors),
+                "dependency_resolver": self.dependency_resolver is not None}
115 changes: 115 additions & 0 deletions parsl/dataflow/dependency_resolvers.py
Collaborator

The current tests in this PR go through the whole Parsl machinery -- good! But an ancillary set of unit tests that verify strictly this class in isolation would be a good value-add. That is, this is a decently isolated class that doesn't depend on Parsl, so its functionality could be verified independently of the rest of the infrastructure.

(Note that we've only recently added the tests/unit/ directory, so this would be a good second addition to that.)

@@ -0,0 +1,115 @@
+from concurrent.futures import Future
+from dataclasses import dataclass
+from functools import singledispatch
+from typing import Callable
+
+
+@dataclass
+class DependencyResolver:
+    """A DependencyResolver describes how app dependencies can be resolved.
+    It is specified as two functions: `traverse_to_gather` which turns an
+    app parameter into a list of futures which must be waited for before
+    the task can be executed (for example, in the case of
+    `DEEP_DEPENDENCY_RESOLVER` this traverses structures such as lists to
+    find every contained ``Future``), and `traverse_to_unwrap` which turns an
+    app parameter into its value to be passed to the app on execution
+    (for example in the case of `DEEP_DEPENDENCY_RESOLVER` this replaces a
+    list containing futures with a new list containing the values of those
+    resolved futures).
+
+    By default, Parsl will use `SHALLOW_DEPENDENCY_RESOLVER` which only
+    resolves Futures passed directly as arguments.
+    """
+    traverse_to_gather: Callable
+    traverse_to_unwrap: Callable
+
+
+@singledispatch
+def shallow_traverse_to_gather(o):
+    # objects in general do not expose futures that we can see
+    return []
+
+
+@singledispatch
+def shallow_traverse_to_unwrap(o):
+    # objects in general unwrap to themselves
+    return o
+
+
+@shallow_traverse_to_gather.register
+def _(fut: Future):
+    return [fut]
+
+
+@shallow_traverse_to_unwrap.register
+@singledispatch
+def _(fut: Future):
+    return fut.result()
+
+
+@singledispatch
+def deep_traverse_to_gather(o):
+    # objects in general do not expose futures that we can see
+    return []
+
+
+@singledispatch
+def deep_traverse_to_unwrap(o):
+    # objects in general unwrap to themselves
+    return o
+
+
+@deep_traverse_to_gather.register
+def _(fut: Future):
+    return [fut]
+
+
+@deep_traverse_to_unwrap.register
+@singledispatch
+def _(fut: Future):
+    return fut.result()
+
+
+@deep_traverse_to_gather.register(tuple)
+@deep_traverse_to_gather.register(list)
+@deep_traverse_to_gather.register(set)
+def _(iterable):
+    return [e for v in iterable for e in deep_traverse_to_gather(v) if isinstance(e, Future)]
+
+
+@deep_traverse_to_unwrap.register(tuple)
+@deep_traverse_to_unwrap.register(list)
+@deep_traverse_to_unwrap.register(set)
+@singledispatch
+def _(iterable):
+
+    type_ = type(iterable)
+    return type_(map(deep_traverse_to_unwrap, iterable))
+
+
+@deep_traverse_to_gather.register(dict)
+def _(dictionary):
+    futures = []
+    for key, value in dictionary.items():
+        if isinstance(key, Future):
+            futures.append(key)
+        if isinstance(value, Future):
+            futures.append(value)
+    return futures
+
+
+@deep_traverse_to_unwrap.register(dict)
+def _(dictionary):
+    unwrapped_dict = {}
+    for key, value in dictionary.items():
+        key = deep_traverse_to_unwrap(key)
+        value = deep_traverse_to_unwrap(value)
+        unwrapped_dict[key] = value
+    return unwrapped_dict
+
+
+DEEP_DEPENDENCY_RESOLVER = DependencyResolver(traverse_to_gather=deep_traverse_to_gather,
+                                              traverse_to_unwrap=deep_traverse_to_unwrap)
+
+SHALLOW_DEPENDENCY_RESOLVER = DependencyResolver(traverse_to_gather=shallow_traverse_to_gather,
+                                                 traverse_to_unwrap=shallow_traverse_to_unwrap)
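
One detail of the dict handling as it appears in this view of the diff: `deep_traverse_to_gather` only tests whether each key or value is itself a `Future`, while `deep_traverse_to_unwrap` recurses into keys and values. A future nested one level further down, for instance inside a list stored in a dict, would therefore have `.result()` called on it without having been gathered first. A possible recursive variant is sketched below. It is not taken from the PR, and `gather` is a hypothetical stand-in name for `deep_traverse_to_gather`.

```python
from concurrent.futures import Future
from functools import singledispatch


@singledispatch
def gather(o):
    # objects in general do not expose futures that we can see
    return []


@gather.register
def _(fut: Future):
    return [fut]


@gather.register(tuple)
@gather.register(list)
@gather.register(set)
def _(iterable):
    # Recurse into each element and flatten the futures that are found.
    return [f for v in iterable for f in gather(v)]


@gather.register(dict)
def _(dictionary):
    futures = []
    for key, value in dictionary.items():
        # Recurse rather than testing isinstance(..., Future) directly, so
        # futures nested inside keys or values are found as well.
        futures.extend(gather(key))
        futures.extend(gather(value))
    return futures
```

With this variant, gathering and unwrapping walk the same structure, so every future that unwrapping would touch has been waited for beforehand.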
46 changes: 25 additions & 21 deletions parsl/dataflow/dflow.py
@@ -26,6 +26,7 @@
 from parsl.config import Config
 from parsl.data_provider.data_manager import DataManager
 from parsl.data_provider.files import File
+from parsl.dataflow.dependency_resolvers import SHALLOW_DEPENDENCY_RESOLVER
 from parsl.dataflow.errors import BadCheckpoint, DependencyError, JoinError
 from parsl.dataflow.futures import AppFuture
 from parsl.dataflow.memoization import Memoizer
@@ -203,6 +204,9 @@ def __init__(self, config: Config) -> None:
         self.tasks: Dict[int, TaskRecord] = {}
         self.submitter_lock = threading.Lock()

+        self.dependency_resolver = self.config.dependency_resolver if self.config.dependency_resolver is not None \
+            else SHALLOW_DEPENDENCY_RESOLVER
+
Comment on lines +207 to +209
Collaborator

This implies that self.dependency_resolver is a required attribute. Is there utility in making it required at the configuration as well, rather than an implied requirement? That is, moving this conditional into config, and instead either trusting the config object, or asserting here? Perhaps something like:

class Config(...):
    def __init__(
        ...
        dependency_resolver: Optional[DependencyResolver],
        ...
    ):
        if dependency_resolver is None:
            dependency_resolver = SHALLOW_DEPENDENCY_RESOLVER
        self.dependency_resolver = dependency_resolver

Mypy may complain with that particular construction and not-Noneness (so fiddle!), but the point is that the config object is explicit as to what dependency resolver is in use.

Functionally a wash, I think (so I won't be fussed about this), but thinking in terms of overall clarity for when someone is poking at the REPL or CLI.
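
A runnable version of the suggestion above could look roughly like this. `DependencyResolver` and `SHALLOW_DEPENDENCY_RESOLVER` are minimal local stand-ins for the objects defined in `parsl/dataflow/dependency_resolvers.py`, and `Config` is cut down to the one relevant parameter, so this only illustrates where the default would be applied.

```python
from dataclasses import dataclass
from typing import Callable, Optional


# Stand-in for the dataclass added in this PR.
@dataclass
class DependencyResolver:
    traverse_to_gather: Callable
    traverse_to_unwrap: Callable


# Stand-in default; the real SHALLOW_DEPENDENCY_RESOLVER also handles Futures.
SHALLOW_DEPENDENCY_RESOLVER = DependencyResolver(
    traverse_to_gather=lambda o: [],
    traverse_to_unwrap=lambda o: o,
)


class Config:
    """Cut-down stand-in for parsl.config.Config (one parameter only)."""

    def __init__(self, dependency_resolver: Optional[DependencyResolver] = None) -> None:
        # Apply the default here, so the config object always states
        # explicitly which resolver is in use.
        if dependency_resolver is None:
            dependency_resolver = SHALLOW_DEPENDENCY_RESOLVER
        self.dependency_resolver: DependencyResolver = dependency_resolver
```

One consequence to keep in mind: with the default applied in `Config`, the `self.dependency_resolver is not None` check in `get_usage_information` would always be true, so that report would need a different test (for example, comparing against the default) to stay informative.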

         atexit.register(self.atexit_cleanup)

     def __enter__(self):
@@ -852,8 +856,11 @@ def _gather_all_deps(self, args: Sequence[Any], kwargs: Dict[str, Any]) -> List[
         depends: List[Future] = []

         def check_dep(d: Any) -> None:
-            if isinstance(d, Future):
-                depends.extend([d])
+            try:
+                depends.extend(self.dependency_resolver.traverse_to_gather(d))
+            except Exception:
+                logger.exception("Exception in dependency_resolver.traverse_to_gather")
+                raise

         # Check the positional args
         for dep in args:
@@ -905,34 +912,27 @@ def append_failure(e: Exception, dep: Future) -> None:
         # Replace item in args
         new_args = []
         for dep in args:
-            if isinstance(dep, Future):
-                try:
-                    new_args.extend([dep.result()])
-                except Exception as e:
-                    append_failure(e, dep)
-            else:
-                new_args.extend([dep])
+            try:
+                new_args.extend([self.dependency_resolver.traverse_to_unwrap(dep)])
+            except Exception as e:
+                append_failure(e, dep)

         # Check for explicit kwargs ex, fu_1=<fut>
         for key in kwargs:
             dep = kwargs[key]
-            if isinstance(dep, Future):
-                try:
-                    kwargs[key] = dep.result()
-                except Exception as e:
-                    append_failure(e, dep)
+            try:
+                kwargs[key] = self.dependency_resolver.traverse_to_unwrap(dep)
+            except Exception as e:
+                append_failure(e, dep)

         # Check for futures in inputs=[<fut>...]
         if 'inputs' in kwargs:
             new_inputs = []
             for dep in kwargs['inputs']:
-                if isinstance(dep, Future):
-                    try:
-                        new_inputs.extend([dep.result()])
-                    except Exception as e:
-                        append_failure(e, dep)
-                else:
-                    new_inputs.extend([dep])
+                try:
+                    new_inputs.extend([self.dependency_resolver.traverse_to_unwrap(dep)])
+                except Exception as e:
+                    append_failure(e, dep)
             kwargs['inputs'] = new_inputs

         return new_args, kwargs, dep_failures
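
The two resolver calls introduced in these hunks correspond to two phases: first gather every `Future` an argument exposes, then, once those are done, unwrap the arguments into plain values. The self-contained sketch below walks through both phases with a shallow resolver. It is a simplification under stated assumptions: `gather`, `unwrap` and `call_when_ready` are hypothetical names, and it simply blocks with `concurrent.futures.wait` rather than reproducing how the DataFlowKernel schedules tasks or records dependency failures.

```python
from concurrent.futures import Future, wait
from functools import singledispatch


@singledispatch
def gather(o):
    # Plain values expose no futures.
    return []


@gather.register
def _(fut: Future):
    return [fut]


@singledispatch
def unwrap(o):
    # Plain values are passed through as they are.
    return o


@unwrap.register
def _(fut: Future):
    return fut.result()


def call_when_ready(func, *args, **kwargs):
    # Phase 1: collect every future exposed by positional and keyword arguments.
    depends = []
    for arg in list(args) + list(kwargs.values()):
        depends.extend(gather(arg))

    # Block until all gathered futures have completed (a simplification).
    wait(depends)

    # Phase 2: replace each argument with its unwrapped value.
    new_args = [unwrap(a) for a in args]
    new_kwargs = {k: unwrap(v) for k, v in kwargs.items()}
    return func(*new_args, **new_kwargs)
```

Swapping `gather` and `unwrap` for a different pair of functions changes which argument shapes are understood without touching `call_when_ready`, which is the point of routing both phases through a resolver.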
@@ -1037,6 +1037,8 @@ def submit(self,

         func = self._add_output_deps(executor, app_args, app_kwargs, app_fu, func)

+        logger.debug("Added output dependencies")
+
         # Replace the function invocation in the TaskRecord with whatever file-staging
         # substitutions have been made.
         task_record.update({
@@ -1048,8 +1050,10 @@ def submit(self,

         self.tasks[task_id] = task_record

+        logger.debug("Gathering dependencies")
         # Get the list of dependencies for the task
         depends = self._gather_all_deps(app_args, app_kwargs)
+        logger.debug("Gathered dependencies")
         task_record['depends'] = depends

         depend_descs = []