Decorator configuration improvements #67

Draft
wants to merge 18 commits into base: main
2 changes: 1 addition & 1 deletion README.md
@@ -31,7 +31,7 @@ Check out the Getting Started guide in our [docs](https://astronomer.github.io/a
## Sample DAGs

### Example 1: Using @ray.task for job life cycle
The below example showcases how to use the ``@ray.task`` decorator to manage the full lifecycle of a Ray cluster: setup, job execution, and teardown.
The below example showcases how to use the ``@ray.task`` decorator to manage the full lifecycle of a Ray cluster: setup, job execution, and teardown. The configuration for the decorator can be provided statically or at runtime.

This approach is ideal for jobs that require a dedicated, short-lived cluster, optimizing resource usage by cleaning up after task completion

3 changes: 3 additions & 0 deletions docs/getting_started/code_samples.rst
@@ -41,6 +41,9 @@ The below example showcases how to use the ``@ray.task`` decorator to manage the

This approach is ideal for jobs that require a dedicated, short-lived cluster, optimizing resource usage by cleaning up after task completion.

.. note::
    Configuration can be specified as a dictionary, either statically or dynamically at runtime as needed.

.. literalinclude:: ../../example_dags/ray_taskflow_example.py
    :language: python
    :linenos:
59 changes: 59 additions & 0 deletions example_dags/ray_taskflow_example_dynamic_config.py
@@ -0,0 +1,59 @@
from datetime import datetime
from pathlib import Path

from airflow.decorators import dag, task

from ray_provider.decorators.ray import ray


def generate_config():
    CONN_ID = "ray_conn"
    RAY_SPEC = Path(__file__).parent / "scripts/ray.yaml"
    FOLDER_PATH = Path(__file__).parent / "ray_scripts"
    return {
        "conn_id": CONN_ID,
        "runtime_env": {"working_dir": str(FOLDER_PATH), "pip": ["numpy"]},
        "num_cpus": 1,
        "num_gpus": 0,
        "memory": 0,
        "poll_interval": 5,
        "ray_cluster_yaml": str(RAY_SPEC),
        "xcom_task_key": "dashboard",
    }


@dag(
    dag_id="Ray_Taskflow_Example_Dynamic_Config",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=["ray", "example"],
)
def ray_taskflow_dag():

    @task
    def generate_data():
        return [1, 2, 3]

    @ray.task(config=generate_config)
    def process_data_with_ray(data):
        import numpy as np
        import ray

        @ray.remote
        def square(x):
            return x**2

        ray.init()
        data = np.array(data)
        futures = [square.remote(x) for x in data]
        results = ray.get(futures)
        mean = np.mean(results)
        print(f"Mean of this population is {mean}")
        return mean

    data = generate_data()
    process_data_with_ray(data)


ray_example_dag = ray_taskflow_dag()
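
For reference, the arithmetic that `process_data_with_ray` performs on the sample input can be checked without a Ray cluster. The sketch below is a plain-Python stand-in, not part of this PR: it uses neither `ray` nor `numpy`, `square` is an ordinary function rather than a remote one, and it assumes the input is the `[1, 2, 3]` returned by `generate_data`.

```python
def square(x):
    # Same body as the remote function in the example DAG, run locally.
    return x**2


# Assumed input: the list returned by generate_data in the example DAG.
data = [1, 2, 3]

# Local equivalent of submitting square.remote(x) and collecting with ray.get.
results = [square(x) for x in data]

# Arithmetic mean of the squared values, standing in for np.mean.
mean = sum(results) / len(results)
print(f"Mean of this population is {mean}")
```

The squares are 1, 4 and 9, so the task should report a mean of 14/3, roughly 4.67.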
11 changes: 10 additions & 1 deletion ray_provider/decorators/ray.py
@@ -146,19 +146,28 @@ class ray:
    def task(
        python_callable: Callable[..., Any] | None = None,
        multiple_outputs: bool | None = None,
        config: Callable[[], dict[str, Any]] | dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> TaskDecorator:
        """
        Decorator to define a task that submits a Ray job.

        :param python_callable: The callable function to decorate.
        :param multiple_outputs: If True, will return multiple outputs.
        :param config: A dictionary of configuration or a callable that returns a dictionary.
        :param kwargs: Additional keyword arguments.
        :return: The decorated task.
        """
        if config is None:
            config = {}
        elif callable(config):
            config = config()
        elif not isinstance(config, dict):
            raise TypeError("config must be either a callable, a dictionary, or None")

        return task_decorator_factory(
            python_callable=python_callable,
            multiple_outputs=multiple_outputs,
            decorated_operator_class=_RayDecoratedOperator,
            config=config,
            **kwargs,
        )
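
The three branches added to `task` can be exercised in isolation. The following is a minimal sketch, assuming a hypothetical standalone helper named `resolve_config` that copies the branch logic from the diff above; it is not part of `ray_provider`, and the shortened `generate_config` is illustrative only.

```python
from typing import Any, Callable, Dict, Union

# Accepted inputs, matching the type hint in the diff: callable, dict, or None.
ConfigSource = Union[Callable[[], Dict[str, Any]], Dict[str, Any], None]


def resolve_config(config: ConfigSource = None) -> Dict[str, Any]:
    """Hypothetical helper mirroring the config handling added to ray.task."""
    if config is None:
        return {}
    if callable(config):
        # Called immediately, so the dictionary is built at the moment
        # the decorator is applied, not when the task later executes.
        return config()
    if not isinstance(config, dict):
        raise TypeError("config must be either a callable, a dictionary, or None")
    return config


def generate_config() -> Dict[str, Any]:
    # Illustrative subset of the keys used in the example DAG.
    return {"conn_id": "ray_conn", "num_cpus": 1}


static_result = resolve_config({"conn_id": "ray_conn", "num_cpus": 1})
dynamic_result = resolve_config(generate_config)
empty_result = resolve_config()
```

As in the diff, the return value of the callable is passed through without a type check, so a callable that returns something other than a dictionary would not raise `TypeError` at this point.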