Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decorator configuration improvements #67

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from

Conversation

venkatajagannath
Copy link
Contributor

Currently, ray configuration to the ray.task decorator can only be a static input.

This PR fixes that behavior and also allows users to provide the configuration at runtime.

We are also making the following updates --

  • Documentation updates to mention that config can be static or dynamic
  • Added a new example dag that generates a config and passing in the callable

@codecov-commenter
Copy link

codecov-commenter commented Sep 17, 2024

Codecov Report

Attention: Patch coverage is 88.88889% with 1 line in your changes missing coverage. Please review.

Project coverage is 95.47%. Comparing base (a783526) to head (bb5b117).

Files with missing lines Patch % Lines
ray_provider/decorators/ray.py 88.88% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #67      +/-   ##
==========================================
+ Coverage   95.42%   95.47%   +0.04%     
==========================================
  Files           5        5              
  Lines         546      552       +6     
==========================================
+ Hits          521      527       +6     
  Misses         25       25              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@venkatajagannath venkatajagannath marked this pull request as draft September 19, 2024 05:59
When running the DAG:
"""

This tutorial demonstrates how to use the Ray provider in Airflow to parallelize
a task using Ray.
"""

from airflow.decorators import dag, task
from ray_provider.decorators.ray import ray

CONN_ID = "ray_conn_2"
RAY_TASK_CONFIG = {
    "conn_id": CONN_ID,
    "num_cpus": 1,
    "num_gpus": 0,
    "memory": 0,
    "poll_interval": 5,
}

@dag(
    start_date=None,
    schedule=None,
    catchup=False,
    tags=["ray", "example", "TEST"],
    doc_md=__doc__,
)
def test_taskflow_ray_tutorial():

    @task
    def generate_data() -> list:
        """
        Generate sample data
        Returns:
            list: List of integers
        """
        import random

        return [random.randint(1, 100) for _ in range(10)]

    # use the @ray.task decorator to parallelize the task
    @ray.task(config=RAY_TASK_CONFIG)
    def get_mean_squared_value(data: list) -> float:
        """
        Get the mean squared value from a list of integers
        Args:
            data (list): List of integers
        Returns:
            float: Mean value of the list
        """
        import numpy as np
        import ray

        @ray.remote
        def square(x: int) -> int:
            """
            Square a number
            Args:
                x (int): Number to square
            Returns:
                int: Squared number
            """
            return x**2

        ray.init()
        data = np.array(data)
        futures = [square.remote(x) for x in data]
        results = ray.get(futures)
        mean = np.mean(results)
        print(f"Mean squared value: {mean}")

    data = generate_data()
    get_mean_squared_value(data)

test_taskflow_ray_tutorial()

We faced the issue:

Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/ray_provider/operators/ray.py", line 286, in execute
    self.defer(
  File "/usr/local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 1777, in defer
    raise TaskDeferred(trigger=trigger, method_name=method_name, kwargs=kwargs, timeout=timeout)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/exceptions.py", line 431, in __init__
    raise ValueError("Timeout value must be a timedelta")
ValueError: Timeout value must be a timedelta
It works when not setting job_timeout_seconds or setting it to a positive integer but not with 0:
    get_mean_squared_value = SubmitRayJob(
        task_id="SubmitRayJob",
        conn_id=CONN_ID,
        entrypoint="python ray_script.py {{ ti.xcom_pull(task_ids='generate_data') | join(' ') }}",
        runtime_env=RAY_RUNTIME_ENV,
        num_cpus=1,
        num_gpus=0,
        memory=0,
        resources={},
        xcom_task_key="SubmitRayJob.dashboard",
        fetch_logs=True,
        wait_for_completion=True,
        job_timeout_seconds=0,
        poll_interval=5,
    )
failed with
[2024-09-27, 10:29:53 UTC] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/ray_provider/operators/ray.py", line 287, in execute
    job_timeout_seconds = timedelta(seconds=self.job_timeout_seconds) if self.job_timeout_seconds > 0 else None
                                                                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: '>' not supported between instances of 'NoneType' and 'int'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/ray_provider/operators/ray.py", line 317, in execute
    raise AirflowException(f"SubmitRayJob operator failed due to {e}. Cleaning up resources...")
airflow.exceptions.AirflowException: SubmitRayJob operator failed due to '>' not supported between instances of 'NoneType' and 'int'. Cleaning up resources...
    get_mean_squared_value = SubmitRayJob(
        task_id="SubmitRayJob",
        conn_id=CONN_ID,
        entrypoint="python ray_script.py {{ ti.xcom_pull(task_ids='generate_data') | join(' ') }}",
        runtime_env=RAY_RUNTIME_ENV,
        num_cpus=1,
        num_gpus=0,
        memory=0,
        resources={},
        xcom_task_key="SubmitRayJob.dashboard",
        fetch_logs=True,
        wait_for_completion=True,
        job_timeout_seconds=0,
        poll_interval=5,
    )

That resulted

[2024-09-27, 10:55:06 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-09-27, 10:55:06 UTC] {ray.py:219} INFO - Dashboard URL retrieved from XCom: None
[2024-09-27, 10:55:06 UTC] {base.py:84} INFO - Retrieving connection 'ray_conn_2'
[2024-09-27, 10:55:06 UTC] {ray.py:87} INFO - Ray cluster address is: http://172.23.0.3:30487
[2024-09-27, 10:55:06 UTC] {ray.py:155} INFO - Address URL is: http://172.23.0.3:30487
[2024-09-27, 10:55:06 UTC] {ray.py:156} INFO - Dashboard URL is: None
[2024-09-27, 10:55:06 UTC] {ray.py:183} INFO - Submitted job with ID: raysubmit_G22MqPHyvLv8ghRV
[2024-09-27, 10:55:06 UTC] {ray.py:278} INFO - Ray job submitted with id: raysubmit_G22MqPHyvLv8ghRV
[2024-09-27, 10:55:06 UTC] {ray.py:208} INFO - Job raysubmit_G22MqPHyvLv8ghRV status: PENDING
[2024-09-27, 10:55:06 UTC] {ray.py:282} INFO - Current job status for raysubmit_G22MqPHyvLv8ghRV is: PENDING
[2024-09-27, 10:55:06 UTC] {ray.py:290} INFO - Deferring the polling to RayJobTrigger...
[2024-09-27, 10:55:06 UTC] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/ray_provider/operators/ray.py", line 302, in execute
    timeout=job_timeout_seconds,
            ^^^^^^^^^^^^^^^^^^^
UnboundLocalError: cannot access local variable 'job_timeout_seconds' where it is not associated with a value
During handling of the above exception, another exception occurred:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Decorator Improvements -- Dynamic config
4 participants