From 2d4ae854cea8fecff8c62b070d9dcde2b033ddf8 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 21 May 2024 18:01:05 +0200 Subject: [PATCH] Extend dependency error change in #2989 to remaining two dependency types (#3445) PR #2989 made dependency errors look nicer, but only did it for positional parameters. This PR extends that to kwarg and inputs=... dependencies. PR #2989 describes these changes in more depth. This PR adds a type annotation onto DFK._unwrap_futures that drives this change - preventing _unwrap_futures from returning None task ids for failed dependencies and instead requiring the task IDs to be strings. --- parsl/dataflow/dflow.py | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 9bc78b1ce3..5027e910e0 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -870,7 +870,8 @@ def check_dep(d: Any) -> None: return depends - def _unwrap_futures(self, args, kwargs): + def _unwrap_futures(self, args: Sequence[Any], kwargs: Dict[str, Any]) \ + -> Tuple[Sequence[Any], Dict[str, Any], Sequence[Tuple[Exception, str]]]: """This function should be called when all dependencies have completed. It will rewrite the arguments for that task, replacing each Future @@ -891,6 +892,16 @@ def _unwrap_futures(self, args, kwargs): """ dep_failures = [] + def append_failure(e: Exception, dep: Future) -> None: + # If this Future is associated with a task inside this DFK, + # then refer to the task ID. + # Otherwise make a repr of the Future object. + if hasattr(dep, 'task_record') and dep.task_record['dfk'] == self: + tid = "task " + repr(dep.task_record['id']) + else: + tid = repr(dep) + dep_failures.extend([(e, tid)]) + # Replace item in args new_args = [] for dep in args: @@ -898,14 +909,7 @@ def _unwrap_futures(self, args, kwargs): try: new_args.extend([dep.result()]) except Exception as e: - # If this Future is associated with a task inside this DFK, - # then refer to the task ID. - # Otherwise make a repr of the Future object. - if hasattr(dep, 'task_record') and dep.task_record['dfk'] == self: - tid = "task " + repr(dep.task_record['id']) - else: - tid = repr(dep) - dep_failures.extend([(e, tid)]) + append_failure(e, dep) else: new_args.extend([dep]) @@ -916,11 +920,7 @@ def _unwrap_futures(self, args, kwargs): try: kwargs[key] = dep.result() except Exception as e: - if hasattr(dep, 'task_record'): - tid = dep.task_record['id'] - else: - tid = None - dep_failures.extend([(e, tid)]) + append_failure(e, dep) # Check for futures in inputs=[...] if 'inputs' in kwargs: @@ -930,12 +930,7 @@ def _unwrap_futures(self, args, kwargs): try: new_inputs.extend([dep.result()]) except Exception as e: - if hasattr(dep, 'task_record'): - tid = dep.task_record['id'] - else: - tid = None - dep_failures.extend([(e, tid)]) - + append_failure(e, dep) else: new_inputs.extend([dep]) kwargs['inputs'] = new_inputs