Skip to content

Commit

Permalink
chg: 📝 resolving conversations
Browse files Browse the repository at this point in the history
  • Loading branch information
bashirmindee committed Aug 7, 2024
1 parent 02ac36c commit d317f95
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 53 deletions.
8 changes: 4 additions & 4 deletions documentation/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ In [Tawazi](https://pypi.org/project/tawazi/), there 3 Classes that will be mani


1. `DAG` / `AsyncDAG`: a wrapper around a function that defines a dag dependency.
This function should only contain calls to objects of type `ExecNode` or to other `DAG`.<p></p>
This function should only contain calls to objects of type `ExecNode` or `DAG`.<p></p>
**Hint:** Calling normal Python functions inside a `DAG` is not supported.


Expand Down Expand Up @@ -210,7 +210,7 @@ assert pipeline_dict() == {"foo": 2, "bar": 5}

### **Return types for an `ExecNode`**

`ExecNode` supports returning multiple values via:
`ExecNode` supports returning multiple values:

1. For objects of type `Tuple` and `List` in Python, you need to specify the unpacking number
<!--pytest-codeblocks:cont-->
Expand Down Expand Up @@ -521,7 +521,7 @@ Or using its calling tag to distinguish the 1st call of g from the 2nd call:
pipe_exec = pipeline.executor(target_nodes=["byebye"])
pipe_exec()
```
Or using a reference to itself. You can mixt the Alias types too:
Or using a reference to itself. You can mix the Alias types too:
<!--pytest-codeblocks:cont-->

```python
Expand All @@ -531,7 +531,7 @@ pipe_exec()
```
!!! warning

Because `DAGExecution` instances are mutable, they are non thread-safe. This is unlike `DAG` which is ThreadSafe. Create a DAGExecution per thread if you want to run the same `DAG` in parallel.
Because `DAGExecution` instances are mutable, they are not thread-safe, unlike `DAG`, which is thread-safe. Create one `DAGExecution` per thread if you want to run the same `DAG` in parallel.

Additionally, you can build a subgraph with the paths you want to include by declaring the root nodes where those paths begin, with the `root_nodes` argument:

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "flit_core.buildapi"

[project]
name = "tawazi"
version = "0.4.0.dev0"
version = "0.4.0"
description = "This library helps you execute a set of functions in a Directed Acyclic Graph (DAG) dependency structure in parallel in a production environment."
authors = [{name = "Mindee", email = "[email protected]"}]
maintainers = [
Expand Down
2 changes: 1 addition & 1 deletion tawazi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .config import cfg
from .consts import Resource

__version__ = "0.4.0.dev0"
__version__ = "0.4.0"

__all__ = [
"AsyncDAG",
Expand Down
4 changes: 2 additions & 2 deletions tawazi/_dag/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .constructor import safe_make_dag
from .constructor import threadsafe_make_dag
from .dag import DAG, AsyncDAG, AsyncDAGExecution, DAGExecution

__all__ = ["DAG", "DAGExecution", "AsyncDAG", "AsyncDAGExecution", "safe_make_dag"]
__all__ = ["DAG", "DAGExecution", "AsyncDAG", "AsyncDAGExecution", "threadsafe_make_dag"]
15 changes: 9 additions & 6 deletions tawazi/_dag/constructor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ def get_args_and_default_args(func: Callable[..., Any]) -> Tuple[List[str], Dict
return args, default_args


def __make_dag(
def make_dag(
_func: Callable[P, RVDAG], max_concurrency: int, is_async: bool
) -> Union[DAG[P, RVDAG], AsyncDAG[P, RVDAG]]:
"""Make a DAG or AsyncDAG from the function that describes the DAG."""
# 2. make ExecNodes corresponding to the arguments of the ExecNode
# 2.1 get the names of the arguments and the default values
func_args, func_default_args = get_args_and_default_args(_func)
Expand Down Expand Up @@ -87,19 +88,21 @@ def __make_dag(
)


def _make_dag(
def wrap_make_dag(
_func: Callable[P, RVDAG], max_concurrency: int, is_async: bool
) -> Union[DAG[P, RVDAG], AsyncDAG[P, RVDAG]]:
"""Clean up before and after making the DAG."""
# 1. node.exec_nodes contains all the ExecNodes that concern the DAG being built at the moment.
# make sure it is empty
node.exec_nodes = StrictDict()
node.results = StrictDict()
node.DAG_PREFIX = []

try:
return __make_dag(_func, max_concurrency, is_async)
return make_dag(_func, max_concurrency, is_async)
except NameError as e:
warnings.warn("Are you trying to do recursion?", stacklevel=3)
if _func.__name__ in e.args[0]:
warnings.warn("Recursion is not supported for DAGs", stacklevel=3)
raise e
# clean up even in case an error is raised during dag construction
finally:
Expand All @@ -110,12 +113,12 @@ def _make_dag(
node.DAG_PREFIX = []


def safe_make_dag(
def threadsafe_make_dag(
_func: Union[Callable[P, RVDAG]], max_concurrency: int, is_async: bool
) -> Union[DAG[P, RVDAG], AsyncDAG[P, RVDAG]]:
"""Make a DAG or AsyncDAG from the function that describes the DAG.
Thread safe and cleans after itself.
"""
with node.exec_nodes_lock:
return _make_dag(_func, max_concurrency, is_async)
return wrap_make_dag(_func, max_concurrency, is_async)
61 changes: 27 additions & 34 deletions tawazi/_dag/dag.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""module containing DAG and DAGExecution which are the containers that run ExecNodes in Tawazi."""
import dataclasses
import json
import pickle
import warnings
Expand Down Expand Up @@ -59,6 +58,14 @@ def construct_subdag_arg_uxns(
return uxns


def detect_duplicates(expanded_config: List[Tuple[Identifier, Any]]) -> None:
    """Raise if the same node identifier appears more than once in a config.

    Args:
        expanded_config: ``(node_id, node_config)`` pairs; only the first
            element of each pair is inspected.

    Raises:
        ValueError: if at least one identifier occurs in more than one pair.
            The message lists every duplicated identifier, in order of first
            appearance.
    """
    # Counter keeps insertion order, so the reported ids follow the input order.
    occurrences = Counter(node_id for node_id, _ in expanded_config)
    duplicates = [node_id for node_id, count in occurrences.items() if count > 1]
    if duplicates:
        raise ValueError(f"trying to set two configs for nodes {duplicates}.")


@dataclass
class BaseDAG(Generic[P, RVDAG]):
"""Data Structure containing ExecNodes with interdependencies.
Expand Down Expand Up @@ -334,7 +341,7 @@ def _add_missing_deps(candidate_id: Identifier, xn_ids: Set[Identifier]) -> None
)

# change input ExecNodes to ArgExecNodes
new_in_ids = [make_axn_id("composed", old_id) for old_id in in_ids]
new_in_ids = [make_axn_id(qualname, old_id) for old_id in in_ids]
for old_id, new_id in zip(in_ids, new_in_ids):
logger.debug("changing Composed-DAG's input {} into ArgExecNode", new_id)
xn_dict[new_id] = ArgExecNode(new_id)
Expand Down Expand Up @@ -457,6 +464,15 @@ def _pre_setup(
)
return graph

def _expand_config(
self, config_nodes: Dict[Union[Tag, Identifier], Any]
) -> List[Tuple[Identifier, Any]]:
expanded_config = []
for alias, conf_node in config_nodes.items():
ids = self.alias_to_ids(alias)
expanded_config.extend([(id, conf) for id, conf in zip(ids, len(ids) * [conf_node])])
return expanded_config

def config_from_dict(self, config: Dict[str, Any]) -> None:
"""Allows reconfiguring the parameters of the nodes from a dictionary.
Expand All @@ -467,39 +483,12 @@ def config_from_dict(self, config: Dict[str, Any]) -> None:
Raises:
ValueError: if two nodes are configured by the provided config (which is ambiguous)
"""

def node_values(n: ExecNode, conf: Dict[str, Any]) -> Dict[str, Any]:
values = dataclasses.asdict(n)
values["args"] = n.args
values["kwargs"] = n.kwargs
values["is_sequential"] = conf.get("is_sequential", n.is_sequential)
values["priority"] = conf.get("priority", n.priority)
return values # ignore: typing[no-any-return]

def expand_config(
config_nodes: Dict[Union[Tag, Identifier], Any]
) -> List[Tuple[Identifier, Any]]:
expanded_config = []
for alias, conf_node in config_nodes.items():
ids = self.alias_to_ids(alias)
expanded_config.extend(
[(id, conf) for id, conf in zip(ids, len(ids) * [conf_node])]
)
return expanded_config

def detect_duplicates(expanded_config: List[Tuple[Identifier, Any]]) -> None:
duplicates = [
id for id, count in Counter([id for id, _ in expanded_config]).items() if count > 1
]
if duplicates:
raise ValueError(f"trying to set two configs for nodes {duplicates}.")

if "nodes" in config:
expanded_config = expand_config(config["nodes"])
expanded_config = self._expand_config(config["nodes"])
detect_duplicates(expanded_config)
for node_id, conf_node in expanded_config:
node = self.get_node_by_id(node_id)
values = node_values(node, conf_node)
values = node._conf_to_values(conf_node)
self.exec_nodes.force_set(node_id, node.__class__(**values))

if "max_concurrency" in config:
Expand Down Expand Up @@ -720,7 +709,7 @@ def to_subdag_id(id_: str) -> str:
# input ExecNode was already registered during step for zip
if new_id in registered_input_ids:
logger.debug(
"Skipping ExecNode {} because it is an already registered input", new_id
"Skipping ExecNode {} because the input is already registered", new_id
)
continue

Expand Down Expand Up @@ -880,8 +869,12 @@ async def run_subgraph(
for node_id, result in results.items():
xn = self.exec_nodes[node_id]
if xn.setup and not xn.executed(self.results):
logger.debug("Setting result of setup ExecNode {} to {}", node_id, result)
logger.debug("Future executions will use this result.")
logger.debug(
    # Adjacent literals are joined into one format string, so the first
    # sentence needs its own ". " terminator to keep the message readable.
    "Setting result of setup ExecNode {} to {}. "
    "Future executions will use this result.",
    node_id,
    result,
)
self.results[node_id] = result

return exec_nodes, results, profiles
Expand Down
2 changes: 1 addition & 1 deletion tawazi/_dag/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ async def to_thread_in_executor(
allowing context variables from the main thread to be accessed in the
separate thread.
Return a coroutine that can be awaited to get the eventual result of *func*.
Return a coroutine that can be awaited to get the result of *func*.
"""
loop = asyncio.get_running_loop()
ctx = contextvars.copy_context()
Expand Down
4 changes: 2 additions & 2 deletions tawazi/_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing_extensions import Literal

from tawazi import AsyncDAG
from tawazi._dag import DAG, safe_make_dag
from tawazi._dag import DAG, threadsafe_make_dag

from .config import cfg
from .consts import RVDAG, RVXN, P, Resource, TagOrTags
Expand Down Expand Up @@ -213,7 +213,7 @@ def dag(
# wrapper used to support parametrized and non parametrized decorators
def intermediate_wrapper(_func: Callable[P, RVDAG]) -> Union[DAG[P, RVDAG], AsyncDAG[P, RVDAG]]:
# 0. Protect against multiple threads declaring many DAGs at the same time
d = safe_make_dag(_func, max_concurrency, is_async)
d = threadsafe_make_dag(_func, max_concurrency, is_async)
functools.update_wrapper(d, _func)
return d

Expand Down
8 changes: 8 additions & 0 deletions tawazi/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,14 @@ def get_call_location(self) -> str:
frame_info = inspect.getframeinfo(frame)
return f"{frame_info.filename}:{frame_info.lineno}"

def _conf_to_values(self, conf: Dict[str, Any]) -> Dict[str, Any]:
values = dataclasses.asdict(self)
values["args"] = self.args
values["kwargs"] = self.kwargs
values["is_sequential"] = conf.get("is_sequential", self.is_sequential)
values["priority"] = conf.get("priority", self.priority)
return values # ignore: typing[no-any-return]


class ReturnExecNode(ExecNode):
"""ExecNode corresponding to a constant Return value of a DAG."""
Expand Down
2 changes: 1 addition & 1 deletion tests/test_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def test_setting_execnode_id_should_fail() -> None:


def test_execnodes() -> None:
with pytest.raises(NameError), pytest.warns(UserWarning, match="recursion"):
with pytest.raises(NameError):

@dag
def pipe() -> None:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_dag_in_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def my_dag(v: int, activate: bool, activate_subdag: bool) -> int:

def test_recursive() -> None:
with pytest.raises(NameError):
with pytest.warns(UserWarning, match="Are you trying to do recursion?"):
with pytest.warns(UserWarning, match="Recursion is not supported for DAGs"):

@dag
def rec() -> None:
Expand Down

0 comments on commit d317f95

Please sign in to comment.