Skip to content

Commit

Permalink
Extend dependency error change in #2989 to remaining two dependency t…
Browse files Browse the repository at this point in the history
…ypes (#3445)

PR #2989 made dependency errors look nicer, but only did it for positional
parameters. This PR extends that to kwarg and inputs=... dependencies.

PR #2989 describes these changes in more depth.

This PR adds a type annotation onto DFK._unwrap_futures that drives this
change - preventing _unwrap_futures from returning None task ids for failed
dependencies and instead requiring the task IDs to be strings.
  • Loading branch information
benclifford authored May 21, 2024
1 parent f210753 commit 2d4ae85
Showing 1 changed file with 15 additions and 20 deletions.
35 changes: 15 additions & 20 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,8 @@ def check_dep(d: Any) -> None:

return depends

def _unwrap_futures(self, args, kwargs):
def _unwrap_futures(self, args: Sequence[Any], kwargs: Dict[str, Any]) \
-> Tuple[Sequence[Any], Dict[str, Any], Sequence[Tuple[Exception, str]]]:
"""This function should be called when all dependencies have completed.
It will rewrite the arguments for that task, replacing each Future
Expand All @@ -891,21 +892,24 @@ def _unwrap_futures(self, args, kwargs):
"""
dep_failures = []

def append_failure(e: Exception, dep: Future) -> None:
# If this Future is associated with a task inside this DFK,
# then refer to the task ID.
# Otherwise make a repr of the Future object.
if hasattr(dep, 'task_record') and dep.task_record['dfk'] == self:
tid = "task " + repr(dep.task_record['id'])
else:
tid = repr(dep)
dep_failures.extend([(e, tid)])

# Replace item in args
new_args = []
for dep in args:
if isinstance(dep, Future):
try:
new_args.extend([dep.result()])
except Exception as e:
# If this Future is associated with a task inside this DFK,
# then refer to the task ID.
# Otherwise make a repr of the Future object.
if hasattr(dep, 'task_record') and dep.task_record['dfk'] == self:
tid = "task " + repr(dep.task_record['id'])
else:
tid = repr(dep)
dep_failures.extend([(e, tid)])
append_failure(e, dep)
else:
new_args.extend([dep])

Expand All @@ -916,11 +920,7 @@ def _unwrap_futures(self, args, kwargs):
try:
kwargs[key] = dep.result()
except Exception as e:
if hasattr(dep, 'task_record'):
tid = dep.task_record['id']
else:
tid = None
dep_failures.extend([(e, tid)])
append_failure(e, dep)

# Check for futures in inputs=[<fut>...]
if 'inputs' in kwargs:
Expand All @@ -930,12 +930,7 @@ def _unwrap_futures(self, args, kwargs):
try:
new_inputs.extend([dep.result()])
except Exception as e:
if hasattr(dep, 'task_record'):
tid = dep.task_record['id']
else:
tid = None
dep_failures.extend([(e, tid)])

append_failure(e, dep)
else:
new_inputs.extend([dep])
kwargs['inputs'] = new_inputs
Expand Down

0 comments on commit 2d4ae85

Please sign in to comment.