From 3767adeb52ea4f32c7771a9cded6f129caa7f47b Mon Sep 17 00:00:00 2001 From: Venkat Date: Tue, 17 Sep 2024 18:24:40 -0400 Subject: [PATCH 01/17] changes --- README.md | 2 +- docs/getting_started/code_samples.rst | 3 + .../ray_taskflow_example_dynamic_config.py | 59 +++++++++++++++++++ ray_provider/decorators/ray.py | 11 +++- 4 files changed, 73 insertions(+), 2 deletions(-) create mode 100644 example_dags/ray_taskflow_example_dynamic_config.py diff --git a/README.md b/README.md index 7952746..8ff58a1 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ Check out the Getting Started guide in our [docs](https://astronomer.github.io/a ## Sample DAGs ### Example 1: Using @ray.task for job life cycle
-The below example showcases how to use the ``@ray.task`` decorator to manage the full lifecycle of a Ray cluster: setup, job execution, and teardown.
+The below example showcases how to use the ``@ray.task`` decorator to manage the full lifecycle of a Ray cluster: setup, job execution, and teardown. The configuration for the decorator can be provided statically or at runtime.
 This approach is ideal for jobs that require a dedicated, short-lived cluster, optimizing resource usage by cleaning up after task completion diff --git a/docs/getting_started/code_samples.rst b/docs/getting_started/code_samples.rst index 622ae2f..caa6d82 100644 --- a/docs/getting_started/code_samples.rst +++ b/docs/getting_started/code_samples.rst @@ -41,6 +41,9 @@ The below example showcases how to use the ``@ray.task`` decorator to manage the This approach is ideal for jobs that require a dedicated, short-lived cluster, optimizing resource usage by cleaning up after task completion.
+.. note::
+   Configuration can be specified as a dictionary, either statically or dynamically at runtime as needed.
+
 ..
literalinclude:: ../../example_dags/ray_taskflow_example.py :language: python :linenos: diff --git a/example_dags/ray_taskflow_example_dynamic_config.py b/example_dags/ray_taskflow_example_dynamic_config.py new file mode 100644 index 0000000..5d9abd7 --- /dev/null +++ b/example_dags/ray_taskflow_example_dynamic_config.py @@ -0,0 +1,59 @@ +from datetime import datetime +from pathlib import Path + +from airflow.decorators import dag, task + +from ray_provider.decorators.ray import ray + + +def generate_config(): + CONN_ID = "ray_conn" + RAY_SPEC = Path(__file__).parent / "scripts/ray.yaml" + FOLDER_PATH = Path(__file__).parent / "ray_scripts" + return { + "conn_id": CONN_ID, + "runtime_env": {"working_dir": str(FOLDER_PATH), "pip": ["numpy"]}, + "num_cpus": 1, + "num_gpus": 0, + "memory": 0, + "poll_interval": 5, + "ray_cluster_yaml": str(RAY_SPEC), + "xcom_task_key": "dashboard", + } + + +@dag( + dag_id="Ray_Taskflow_Example_Dynamic_Config", + start_date=datetime(2023, 1, 1), + schedule=None, + catchup=False, + tags=["ray", "example"], +) +def ray_taskflow_dag(): + + @task + def generate_data(): + return [1, 2, 3] + + @ray.task(config=generate_config) + def process_data_with_ray(data): + import numpy as np + import ray + + @ray.remote + def square(x): + return x**2 + + ray.init() + data = np.array(data) + futures = [square.remote(x) for x in data] + results = ray.get(futures) + mean = np.mean(results) + print(f"Mean of this population is {mean}") + return mean + + data = generate_data() + process_data_with_ray(data) + + +ray_example_dag = ray_taskflow_dag() diff --git a/ray_provider/decorators/ray.py b/ray_provider/decorators/ray.py index 87322fb..ee51872 100644 --- a/ray_provider/decorators/ray.py +++ b/ray_provider/decorators/ray.py @@ -146,19 +146,28 @@ class ray: def task( python_callable: Callable[..., Any] | None = None, multiple_outputs: bool | None = None, + config: Callable[[], dict[str, Any]] | dict[str, Any] | None = None, **kwargs: Any, ) -> TaskDecorator: """ Decorator to define a task that submits a Ray job. - :param python_callable: The callable function to decorate. :param multiple_outputs: If True, will return multiple outputs. + :param config: A dictionary of configuration or a callable that returns a dictionary. :param kwargs: Additional keyword arguments. :return: The decorated task. 
""" + if config is None: + config = {} + elif callable(config): + config = config() + elif not isinstance(config, dict): + raise TypeError("config must be either a callable, a dictionary, or None") + return task_decorator_factory( python_callable=python_callable, multiple_outputs=multiple_outputs, decorated_operator_class=_RayDecoratedOperator, + config=config, **kwargs, ) From fef3233d0dcc8357c5ac1325b03f95d5ab12acdd Mon Sep 17 00:00:00 2001 From: Venkat Date: Thu, 19 Sep 2024 02:11:53 -0400 Subject: [PATCH 02/17] update --- ray_provider/decorators/ray.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray_provider/decorators/ray.py b/ray_provider/decorators/ray.py index ee51872..f63aac2 100644 --- a/ray_provider/decorators/ray.py +++ b/ray_provider/decorators/ray.py @@ -160,7 +160,7 @@ def task( if config is None: config = {} elif callable(config): - config = config() + config = config(**kwargs) elif not isinstance(config, dict): raise TypeError("config must be either a callable, a dictionary, or None") From 9235c3d505ca9f00497ef89f76b571dcf64531f2 Mon Sep 17 00:00:00 2001 From: Venkat Date: Thu, 19 Sep 2024 18:54:58 -0400 Subject: [PATCH 03/17] minor fix --- ray_provider/decorators/ray.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ray_provider/decorators/ray.py b/ray_provider/decorators/ray.py index f63aac2..38d84e0 100644 --- a/ray_provider/decorators/ray.py +++ b/ray_provider/decorators/ray.py @@ -88,7 +88,7 @@ def execute(self, context: Context) -> Any: :return: The result of the Ray job execution. :raises AirflowException: If job submission fails. """ - tmp_dir = None + temp_dir = None try: if self.is_decorated_function: self.log.info( @@ -126,8 +126,8 @@ def execute(self, context: Context) -> Any: self.log.error(f"Failed during execution with error: {e}") raise AirflowException("Job submission failed") from e finally: - if tmp_dir and os.path.exists(tmp_dir): - shutil.rmtree(tmp_dir) + if temp_dir and os.path.exists(temp_dir): + shutil.rmtree(temp_dir) def _extract_function_body(self, source: str) -> str: """Extract the function, excluding only the ray.task decorator.""" @@ -169,5 +169,4 @@ def task( multiple_outputs=multiple_outputs, decorated_operator_class=_RayDecoratedOperator, config=config, - **kwargs, ) From 8bebbbaed895cd4194ce133a5db8d5acb0deb96f Mon Sep 17 00:00:00 2001 From: Venkat Date: Thu, 19 Sep 2024 18:57:01 -0400 Subject: [PATCH 04/17] example dag updated --- example_dags/ray_taskflow_example_dynamic_config.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/example_dags/ray_taskflow_example_dynamic_config.py b/example_dags/ray_taskflow_example_dynamic_config.py index 5d9abd7..e6bc087 100644 --- a/example_dags/ray_taskflow_example_dynamic_config.py +++ b/example_dags/ray_taskflow_example_dynamic_config.py @@ -6,19 +6,22 @@ from ray_provider.decorators.ray import ray -def generate_config(): +def generate_config(custom_memory: int, **context): + CONN_ID = "ray_conn" RAY_SPEC = Path(__file__).parent / "scripts/ray.yaml" FOLDER_PATH = Path(__file__).parent / "ray_scripts" + return { "conn_id": CONN_ID, "runtime_env": {"working_dir": str(FOLDER_PATH), "pip": ["numpy"]}, "num_cpus": 1, "num_gpus": 0, - "memory": 0, + "memory": custom_memory, "poll_interval": 5, "ray_cluster_yaml": str(RAY_SPEC), "xcom_task_key": "dashboard", + "execution_date": str(context.get("execution_date")), } @@ -30,12 +33,11 @@ def generate_config(): tags=["ray", "example"], ) def ray_taskflow_dag(): - @task def 
generate_data(): return [1, 2, 3] - @ray.task(config=generate_config) + @ray.task(config=generate_config, custom_memory=1024) def process_data_with_ray(data): import numpy as np import ray @@ -44,7 +46,6 @@ def process_data_with_ray(data): def square(x): return x**2 - ray.init() data = np.array(data) futures = [square.remote(x) for x in data] results = ray.get(futures) From bb5b117320d90cb27ee854b9091ae6904760f209 Mon Sep 17 00:00:00 2001 From: Venkat Date: Fri, 20 Sep 2024 17:12:05 -0400 Subject: [PATCH 05/17] update --- docs/getting_started/code_samples.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/getting_started/code_samples.rst b/docs/getting_started/code_samples.rst index caa6d82..1ad0ffa 100644 --- a/docs/getting_started/code_samples.rst +++ b/docs/getting_started/code_samples.rst @@ -42,7 +42,7 @@ The below example showcases how to use the ``@ray.task`` decorator to manage the This approach is ideal for jobs that require a dedicated, short-lived cluster, optimizing resource usage by cleaning up after task completion. .. note:: - Configuration can be specified as a dictionary, either statically or dynamically at runtime as needed. + Configuration can be specified as a dictionary, either statically or dynamically at runtime as needed. We can also provide additional inputs while generating dynamic configurations. See example dags for reference. .. literalinclude:: ../../example_dags/ray_taskflow_example.py :language: python From f60c18579df5991beaf8cf3cf246cc3d898d9311 Mon Sep 17 00:00:00 2001 From: Venkat Date: Wed, 25 Sep 2024 13:44:41 -0400 Subject: [PATCH 06/17] Updated code --- ray_provider/decorators/ray.py | 100 ++++++++++++++++----------------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/ray_provider/decorators/ray.py b/ray_provider/decorators/ray.py index 38d84e0..7acdff5 100644 --- a/ray_provider/decorators/ray.py +++ b/ray_provider/decorators/ray.py @@ -31,54 +31,23 @@ class _RayDecoratedOperator(DecoratedOperator, SubmitRayJob): template_fields: Any = (*SubmitRayJob.template_fields, "op_args", "op_kwargs") - def __init__(self, config: dict[str, Any], **kwargs: Any) -> None: - self.conn_id: str = config.get("conn_id", "") - self.is_decorated_function = False if "entrypoint" in config else True - self.entrypoint: str = config.get("entrypoint", "python script.py") - self.runtime_env: dict[str, Any] = config.get("runtime_env", {}) - - self.num_cpus: int | float = config.get("num_cpus", 1) - self.num_gpus: int | float = config.get("num_gpus", 0) - self.memory: int | float = config.get("memory", None) - self.ray_resources: dict[str, Any] | None = config.get("resources", None) - self.ray_cluster_yaml: str | None = config.get("ray_cluster_yaml", None) - self.update_if_exists: bool = config.get("update_if_exists", False) - self.kuberay_version: str = config.get("kuberay_version", "1.0.0") - self.gpu_device_plugin_yaml: str = config.get( - "gpu_device_plugin_yaml", - "https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/v0.9.0/nvidia-device-plugin.yml", - ) - self.fetch_logs: bool = config.get("fetch_logs", True) - self.wait_for_completion: bool = config.get("wait_for_completion", True) - self.job_timeout_seconds: int = config.get("job_timeout_seconds", 600) - self.poll_interval: int = config.get("poll_interval", 60) - self.xcom_task_key: str | None = config.get("xcom_task_key", None) + def __init__(self, config: dict[str, Any] | Callable[[Context], dict[str, Any]], **kwargs: Any) -> None: + self.config = config + self.kwargs = 
kwargs - if not isinstance(self.num_cpus, (int, float)): - raise TypeError("num_cpus should be an integer or float value") - if not isinstance(self.num_gpus, (int, float)): - raise TypeError("num_gpus should be an integer or float value") - - super().__init__( - conn_id=self.conn_id, - entrypoint=self.entrypoint, - runtime_env=self.runtime_env, - num_cpus=self.num_cpus, - num_gpus=self.num_gpus, - memory=self.memory, - resources=self.ray_resources, - ray_cluster_yaml=self.ray_cluster_yaml, - update_if_exists=self.update_if_exists, - kuberay_version=self.kuberay_version, - gpu_device_plugin_yaml=self.gpu_device_plugin_yaml, - fetch_logs=self.fetch_logs, - wait_for_completion=self.wait_for_completion, - job_timeout_seconds=self.job_timeout_seconds, - poll_interval=self.poll_interval, - xcom_task_key=self.xcom_task_key, - **kwargs, - ) + super().__init__(conn_id="", entrypoint="python script.py", runtime_env={}, **kwargs) + + def get_config(self, context: Context, config: Callable[..., dict[str, Any]], **kwargs: Any) -> dict[str, Any]: + config_params = inspect.signature(config).parameters + + config_kwargs = {k: v for k, v in kwargs.items() if k in config_params and k != "context"} + + if "context" in config_params: + config_kwargs["context"] = context + + # Call config with the prepared arguments + return config(**config_kwargs) def execute(self, context: Context) -> Any: """ @@ -90,6 +59,40 @@ def execute(self, context: Context) -> Any: """ temp_dir = None try: + # Generate the configuration + if callable(self.config): + config = self.get_config(context=context, config=self.config, **self.kwargs) + else: + config = self.config + + # Prepare Ray job parameters + self.conn_id: str = config.get("conn_id", "") + self.is_decorated_function = False if "entrypoint" in config else True + self.entrypoint: str = config.get("entrypoint", "python script.py") + self.runtime_env: dict[str, Any] = config.get("runtime_env", {}) + + self.num_cpus: int | float = config.get("num_cpus", 1) + self.num_gpus: int | float = config.get("num_gpus", 0) + self.memory: int | float = config.get("memory", None) + self.ray_resources: dict[str, Any] | None = config.get("resources", None) + self.ray_cluster_yaml: str | None = config.get("ray_cluster_yaml", None) + self.update_if_exists: bool = config.get("update_if_exists", False) + self.kuberay_version: str = config.get("kuberay_version", "1.0.0") + self.gpu_device_plugin_yaml: str = config.get( + "gpu_device_plugin_yaml", + "https://raw.githubusercontent.com/NVIDIA/k8s-device-plugin/v0.9.0/nvidia-device-plugin.yml", + ) + self.fetch_logs: bool = config.get("fetch_logs", True) + self.wait_for_completion: bool = config.get("wait_for_completion", True) + self.job_timeout_seconds: int = config.get("job_timeout_seconds", 600) + self.poll_interval: int = config.get("poll_interval", 60) + self.xcom_task_key: str | None = config.get("xcom_task_key", None) + + if not isinstance(self.num_cpus, (int, float)): + raise TypeError("num_cpus should be an integer or float value") + if not isinstance(self.num_gpus, (int, float)): + raise TypeError("num_gpus should be an integer or float value") + if self.is_decorated_function: self.log.info( f"Entrypoint is not provided, is_decorated_function is set to {self.is_decorated_function}" @@ -159,14 +162,11 @@ def task( """ if config is None: config = {} - elif callable(config): - config = config(**kwargs) - elif not isinstance(config, dict): - raise TypeError("config must be either a callable, a dictionary, or None") return 
task_decorator_factory( python_callable=python_callable, multiple_outputs=multiple_outputs, decorated_operator_class=_RayDecoratedOperator, config=config, + **kwargs, ) From a8629e6aafdbbc60223ead61af638d7da82656b0 Mon Sep 17 00:00:00 2001 From: Venkat Date: Wed, 25 Sep 2024 13:46:15 -0400 Subject: [PATCH 07/17] minor change --- ray_provider/decorators/ray.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray_provider/decorators/ray.py b/ray_provider/decorators/ray.py index 7acdff5..ae52f57 100644 --- a/ray_provider/decorators/ray.py +++ b/ray_provider/decorators/ray.py @@ -31,7 +31,7 @@ class _RayDecoratedOperator(DecoratedOperator, SubmitRayJob): template_fields: Any = (*SubmitRayJob.template_fields, "op_args", "op_kwargs") - def __init__(self, config: dict[str, Any] | Callable[[Context], dict[str, Any]], **kwargs: Any) -> None: + def __init__(self, config: dict[str, Any] | Callable[..., dict[str, Any]], **kwargs: Any) -> None: self.config = config self.kwargs = kwargs From df95ef11f3ff8add0ae6637932ce9db968f1de97 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 27 Sep 2024 10:46:09 +0300 Subject: [PATCH 08/17] Release 0.3.0a1 --- ray_provider/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray_provider/__init__.py b/ray_provider/__init__.py index d9657e7..5c25b9d 100644 --- a/ray_provider/__init__.py +++ b/ray_provider/__init__.py @@ -1,6 +1,6 @@ from __future__ import annotations -__version__ = "0.2.1" +__version__ = "0.3.0a1" from typing import Any From 4f91d26c4b5c9d2931a1227ac6f6e85a3d3268c8 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 27 Sep 2024 13:18:54 +0300 Subject: [PATCH 09/17] Fix issue in the decorator When running the DAG: """ This tutorial demonstrates how to use the Ray provider in Airflow to parallelize a task using Ray. 
""" from airflow.decorators import dag, task from ray_provider.decorators.ray import ray CONN_ID = "ray_conn_2" RAY_TASK_CONFIG = { "conn_id": CONN_ID, "num_cpus": 1, "num_gpus": 0, "memory": 0, "poll_interval": 5, } @dag( start_date=None, schedule=None, catchup=False, tags=["ray", "example", "TEST"], doc_md=__doc__, ) def test_taskflow_ray_tutorial(): @task def generate_data() -> list: """ Generate sample data Returns: list: List of integers """ import random return [random.randint(1, 100) for _ in range(10)] # use the @ray.task decorator to parallelize the task @ray.task(config=RAY_TASK_CONFIG) def get_mean_squared_value(data: list) -> float: """ Get the mean squared value from a list of integers Args: data (list): List of integers Returns: float: Mean value of the list """ import numpy as np import ray @ray.remote def square(x: int) -> int: """ Square a number Args: x (int): Number to square Returns: int: Squared number """ return x**2 ray.init() data = np.array(data) futures = [square.remote(x) for x in data] results = ray.get(futures) mean = np.mean(results) print(f"Mean squared value: {mean}") data = generate_data() get_mean_squared_value(data) test_taskflow_ray_tutorial() We faced the issue: Traceback (most recent call last): File "/usr/local/lib/python3.12/site-packages/ray_provider/operators/ray.py", line 286, in execute self.defer( File "/usr/local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1777, in defer raise TaskDeferred(trigger=trigger, method_name=method_name, kwargs=kwargs, timeout=timeout) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/airflow/exceptions.py", line 431, in __init__ raise ValueError("Timeout value must be a timedelta") ValueError: Timeout value must be a timedelta --- ray_provider/operators/ray.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ray_provider/operators/ray.py b/ray_provider/operators/ray.py index 02b6a73..6f500ed 100644 --- a/ray_provider/operators/ray.py +++ b/ray_provider/operators/ray.py @@ -281,6 +281,11 @@ def execute(self, context: Context) -> str: current_status = self.hook.get_ray_job_status(self.dashboard_url, self.job_id) self.log.info(f"Current job status for {self.job_id} is: {current_status}") + if isinstance(self.job_timeout_seconds, timedelta): + job_timeout_seconds = self.job_timeout_seconds + else: + job_timeout_seconds = timedelta(seconds=self.job_timeout_seconds) if self.job_timeout_seconds > 0 else None + if current_status not in self.terminal_states: self.log.info("Deferring the polling to RayJobTrigger...") self.defer( @@ -294,7 +299,7 @@ def execute(self, context: Context) -> str: fetch_logs=self.fetch_logs, ), method_name="execute_complete", - timeout=self.job_timeout_seconds, + timeout=job_timeout_seconds, ) elif current_status == JobStatus.SUCCEEDED: self.log.info("Job %s completed successfully", self.job_id) From 0491d7e09374cbd49942422fb1bca2084df7f067 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 27 Sep 2024 13:20:01 +0300 Subject: [PATCH 10/17] Release 0.3.0a2 --- ray_provider/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray_provider/__init__.py b/ray_provider/__init__.py index 5c25b9d..84c5c92 100644 --- a/ray_provider/__init__.py +++ b/ray_provider/__init__.py @@ -1,6 +1,6 @@ from __future__ import annotations -__version__ = "0.3.0a1" +__version__ = "0.3.0a2" from typing import Any From cb98a6c630195496dfa6d4e64ddcfb7bf10edc7d Mon Sep 
17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 27 Sep 2024 13:42:05 +0300 Subject: [PATCH 11/17] Fix another issue after joining the two changes It works when not setting job_timeout_seconds or setting it to a positive integer but not with 0: get_mean_squared_value = SubmitRayJob( task_id="SubmitRayJob", conn_id=CONN_ID, entrypoint="python ray_script.py {{ ti.xcom_pull(task_ids='generate_data') | join(' ') }}", runtime_env=RAY_RUNTIME_ENV, num_cpus=1, num_gpus=0, memory=0, resources={}, xcom_task_key="SubmitRayJob.dashboard", fetch_logs=True, wait_for_completion=True, job_timeout_seconds=0, poll_interval=5, ) failed with [2024-09-27, 10:29:53 UTC] {taskinstance.py:3310} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.12/site-packages/ray_provider/operators/ray.py", line 287, in execute job_timeout_seconds = timedelta(seconds=self.job_timeout_seconds) if self.job_timeout_seconds > 0 else None ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ TypeError: '>' not supported between instances of 'NoneType' and 'int' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable return ExecutionCallableRunner( ^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run return self.func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper return func(self, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/site-packages/ray_provider/operators/ray.py", line 317, in execute raise AirflowException(f"SubmitRayJob operator failed due to {e}. Cleaning up resources...") airflow.exceptions.AirflowException: SubmitRayJob operator failed due to '>' not supported between instances of 'NoneType' and 'int'. Cleaning up resources... 
--- ray_provider/operators/ray.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray_provider/operators/ray.py b/ray_provider/operators/ray.py index 6f500ed..ea33b89 100644 --- a/ray_provider/operators/ray.py +++ b/ray_provider/operators/ray.py @@ -283,7 +283,7 @@ def execute(self, context: Context) -> str: if isinstance(self.job_timeout_seconds, timedelta): job_timeout_seconds = self.job_timeout_seconds - else: + elif self.job_timeout_seconds is not None: job_timeout_seconds = timedelta(seconds=self.job_timeout_seconds) if self.job_timeout_seconds > 0 else None if current_status not in self.terminal_states: From 866756ddf10815f530046f88aa0205ab40dc4ab0 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 27 Sep 2024 13:42:57 +0300 Subject: [PATCH 12/17] Release 0.3.0a3 --- ray_provider/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray_provider/__init__.py b/ray_provider/__init__.py index 84c5c92..d942ca8 100644 --- a/ray_provider/__init__.py +++ b/ray_provider/__init__.py @@ -1,6 +1,6 @@ from __future__ import annotations -__version__ = "0.3.0a2" +__version__ = "0.3.0a3" from typing import Any From 682b5d43c3e79fcfdac90b9d813dc428565be6b4 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 27 Sep 2024 14:02:59 +0300 Subject: [PATCH 13/17] Fix issue when running get_mean_squared_value = SubmitRayJob( task_id="SubmitRayJob", conn_id=CONN_ID, entrypoint="python ray_script.py {{ ti.xcom_pull(task_ids='generate_data') | join(' ') }}", runtime_env=RAY_RUNTIME_ENV, num_cpus=1, num_gpus=0, memory=0, resources={}, xcom_task_key="SubmitRayJob.dashboard", fetch_logs=True, wait_for_completion=True, job_timeout_seconds=0, poll_interval=5, ) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit That resulted [2024-09-27, 10:55:06 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs [2024-09-27, 10:55:06 UTC] {ray.py:219} INFO - Dashboard URL retrieved from XCom: None [2024-09-27, 10:55:06 UTC] {base.py:84} INFO - Retrieving connection 'ray_conn_2' [2024-09-27, 10:55:06 UTC] {ray.py:87} INFO - Ray cluster address is: http://172.23.0.3:30487 [2024-09-27, 10:55:06 UTC] {ray.py:155} INFO - Address URL is: http://172.23.0.3:30487 [2024-09-27, 10:55:06 UTC] {ray.py:156} INFO - Dashboard URL is: None [2024-09-27, 10:55:06 UTC] {ray.py:183} INFO - Submitted job with ID: raysubmit_G22MqPHyvLv8ghRV [2024-09-27, 10:55:06 UTC] {ray.py:278} INFO - Ray job submitted with id: raysubmit_G22MqPHyvLv8ghRV [2024-09-27, 10:55:06 UTC] {ray.py:208} INFO - Job raysubmit_G22MqPHyvLv8ghRV status: PENDING [2024-09-27, 10:55:06 UTC] {ray.py:282} INFO - Current job status for raysubmit_G22MqPHyvLv8ghRV is: PENDING [2024-09-27, 10:55:06 UTC] {ray.py:290} INFO - Deferring the polling to RayJobTrigger... 
[2024-09-27, 10:55:06 UTC] {taskinstance.py:3310} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/lib/python3.12/site-packages/ray_provider/operators/ray.py", line 302, in execute timeout=job_timeout_seconds, ^^^^^^^^^^^^^^^^^^^ UnboundLocalError: cannot access local variable 'job_timeout_seconds' where it is not associated with a value During handling of the above exception, another exception occurred: --- ray_provider/operators/ray.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ray_provider/operators/ray.py b/ray_provider/operators/ray.py index ea33b89..1ef859a 100644 --- a/ray_provider/operators/ray.py +++ b/ray_provider/operators/ray.py @@ -281,6 +281,7 @@ def execute(self, context: Context) -> str: current_status = self.hook.get_ray_job_status(self.dashboard_url, self.job_id) self.log.info(f"Current job status for {self.job_id} is: {current_status}") + job_timeout_seconds = timedelta(seconds=0) if isinstance(self.job_timeout_seconds, timedelta): job_timeout_seconds = self.job_timeout_seconds elif self.job_timeout_seconds is not None: From 2a37b134a028c0d498c1eb8cf58413a98f8655ac Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 27 Sep 2024 14:04:24 +0300 Subject: [PATCH 14/17] Release 0.3.0a4 --- ray_provider/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray_provider/__init__.py b/ray_provider/__init__.py index d942ca8..1402edc 100644 --- a/ray_provider/__init__.py +++ b/ray_provider/__init__.py @@ -1,6 +1,6 @@ from __future__ import annotations -__version__ = "0.3.0a3" +__version__ = "0.3.0a4" from typing import Any From 8fab03dd99615bcb7b6dbec2eb79fd3013dba2ec Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Fri, 27 Sep 2024 15:21:51 +0300 Subject: [PATCH 15/17] Release 0.3.0a5 --- ray_provider/__init__.py | 2 +- ray_provider/operators/ray.py | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/ray_provider/__init__.py b/ray_provider/__init__.py index 1402edc..abdac9b 100644 --- a/ray_provider/__init__.py +++ b/ray_provider/__init__.py @@ -1,6 +1,6 @@ from __future__ import annotations -__version__ = "0.3.0a4" +__version__ = "0.3.0a5" from typing import Any diff --git a/ray_provider/operators/ray.py b/ray_provider/operators/ray.py index 1ef859a..5d99d28 100644 --- a/ray_provider/operators/ray.py +++ b/ray_provider/operators/ray.py @@ -281,10 +281,8 @@ def execute(self, context: Context) -> str: current_status = self.hook.get_ray_job_status(self.dashboard_url, self.job_id) self.log.info(f"Current job status for {self.job_id} is: {current_status}") - job_timeout_seconds = timedelta(seconds=0) - if isinstance(self.job_timeout_seconds, timedelta): - job_timeout_seconds = self.job_timeout_seconds - elif self.job_timeout_seconds is not None: + job_timeout_seconds = self.job_timeout_seconds + if isinstance(self.job_timeout_seconds, int): job_timeout_seconds = timedelta(seconds=self.job_timeout_seconds) if self.job_timeout_seconds > 0 else None if current_status not in self.terminal_states: From fc47b75a59989c16ba791309386ebb0563c5b98d Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 9 Oct 2024 11:41:47 +0100 Subject: [PATCH 16/17] Give visibility of error while trying to run operator in Astro --- ray_provider/operators/ray.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ray_provider/operators/ray.py b/ray_provider/operators/ray.py index 5d99d28..2a9820e 100644 --- a/ray_provider/operators/ray.py +++ b/ray_provider/operators/ray.py @@ -1,5 +1,6 
@@ from __future__ import annotations +import traceback from datetime import timedelta from functools import cached_property from typing import Any @@ -312,8 +313,10 @@ def execute(self, context: Context) -> str: ) return self.job_id except Exception as e: - self._delete_cluster() + error_details = traceback.format_exc() + self.log.info(error_details) raise AirflowException(f"SubmitRayJob operator failed due to {e}. Cleaning up resources...") + self._delete_cluster() def execute_complete(self, context: Context, event: dict[str, Any]) -> None: """ From 8cc17b3562fbf0bf5eb81129de04e83511105305 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Wed, 9 Oct 2024 11:42:18 +0100 Subject: [PATCH 17/17] Release 0.3.0a6 --- ray_provider/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ray_provider/__init__.py b/ray_provider/__init__.py index abdac9b..e2e3223 100644 --- a/ray_provider/__init__.py +++ b/ray_provider/__init__.py @@ -1,6 +1,6 @@ from __future__ import annotations -__version__ = "0.3.0a5" +__version__ = "0.3.0a6" from typing import Any
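Taken together, this series lets the ``@ray.task`` decorator accept its ``config`` either as a static dictionary or as a callable that ``_RayDecoratedOperator`` resolves at execute time, forwarding any extra decorator kwargs that match the callable's signature (and the Airflow context when a ``context`` parameter is declared). The sketch below, adapted from the example DAG added in PATCH 04, shows one way the callable form might be used with the 0.3.0a-series decorator; the connection id ``ray_conn``, the ``custom_memory`` keyword, and the file paths are illustrative assumptions rather than required names.

    from datetime import datetime
    from pathlib import Path

    from airflow.decorators import dag, task
    from ray_provider.decorators.ray import ray


    def generate_config(custom_memory: int, **context):
        # Resolved inside _RayDecoratedOperator.execute(); get_config() passes only the
        # kwargs named in this signature, plus the Airflow context when requested.
        return {
            "conn_id": "ray_conn",  # assumed Ray connection id
            "runtime_env": {"working_dir": str(Path(__file__).parent / "ray_scripts"), "pip": ["numpy"]},
            "num_cpus": 1,
            "num_gpus": 0,
            "memory": custom_memory,
            "ray_cluster_yaml": str(Path(__file__).parent / "scripts/ray.yaml"),  # assumed cluster spec path
            "poll_interval": 5,
        }


    @dag(start_date=datetime(2023, 1, 1), schedule=None, catchup=False, tags=["ray", "example"])
    def ray_dynamic_config_example():
        @task
        def generate_data():
            return [1, 2, 3]

        # Extra decorator kwargs such as custom_memory are forwarded to generate_config;
        # a plain dictionary (e.g. config=RAY_TASK_CONFIG) continues to work as before.
        @ray.task(config=generate_config, custom_memory=1024)
        def process_data_with_ray(data):
            import numpy as np
            import ray

            @ray.remote
            def square(x):
                return x**2

            results = ray.get([square.remote(x) for x in data])
            return float(np.mean(results))

        process_data_with_ray(generate_data())


    ray_dynamic_config_example()

With a callable config, values such as memory or the runtime environment can depend on runtime inputs instead of being fixed at DAG parse time, which is what the dynamic-config example DAG added in this series exercises.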