diff --git a/documentation/index.md b/documentation/index.md index 2a4960c7..48c52bf2 100644 --- a/documentation/index.md +++ b/documentation/index.md @@ -8,7 +8,7 @@ In [Tawazi](https://pypi.org/project/tawazi/), there 3 Classes that will be mani 1. `DAG` / `AsyncDAG`: a wrapper around a function that defines a dag dependency. -This function should only contain calls to objects of type `ExecNode` or to other `DAG`.

+This function should only contain calls to objects of type `ExecNode` or `DAG`.

**Hint:** Calling normal Python functions inside a `DAG` is not supported. @@ -210,7 +210,7 @@ assert pipeline_dict() == {"foo": 2, "bar": 5} ### **Return types for an `ExecNode`** -`ExecNode` supports returning multiple values via: +`ExecNode` supports returning multiple values: 1. For objects of type `Tuple` and `List` in Python, you need to specify the unpacking number @@ -521,7 +521,7 @@ Or using its calling tag to distinguish the 1st call of g from the 2nd call: pipe_exec = pipeline.executor(target_nodes=["byebye"]) pipe_exec() ``` -Or using a reference to itself. You can mixt the Alias types too: +Or using a reference to itself. You can mix the Alias types too: ```python @@ -531,7 +531,7 @@ pipe_exec() ``` !!! warning - Because `DAGExecution` instances are mutable, they are non thread-safe. This is unlike `DAG` which is ThreadSafe. Create a DAGExecution per thread if you want to run the same `DAG` in parallel. + Because `DAGExecution` instances are mutable, they are not thread-safe. This is unlike `DAG` which is ThreadSafe. Create a DAGExecution per thread if you want to run the same `DAG` in parallel. Additionally, you can build a subgraph with the paths you want to include by declaring the root nodes where those paths begin, with the `root_nodes` argument: diff --git a/pyproject.toml b/pyproject.toml index 06870f6b..67c74da1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "flit_core.buildapi" [project] name = "tawazi" -version = "0.4.0.dev0" +version = "0.4.0" description = "This library helps you execute a set of functions in a Directed Acyclic Graph (DAG) dependency structure in parallel in a production environment." 
authors = [{name = "Mindee", email = "contact@mindee.com"}] maintainers = [ diff --git a/tawazi/__init__.py b/tawazi/__init__.py index 765d047d..c1cfac76 100644 --- a/tawazi/__init__.py +++ b/tawazi/__init__.py @@ -7,7 +7,7 @@ from .config import cfg from .consts import Resource -__version__ = "0.4.0.dev0" +__version__ = "0.4.0" __all__ = [ "AsyncDAG", diff --git a/tawazi/_dag/__init__.py b/tawazi/_dag/__init__.py index 994c373b..fdcb08d2 100644 --- a/tawazi/_dag/__init__.py +++ b/tawazi/_dag/__init__.py @@ -1,4 +1,4 @@ -from .constructor import safe_make_dag +from .constructor import threadsafe_make_dag from .dag import DAG, AsyncDAG, AsyncDAGExecution, DAGExecution -__all__ = ["DAG", "DAGExecution", "AsyncDAG", "AsyncDAGExecution", "safe_make_dag"] +__all__ = ["DAG", "DAGExecution", "AsyncDAG", "AsyncDAGExecution", "threadsafe_make_dag"] diff --git a/tawazi/_dag/constructor.py b/tawazi/_dag/constructor.py index 97d7482e..37f37c85 100644 --- a/tawazi/_dag/constructor.py +++ b/tawazi/_dag/constructor.py @@ -36,9 +36,10 @@ def get_args_and_default_args(func: Callable[..., Any]) -> Tuple[List[str], Dict return args, default_args -def __make_dag( +def make_dag( _func: Callable[P, RVDAG], max_concurrency: int, is_async: bool ) -> Union[DAG[P, RVDAG], AsyncDAG[P, RVDAG]]: + """Make a DAG or AsyncDAG from the function that describes the DAG.""" # 2. make ExecNodes corresponding to the arguments of the ExecNode # 2.1 get the names of the arguments and the default values func_args, func_default_args = get_args_and_default_args(_func) @@ -87,9 +88,10 @@ def __make_dag( ) -def _make_dag( +def wrap_make_dag( _func: Callable[P, RVDAG], max_concurrency: int, is_async: bool ) -> Union[DAG[P, RVDAG], AsyncDAG[P, RVDAG]]: + """Clean up before and after making the DAG.""" # 1. node.exec_nodes contains all the ExecNodes that concern the DAG being built at the moment. 
# make sure it is empty node.exec_nodes = StrictDict() @@ -97,9 +99,10 @@ def _make_dag( node.DAG_PREFIX = [] try: - return __make_dag(_func, max_concurrency, is_async) + return make_dag(_func, max_concurrency, is_async) except NameError as e: - warnings.warn("Are you trying to do recursion?", stacklevel=3) + if _func.__name__ in e.args[0]: + warnings.warn("Recursion is not supported for DAGs", stacklevel=3) raise e # clean up even in case an error is raised during dag construction finally: @@ -110,7 +113,7 @@ def _make_dag( node.DAG_PREFIX = [] -def safe_make_dag( +def threadsafe_make_dag( _func: Union[Callable[P, RVDAG]], max_concurrency: int, is_async: bool ) -> Union[DAG[P, RVDAG], AsyncDAG[P, RVDAG]]: """Make DAG or AsyncDAG form the function that describes the DAG. @@ -118,4 +121,4 @@ def safe_make_dag( Thread safe and cleans after itself. """ with node.exec_nodes_lock: - return _make_dag(_func, max_concurrency, is_async) + return wrap_make_dag(_func, max_concurrency, is_async) diff --git a/tawazi/_dag/dag.py b/tawazi/_dag/dag.py index 4f622d85..bf8a50ef 100644 --- a/tawazi/_dag/dag.py +++ b/tawazi/_dag/dag.py @@ -1,5 +1,4 @@ """module containing DAG and DAGExecution which are the containers that run ExecNodes in Tawazi.""" -import dataclasses import json import pickle import warnings @@ -59,6 +58,14 @@ def construct_subdag_arg_uxns( return uxns +def detect_duplicates(expanded_config: List[Tuple[Identifier, Any]]) -> None: + duplicates = [ + id for id, count in Counter([id for id, _ in expanded_config]).items() if count > 1 + ] + if duplicates: + raise ValueError(f"trying to set two configs for nodes {duplicates}.") + + @dataclass class BaseDAG(Generic[P, RVDAG]): """Data Structure containing ExecNodes with interdependencies. 
@@ -334,7 +341,7 @@ def _add_missing_deps(candidate_id: Identifier, xn_ids: Set[Identifier]) -> None ) # change input ExecNodes to ArgExecNodes - new_in_ids = [make_axn_id("composed", old_id) for old_id in in_ids] + new_in_ids = [make_axn_id(qualname, old_id) for old_id in in_ids] for old_id, new_id in zip(in_ids, new_in_ids): logger.debug("changing Composed-DAG's input {} into ArgExecNode", new_id) xn_dict[new_id] = ArgExecNode(new_id) @@ -457,6 +464,15 @@ def _pre_setup( ) return graph + def _expand_config( + self, config_nodes: Dict[Union[Tag, Identifier], Any] + ) -> List[Tuple[Identifier, Any]]: + expanded_config = [] + for alias, conf_node in config_nodes.items(): + ids = self.alias_to_ids(alias) + expanded_config.extend([(id, conf) for id, conf in zip(ids, len(ids) * [conf_node])]) + return expanded_config + def config_from_dict(self, config: Dict[str, Any]) -> None: """Allows reconfiguring the parameters of the nodes from a dictionary. @@ -467,39 +483,12 @@ def config_from_dict(self, config: Dict[str, Any]) -> None: Raises: ValueError: if two nodes are configured by the provided config (which is ambiguous) """ - - def node_values(n: ExecNode, conf: Dict[str, Any]) -> Dict[str, Any]: - values = dataclasses.asdict(n) - values["args"] = n.args - values["kwargs"] = n.kwargs - values["is_sequential"] = conf.get("is_sequential", n.is_sequential) - values["priority"] = conf.get("priority", n.priority) - return values # ignore: typing[no-any-return] - - def expand_config( - config_nodes: Dict[Union[Tag, Identifier], Any] - ) -> List[Tuple[Identifier, Any]]: - expanded_config = [] - for alias, conf_node in config_nodes.items(): - ids = self.alias_to_ids(alias) - expanded_config.extend( - [(id, conf) for id, conf in zip(ids, len(ids) * [conf_node])] - ) - return expanded_config - - def detect_duplicates(expanded_config: List[Tuple[Identifier, Any]]) -> None: - duplicates = [ - id for id, count in Counter([id for id, _ in expanded_config]).items() if count > 1 - ] - 
if duplicates: - raise ValueError(f"trying to set two configs for nodes {duplicates}.") - if "nodes" in config: - expanded_config = expand_config(config["nodes"]) + expanded_config = self._expand_config(config["nodes"]) detect_duplicates(expanded_config) for node_id, conf_node in expanded_config: node = self.get_node_by_id(node_id) - values = node_values(node, conf_node) + values = node._conf_to_values(conf_node) self.exec_nodes.force_set(node_id, node.__class__(**values)) if "max_concurrency" in config: @@ -720,7 +709,7 @@ def to_subdag_id(id_: str) -> str: # input ExecNode was already registered during step for zip if new_id in registered_input_ids: logger.debug( - "Skipping ExecNode {} because it is an already registered input", new_id + "Skipping ExecNode {} because the input is already registered", new_id ) continue @@ -880,8 +869,12 @@ async def run_subgraph( for node_id, result in results.items(): xn = self.exec_nodes[node_id] if xn.setup and not xn.executed(self.results): - logger.debug("Setting result of setup ExecNode {} to {}", node_id, result) - logger.debug("Future executions will use this result.") + logger.debug( + "Setting result of setup ExecNode {} to {}. " + "Future executions will use this result.", + node_id, + result, + ) self.results[node_id] = result return exec_nodes, results, profiles diff --git a/tawazi/_dag/helpers.py b/tawazi/_dag/helpers.py index ab9b79f0..0a0073b9 100644 --- a/tawazi/_dag/helpers.py +++ b/tawazi/_dag/helpers.py @@ -197,7 +197,7 @@ async def to_thread_in_executor( allowing context variables from the main thread to be accessed in the separate thread. - Return a coroutine that can be awaited to get the eventual result of *func*. + Return a coroutine that can be awaited to get the result of *func*.
""" loop = asyncio.get_running_loop() ctx = contextvars.copy_context() diff --git a/tawazi/_decorators.py b/tawazi/_decorators.py index 5ea2e10c..bae2cc8a 100644 --- a/tawazi/_decorators.py +++ b/tawazi/_decorators.py @@ -8,7 +8,7 @@ from typing_extensions import Literal from tawazi import AsyncDAG -from tawazi._dag import DAG, safe_make_dag +from tawazi._dag import DAG, threadsafe_make_dag from .config import cfg from .consts import RVDAG, RVXN, P, Resource, TagOrTags @@ -213,7 +213,7 @@ def dag( # wrapper used to support parametrized and non parametrized decorators def intermediate_wrapper(_func: Callable[P, RVDAG]) -> Union[DAG[P, RVDAG], AsyncDAG[P, RVDAG]]: # 0. Protect against multiple threads declaring many DAGs at the same time - d = safe_make_dag(_func, max_concurrency, is_async) + d = threadsafe_make_dag(_func, max_concurrency, is_async) functools.update_wrapper(d, _func) return d diff --git a/tawazi/node/node.py b/tawazi/node/node.py index bce8a16e..f675b3c3 100644 --- a/tawazi/node/node.py +++ b/tawazi/node/node.py @@ -265,6 +265,14 @@ def get_call_location(self) -> str: frame_info = inspect.getframeinfo(frame) return f"{frame_info.filename}:{frame_info.lineno}" + def _conf_to_values(self, conf: Dict[str, Any]) -> Dict[str, Any]: + values = dataclasses.asdict(self) + values["args"] = self.args + values["kwargs"] = self.kwargs + values["is_sequential"] = conf.get("is_sequential", self.is_sequential) + values["priority"] = conf.get("priority", self.priority) + return values # ignore: typing[no-any-return] + class ReturnExecNode(ExecNode): """ExecNode corresponding to a constant Return value of a DAG.""" diff --git a/tests/test_build.py b/tests/test_build.py index 9e41afe1..ca2e8656 100644 --- a/tests/test_build.py +++ b/tests/test_build.py @@ -97,7 +97,7 @@ def test_setting_execnode_id_should_fail() -> None: def test_execnodes() -> None: - with pytest.raises(NameError), pytest.warns(UserWarning, match="recursion"): + with pytest.raises(NameError): @dag 
def pipe() -> None: diff --git a/tests/test_dag_in_dag.py b/tests/test_dag_in_dag.py index a3f42fb4..c495017c 100644 --- a/tests/test_dag_in_dag.py +++ b/tests/test_dag_in_dag.py @@ -193,7 +193,7 @@ def my_dag(v: int, activate: bool, activate_subdag: bool) -> int: def test_recursive() -> None: with pytest.raises(NameError): - with pytest.warns(UserWarning, match="Are you trying to do recursion?"): + with pytest.warns(UserWarning, match="Recursion is not supported for DAGs"): @dag def rec() -> None: