Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify executor naming #457

Merged
merged 12 commits into from
May 1, 2024
4 changes: 2 additions & 2 deletions cubed/core/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,9 +274,9 @@ def compute(
if executor is None:
executor = arrays[0].spec.executor
if executor is None:
from cubed.runtime.executors.python import PythonDagExecutor
from cubed.runtime.executors.local import SingleThreadedExecutor

executor = PythonDagExecutor()
executor = SingleThreadedExecutor()

_return_in_memory_array = kwargs.pop("_return_in_memory_array", True)
plan.execute(
Expand Down
34 changes: 16 additions & 18 deletions cubed/runtime/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,36 @@ def create_executor(name: str, executor_options: Optional[dict] = None) -> Execu
"""Create an executor from an executor name."""
executor_options = executor_options or {}
if name == "beam":
from cubed.runtime.executors.beam import BeamDagExecutor
from cubed.runtime.executors.beam import BeamExecutor

return BeamDagExecutor(**executor_options)
return BeamExecutor(**executor_options)
elif name == "coiled":
from cubed.runtime.executors.coiled import CoiledFunctionsDagExecutor
from cubed.runtime.executors.coiled import CoiledExecutor

return CoiledFunctionsDagExecutor(**executor_options)
return CoiledExecutor(**executor_options)
elif name == "dask":
from cubed.runtime.executors.dask_distributed_async import (
AsyncDaskDistributedExecutor,
)
from cubed.runtime.executors.dask import DaskExecutor

return AsyncDaskDistributedExecutor(**executor_options)
return DaskExecutor(**executor_options)
elif name == "lithops":
from cubed.runtime.executors.lithops import LithopsDagExecutor
from cubed.runtime.executors.lithops import LithopsExecutor

return LithopsDagExecutor(**executor_options)
return LithopsExecutor(**executor_options)
elif name == "modal":
from cubed.runtime.executors.modal_async import AsyncModalDagExecutor
from cubed.runtime.executors.modal import ModalExecutor

return AsyncModalDagExecutor(**executor_options)
return ModalExecutor(**executor_options)
elif name == "processes":
from cubed.runtime.executors.python_async import AsyncPythonDagExecutor
from cubed.runtime.executors.local import ProcessesExecutor

return AsyncPythonDagExecutor(retries=0, use_processes=True, **executor_options)
return ProcessesExecutor(**executor_options)
elif name == "single-threaded":
from cubed.runtime.executors.python import PythonDagExecutor
from cubed.runtime.executors.local import SingleThreadedExecutor

return PythonDagExecutor(**executor_options)
return SingleThreadedExecutor(**executor_options)
elif name == "threads":
from cubed.runtime.executors.python_async import AsyncPythonDagExecutor
from cubed.runtime.executors.local import ThreadsExecutor

return AsyncPythonDagExecutor(**executor_options)
return ThreadsExecutor(**executor_options)
else:
raise ValueError(f"Unrecognized executor name: {name}")
2 changes: 1 addition & 1 deletion cubed/runtime/executors/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def expand(self, pcoll):
)


class BeamDagExecutor(DagExecutor):
class BeamExecutor(DagExecutor):
"""An execution engine that uses Apache Beam."""

@property
Expand Down
2 changes: 1 addition & 1 deletion cubed/runtime/executors/coiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def make_coiled_function(func, coiled_kwargs):
return coiled.function(**coiled_kwargs)(execution_stats(func))


class CoiledFunctionsDagExecutor(DagExecutor):
class CoiledExecutor(DagExecutor):
"""An execution engine that uses Coiled Functions."""

def __init__(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ async def async_execute_dag(
handle_callbacks(callbacks, stats)


class AsyncDaskDistributedExecutor(DagExecutor):
class DaskExecutor(DagExecutor):
"""An execution engine that uses Dask Distributed's async API."""

def __init__(self, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion cubed/runtime/executors/lithops.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def standardise_lithops_stats(future: RetryingFuture) -> Dict[str, Any]:
)


class LithopsDagExecutor(DagExecutor):
class LithopsExecutor(DagExecutor):
"""An execution engine that uses Lithops."""

def __init__(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from cubed.runtime.executors.asyncio import async_map_unordered
from cubed.runtime.pipeline import visit_node_generations, visit_nodes
from cubed.runtime.types import Callback, CubedPipeline, DagExecutor
from cubed.runtime.types import Callback, CubedPipeline, DagExecutor, TaskEndEvent
from cubed.runtime.utils import (
execution_stats,
handle_callbacks,
Expand All @@ -22,6 +22,42 @@
from cubed.spec import Spec


def exec_stage_func(input, func=None, config=None, name=None, compute_id=None):
    """Run *func* on a single *input* with the given *config*.

    The *name* and *compute_id* arguments are accepted for signature
    compatibility with other stage runners but are not used here.
    """
    result = func(input, config=config)
    return result


class SingleThreadedExecutor(DagExecutor):
    """The default execution engine that runs tasks sequentially using Python loops."""

    @property
    def name(self) -> str:
        return "single-threaded"

    def execute_dag(
        self,
        dag: MultiDiGraph,
        callbacks: Optional[Sequence[Callback]] = None,
        resume: Optional[bool] = None,
        spec: Optional[Spec] = None,
        compute_id: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Execute every task in *dag* sequentially in the calling thread.

        Operations are visited via ``visit_nodes``; for each operation the
        start callbacks fire once, then each item of the pipeline's
        ``mappable`` is run, with a ``TaskEndEvent`` delivered to callbacks
        after every task.
        """
        for name, node in visit_nodes(dag, resume=resume):
            handle_operation_start_callbacks(callbacks, name)
            pipeline: CubedPipeline = node["pipeline"]
            for m in pipeline.mappable:
                exec_stage_func(
                    m,
                    pipeline.function,
                    config=pipeline.config,
                    name=name,
                    compute_id=compute_id,
                )
                if callbacks is not None:
                    event = TaskEndEvent(name=name)
                    # Plain loop, not a comprehension: called only for the
                    # on_task_end side effect, no result is collected.
                    for callback in callbacks:
                        callback.on_task_end(event)


@execution_stats
def run_func(input, func=None, config=None, name=None, compute_id=None):
print(f"{compute_id} {name}: running on {input}")
Expand Down Expand Up @@ -169,11 +205,50 @@ async def async_execute_dag(
concurrent_executor.shutdown(wait=False)


class AsyncPythonDagExecutor(DagExecutor):
class ThreadsExecutor(DagExecutor):
    """An execution engine that runs tasks in local threads, driven by Python asyncio."""

    def __init__(self, **kwargs):
        # Force thread mode: use_processes is pinned last so a caller-supplied
        # value can never override it.
        self.kwargs = {**kwargs, "use_processes": False}

        # Tell NumPy to use a single thread
        # from https://stackoverflow.com/questions/30791550/limit-number-of-threads-in-numpy
        os.environ["MKL_NUM_THREADS"] = "1"
        os.environ["NUMEXPR_NUM_THREADS"] = "1"
        os.environ["OMP_NUM_THREADS"] = "1"
        os.environ["VECLIB_MAXIMUM_THREADS"] = "1"

    @property
    def name(self) -> str:
        return "threads"

    def execute_dag(
        self,
        dag: MultiDiGraph,
        callbacks: Optional[Sequence[Callback]] = None,
        resume: Optional[bool] = None,
        spec: Optional[Spec] = None,
        compute_id: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Execute *dag* via ``async_execute_dag`` inside ``asyncio.run``.

        Per-call *kwargs* override those given at construction time,
        except ``use_processes`` which was fixed to ``False`` in
        ``__init__`` (construction-time kwargs come first in the merge,
        call-time kwargs last).
        """
        # NOTE(review): call-time kwargs merge last, so a caller passing
        # use_processes=True here WOULD override __init__'s pin — presumably
        # callers never do; confirm if that should be guarded.
        merged_kwargs = {**self.kwargs, **kwargs}
        asyncio.run(
            async_execute_dag(
                dag,
                callbacks=callbacks,
                resume=resume,
                spec=spec,
                compute_id=compute_id,
                **merged_kwargs,
            )
        )


class ProcessesExecutor(DagExecutor):
"""An execution engine that uses local processes."""

def __init__(self, **kwargs):
self.kwargs = {**kwargs, **dict(retries=0, use_processes=True)}

# Tell NumPy to use a single thread
# from https://stackoverflow.com/questions/30791550/limit-number-of-threads-in-numpy
Expand All @@ -184,7 +259,7 @@ def __init__(self, **kwargs):

@property
def name(self) -> str:
return "processes" if self.kwargs.get("use_processes", False) else "threads"
return "processes"

def execute_dag(
self,
Expand Down
Loading
Loading