diff --git a/.gitignore b/.gitignore
index f004f00..047ef89 100644
--- a/.gitignore
+++ b/.gitignore
@@ -142,3 +142,7 @@ dmypy.json
 
 # Cython debug symbols
 cython_debug/
+
+# astro-cli
+dev/include/*
+dev/dags/config
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..bbbe623
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,27 @@
+.PHONY: help
+help:
+	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
+
+.PHONY: setup-dev
+setup-dev: ## Setup development environment
+	python3 -m venv venv
+	. venv/bin/activate && pip install .[tests]
+	@echo "To activate the virtual environment, run:"
+	@echo "source venv/bin/activate"
+
+.PHONY: build-whl
+build-whl: setup-dev ## Build installable whl file
+	# Each recipe line runs in its own shell, so build from the repo root; the wheel lands in dev/include/ for the Docker image.
+	python3 -m build --outdir dev/include/
+
+.PHONY: docker-run
+docker-run: build-whl ## Runs local Airflow for testing
+	@if ! lsof -i :8080 | grep LISTEN > /dev/null; then \
+		cd dev && astro dev start; \
+	else \
+		cd dev && astro dev restart; \
+	fi
+
+.PHONY: docker-stop
+docker-stop: ## Stop Docker container
+	cd dev && astro dev stop
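The help target is the common self-documenting-Makefile idiom: grep pulls every target annotated with "## " out of $(MAKEFILE_LIST), and awk prints each target name padded to 30 columns next to its description. The output should look roughly like this (illustrative; help itself carries no "##" annotation, so it does not list itself):

    $ make help
    setup-dev                      Setup development environment
    build-whl                      Build installable whl file
    docker-run                     Runs local Airflow for testing
    docker-stop                    Stop Docker container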
diff --git a/dev/.astro/config.yaml b/dev/.astro/config.yaml
new file mode 100644
index 0000000..259b054
--- /dev/null
+++ b/dev/.astro/config.yaml
@@ -0,0 +1,2 @@
+project:
+  name: dev
diff --git a/dev/.astro/dag_integrity_exceptions.txt b/dev/.astro/dag_integrity_exceptions.txt
new file mode 100644
index 0000000..0d6bd89
--- /dev/null
+++ b/dev/.astro/dag_integrity_exceptions.txt
@@ -0,0 +1 @@
+# Add dag files to exempt from parse test below. ex: dags/<test-file>
diff --git a/dev/.astro/test_dag_integrity_default.py b/dev/.astro/test_dag_integrity_default.py
new file mode 100644
index 0000000..62dfd06
--- /dev/null
+++ b/dev/.astro/test_dag_integrity_default.py
@@ -0,0 +1,131 @@
+"""Test the validity of all DAGs. **USED BY DEV PARSE COMMAND DO NOT EDIT**"""
+
+import logging
+import os
+from contextlib import contextmanager
+
+import pytest
+from airflow.hooks.base import BaseHook
+from airflow.models import Connection, DagBag, Variable
+from airflow.utils.db import initdb
+
+# init airflow database
+initdb()
+
+# The following code patches errors caused by missing OS Variables, Airflow Connections, and Airflow Variables
+
+
+# =========== MONKEYPATCH BaseHook.get_connection() ===========
+def basehook_get_connection_monkeypatch(key: str, *args, **kwargs):
+    print(f"Attempted to fetch connection during parse, returning an empty Connection object for {key}")
+    return Connection(key)
+
+
+BaseHook.get_connection = basehook_get_connection_monkeypatch
+# =========== /MONKEYPATCH BaseHook.get_connection() ===========
+
+
+# =========== MONKEYPATCH os.getenv() ===========
+def os_getenv_monkeypatch(key: str, *args, **kwargs):
+    default = None
+    if args:
+        default = args[0]  # os.getenv should get at most 1 arg after the key
+    if kwargs:
+        default = kwargs.get("default", None)  # and sometimes kwarg if people are using the sig
+
+    env_value = os.environ.get(key, None)
+
+    if env_value:
+        return env_value  # if the env_value is set, return it
+    if key == "JENKINS_HOME" and default is None:  # fix https://github.com/astronomer/astro-cli/issues/601
+        return None
+    if default:
+        return default  # otherwise return whatever default has been passed
+    return f"MOCKED_{key.upper()}_VALUE"  # if absolutely nothing has been passed - return the mocked value
+
+
+os.getenv = os_getenv_monkeypatch
+# =========== /MONKEYPATCH os.getenv() ===========
+
+# =========== MONKEYPATCH Variable.get() ===========
+
+
+class magic_dict(dict):
+    def __init__(self, *args, **kwargs):
+        self.update(*args, **kwargs)
+
+    def __getitem__(self, key):
+        return {}.get(key, "MOCKED_KEY_VALUE")
+
+
+_no_default = object()  # allow falsey defaults
+
+
+def variable_get_monkeypatch(key: str, default_var=_no_default, deserialize_json=False):
+    print(f"Attempted to get Variable value during parse, returning a mocked value for {key}")
+
+    if default_var is not _no_default:
+        return default_var
+    if deserialize_json:
+        return magic_dict()
+    return "NON_DEFAULT_MOCKED_VARIABLE_VALUE"
+
+
+Variable.get = variable_get_monkeypatch
+# =========== /MONKEYPATCH Variable.get() ===========
+
+
+@contextmanager
+def suppress_logging(namespace):
+    """
+    Suppress logging within a specific namespace to keep tests "clean" during build
+    """
+    logger = logging.getLogger(namespace)
+    old_value = logger.disabled
+    logger.disabled = True
+    try:
+        yield
+    finally:
+        logger.disabled = old_value
+
+
+def get_import_errors():
+    """
+    Generate (path, error) tuples for import errors in the dag bag, plus an entry for every DAG file that imported cleanly.
+    """
+    with suppress_logging("airflow"):
+        dag_bag = DagBag(include_examples=False)
+
+        def strip_path_prefix(path):
+            return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))
+
+        # Initialize an empty list to store the tuples
+        result = []
+
+        # Iterate over the items in import_errors
+        for k, v in dag_bag.import_errors.items():
+            result.append((strip_path_prefix(k), v.strip()))
+
+        # Check if there are DAGs without errors
+        for file_path in dag_bag.dags:
+            # Check if the file_path is not in import_errors, meaning no errors
+            if file_path not in dag_bag.import_errors:
+                result.append((strip_path_prefix(file_path), "No import errors"))
+
+        return result
+
+
+@pytest.mark.parametrize("rel_path,rv", get_import_errors(), ids=[x[0] for x in get_import_errors()])
+def test_file_imports(rel_path, rv):
+    """Test for import errors on a file"""
+    exceptions = []
+    if os.path.exists(".astro/dag_integrity_exceptions.txt"):
+        with open(".astro/dag_integrity_exceptions.txt") as f:
+            exceptions = [line.strip() for line in f.readlines()]
+    print(f"Exceptions: {exceptions}")
+    if (rv != "No import errors") and rel_path not in exceptions:
+        # If rv is not "No import errors," consider it a failed test
+        raise Exception(f"{rel_path} failed to import with message \n {rv}")
+    else:
+        # If rv is "No import errors," consider it a passed test
+        print(f"{rel_path} passed the import test")
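The three monkeypatches in this file are what let astro dev parse import every DAG without a real metadata database, connections, or environment variables: anything a DAG asks for at parse time comes back as a harmless mock unless a default was supplied. Roughly, at parse time (values are illustrative):

    os.getenv("S3_BUCKET")                            # -> "MOCKED_S3_BUCKET_VALUE"
    os.getenv("S3_BUCKET", "fallback")                # -> "fallback"
    Variable.get("threshold", default_var=0)          # -> 0 (the _no_default sentinel preserves falsey defaults)
    Variable.get("conf", deserialize_json=True)["k"]  # -> "MOCKED_KEY_VALUE" (via magic_dict)
    BaseHook.get_connection("ray_conn")               # -> Connection("ray_conn") with no fields set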
diff --git a/dev/.dockerignore b/dev/.dockerignore
new file mode 100644
index 0000000..a334663
--- /dev/null
+++ b/dev/.dockerignore
@@ -0,0 +1,8 @@
+astro
+.git
+.env
+airflow_settings.yaml
+logs/
+.venv
+airflow.db
+airflow.cfg
diff --git a/dev/.gitignore b/dev/.gitignore
new file mode 100644
index 0000000..0e8bcca
--- /dev/null
+++ b/dev/.gitignore
@@ -0,0 +1,11 @@
+.git
+.env
+.DS_Store
+airflow_settings.yaml
+__pycache__/
+astro
+.venv
+airflow-webserver.pid
+webserver_config.py
+airflow.cfg
+airflow.db
diff --git a/dev/Dockerfile b/dev/Dockerfile
new file mode 100644
index 0000000..e26ed5e
--- /dev/null
+++ b/dev/Dockerfile
@@ -0,0 +1,11 @@
+FROM quay.io/astronomer/astro-runtime:12.2.0
+
+USER root
+
+RUN curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 && \
+    chmod 700 get_helm.sh && \
+    ./get_helm.sh
+
+USER astro
+
+RUN pip install /usr/local/airflow/include/*.whl
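The image adds two things on top of stock Astro Runtime: Helm, which the cluster setup/teardown examples rely on, and the provider wheel that build-whl drops into dev/include/ (the Astro build copies the project into the image, so the wheel ends up under /usr/local/airflow/include). Once the project is running, a quick way to confirm both landed -- astro dev bash opens a shell in the scheduler container; the distribution name astro-provider-ray is taken from pyproject.toml:

    astro dev bash
    helm version --short
    pip show astro-provider-ray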
diff --git a/dev/dags/.airflowignore b/dev/dags/.airflowignore
new file mode 100644
index 0000000..e69de29
diff --git a/example_dags/ray_scripts/script-gpu.py b/dev/dags/ray_scripts/script-gpu.py
similarity index 100%
rename from example_dags/ray_scripts/script-gpu.py
rename to dev/dags/ray_scripts/script-gpu.py
diff --git a/example_dags/ray_scripts/script.py b/dev/dags/ray_scripts/script.py
similarity index 100%
rename from example_dags/ray_scripts/script.py
rename to dev/dags/ray_scripts/script.py
diff --git a/example_dags/ray_single_operator.py b/dev/dags/ray_single_operator.py
similarity index 100%
rename from example_dags/ray_single_operator.py
rename to dev/dags/ray_single_operator.py
diff --git a/example_dags/ray_taskflow_example.py b/dev/dags/ray_taskflow_example.py
similarity index 100%
rename from example_dags/ray_taskflow_example.py
rename to dev/dags/ray_taskflow_example.py
diff --git a/example_dags/ray_taskflow_example_existing_cluster.py b/dev/dags/ray_taskflow_example_existing_cluster.py
similarity index 100%
rename from example_dags/ray_taskflow_example_existing_cluster.py
rename to dev/dags/ray_taskflow_example_existing_cluster.py
diff --git a/example_dags/scripts/ray-gpu.yaml b/dev/dags/scripts/ray-gpu.yaml
similarity index 100%
rename from example_dags/scripts/ray-gpu.yaml
rename to dev/dags/scripts/ray-gpu.yaml
diff --git a/example_dags/scripts/ray.yaml b/dev/dags/scripts/ray.yaml
similarity index 100%
rename from example_dags/scripts/ray.yaml
rename to dev/dags/scripts/ray.yaml
diff --git a/example_dags/setup-teardown.py b/dev/dags/setup-teardown.py
similarity index 100%
rename from example_dags/setup-teardown.py
rename to dev/dags/setup-teardown.py
diff --git a/dev/packages.txt b/dev/packages.txt
new file mode 100644
index 0000000..e69de29
diff --git a/dev/requirements.txt b/dev/requirements.txt
new file mode 100644
index 0000000..1bb359b
--- /dev/null
+++ b/dev/requirements.txt
@@ -0,0 +1 @@
+# Astro Runtime includes the following pre-installed providers packages: https://www.astronomer.io/docs/astro/runtime-image-architecture#provider-packages
diff --git a/dev/tests/dags/test_dag_example.py b/dev/tests/dags/test_dag_example.py
new file mode 100644
index 0000000..3336fbd
--- /dev/null
+++ b/dev/tests/dags/test_dag_example.py
@@ -0,0 +1,74 @@
+"""Example DAGs test. This test ensures that all DAGs have tags, have retries set to at least two, and import without errors. This is an example pytest and may not fit the context of your DAGs. Feel free to add and remove tests."""
+
+import logging
+import os
+from contextlib import contextmanager
+
+import pytest
+from airflow.models import DagBag
+
+
+@contextmanager
+def suppress_logging(namespace):
+    logger = logging.getLogger(namespace)
+    old_value = logger.disabled
+    logger.disabled = True
+    try:
+        yield
+    finally:
+        logger.disabled = old_value
+
+
+def get_import_errors():
+    """
+    Generate a tuple for import errors in the dag bag
+    """
+    with suppress_logging("airflow"):
+        dag_bag = DagBag(include_examples=False)
+
+        def strip_path_prefix(path):
+            return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))
+
+        # prepend "(None,None)" to ensure that a test object is always created even if it's a no op.
+        return [(None, None)] + [(strip_path_prefix(k), v.strip()) for k, v in dag_bag.import_errors.items()]
+
+
+def get_dags():
+    """
+    Generate a tuple of dag_id, <DAG objects> in the DagBag
+    """
+    with suppress_logging("airflow"):
+        dag_bag = DagBag(include_examples=False)
+
+    def strip_path_prefix(path):
+        return os.path.relpath(path, os.environ.get("AIRFLOW_HOME"))
+
+    return [(k, v, strip_path_prefix(v.fileloc)) for k, v in dag_bag.dags.items()]
+
+
+@pytest.mark.parametrize("rel_path,rv", get_import_errors(), ids=[x[0] for x in get_import_errors()])
+def test_file_imports(rel_path, rv):
+    """Test for import errors on a file"""
+    if rel_path and rv:
+        raise Exception(f"{rel_path} failed to import with message \n {rv}")
+
+
+APPROVED_TAGS = {}
+
+
+@pytest.mark.parametrize("dag_id,dag,fileloc", get_dags(), ids=[x[2] for x in get_dags()])
+def test_dag_tags(dag_id, dag, fileloc):
+    """
+    test if a DAG is tagged and if those TAGs are in the approved list
+    """
+    assert dag.tags, f"{dag_id} in {fileloc} has no tags"
+    if APPROVED_TAGS:
+        assert not set(dag.tags) - APPROVED_TAGS
+
+
+@pytest.mark.parametrize("dag_id,dag,fileloc", get_dags(), ids=[x[2] for x in get_dags()])
+def test_dag_retries(dag_id, dag, fileloc):
+    """
+    test if a DAG has retries set
+    """
+    assert dag.default_args.get("retries", 0) >= 2, f"{dag_id} in {fileloc} must have task retries >= 2."
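The two example tests above encode concrete requirements for anything placed in dev/dags: every DAG needs at least one tag and default_args retries of at least 2. A minimal DAG that satisfies both, as a sketch (DAG name, task, and schedule are illustrative):

    import pendulum
    from airflow.decorators import dag, task

    @dag(
        start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
        schedule=None,
        catchup=False,
        tags=["example"],             # test_dag_tags: tags must be non-empty
        default_args={"retries": 2},  # test_dag_retries: must be >= 2
    )
    def minimal_passing_dag():
        @task
        def hello():
            print("hello from dev/dags")

        hello()

    minimal_passing_dag()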
diff --git a/docs/CONTRIBUTING.rst b/docs/CONTRIBUTING.rst
index fb70c17..d9a8a69 100644
--- a/docs/CONTRIBUTING.rst
+++ b/docs/CONTRIBUTING.rst
@@ -30,6 +30,12 @@ Pre-requisites
 
     pip install pytest
 
+
+Set up RayCluster and Apache Airflow
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+For instructions on setting up RayCluster and Apache Airflow, please see the `Local Development Setup `_.
+
 
 Run tests
 ~~~~~~~~~
diff --git a/docs/_static/basic_local_kubernetes_conn.png b/docs/_static/basic_local_kubernetes_conn.png
new file mode 100644
index 0000000..b3e58c7
Binary files /dev/null and b/docs/_static/basic_local_kubernetes_conn.png differ
diff --git a/docs/getting_started/code_samples.rst b/docs/getting_started/code_samples.rst
index 622ae2f..91f415e 100644
--- a/docs/getting_started/code_samples.rst
+++ b/docs/getting_started/code_samples.rst
@@ -18,7 +18,7 @@ In the example below (``ray_taskflow_example_existing_cluster.py``), the ``@ray.
 .. important::
    **Set the Ray Dashboard URL connection parameter or RAY_ADDRESS on your airflow worker to connect to your cluster**
 
-.. literalinclude:: ../../example_dags/ray_taskflow_example_existing_cluster.py
+.. literalinclude:: ../../dev/dags/ray_taskflow_example_existing_cluster.py
    :language: python
    :linenos:
 
@@ -30,7 +30,7 @@ Ray Cluster Sample Spec (YAML)
 
 Save this file in a location accessible to your Airflow installation, and reference it in your DAG code.
 
-.. literalinclude:: ../../example_dags/scripts/ray.yaml
+.. literalinclude:: ../../dev/dags/scripts/ray.yaml
    :language: yaml
 
 
@@ -41,7 +41,7 @@ The below example showcases how to use the ``@ray.task`` decorator to manage the
 
 This approach is ideal for jobs that require a dedicated, short-lived cluster, optimizing resource usage by cleaning up after task completion.
 
-.. literalinclude:: ../../example_dags/ray_taskflow_example.py
+.. literalinclude:: ../../dev/dags/ray_taskflow_example.py
    :language: python
    :linenos:
 
@@ -53,7 +53,7 @@ This example demonstrates how to use the ``SubmitRayJob`` operator to manage the
 
 This operator provides a more declarative way to define your Ray job within an Airflow DAG.
 
-.. literalinclude:: ../../example_dags/ray_single_operator.py
+.. literalinclude:: ../../dev/dags/ray_single_operator.py
    :language: python
    :linenos:
 
@@ -76,6 +76,6 @@ This method is ideal for scenarios where you need fine-grained control over the
 .. important::
    **The SubmitRayJob operator uses the xcom_task_key parameter "SetupRayCluster.dashboard" to retrieve the Ray dashboard URL. This URL, stored as an XCom variable by the SetupRayCluster task, is necessary for job submission.**
 
-.. literalinclude:: ../../example_dags/setup-teardown.py
+.. literalinclude:: ../../dev/dags/setup-teardown.py
    :language: python
    :linenos:
diff --git a/docs/getting_started/local_development_setup.rst b/docs/getting_started/local_development_setup.rst
new file mode 100644
index 0000000..2352db1
--- /dev/null
+++ b/docs/getting_started/local_development_setup.rst
@@ -0,0 +1,138 @@
+Local Development Setup Guide
+#############################
+
+This document describes the local setup for RayCluster and `Apache Airflow® `_.
+
+Table of Contents
+=================
+
+- `Setup RayCluster`_
+- `Setup Apache Airflow®`_
+
+
+Setup RayCluster
+================
+
+This section describes how to set up RayCluster on Kind. For detailed instructions, please refer to the official guide: `RayCluster Quick Start `_.
+
+Prerequisites
+-------------
+
+Install the following software:
+
+- `Docker `_
+- `Kubectl `_
+- `Kind `_
+- `Helm `_
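+
+Once installed, a quick sanity check that each tool is on your ``PATH`` (output and versions are illustrative):
+
+.. code-block:: bash
+
+   docker version --format '{{.Server.Version}}'
+   kubectl version --client
+   kind version
+   helm version --short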
+
+1. **Create a Kind Cluster**
+
+.. code-block:: bash
+
+   kind create cluster --image=kindest/node:v1.26.0
+
+2. **Deploy a KubeRay Operator**
+
+.. code-block:: bash
+
+   helm repo add kuberay https://ray-project.github.io/kuberay-helm/
+   helm repo update
+
+   # Install both CRDs and KubeRay operator v1.2.2.
+   helm install kuberay-operator kuberay/kuberay-operator --version 1.2.2
+
+   # Confirm that the operator is running in the namespace `default`.
+   kubectl get pods
+   # NAME                               READY   STATUS    RESTARTS   AGE
+   # kuberay-operator-b498fcfdf-hsjvk   1/1     Running   0          29s
+
+3. **Deploy a RayCluster Custom Resource**
+
+.. code-block:: bash
+
+   # Deploy a sample RayCluster CR from the KubeRay Helm chart repo:
+   helm install raycluster kuberay/ray-cluster --version 1.2.2 --set 'image.tag=2.9.0-aarch64'
+
+   # Once the RayCluster CR has been created, you can view it by running:
+   kubectl get rayclusters
+   # NAME                 DESIRED WORKERS   AVAILABLE WORKERS   CPUS   MEMORY   GPUS   STATUS   AGE
+   # raycluster-kuberay   1                 1                   2      3G       0      ready    99s
+
+   # View the pods in the RayCluster named "raycluster-kuberay"
+   kubectl get pods --selector=ray.io/cluster=raycluster-kuberay
+   # NAME                                          READY   STATUS    RESTARTS   AGE
+   # raycluster-kuberay-head-wvzh2                 1/1     Running   0          XXs
+   # raycluster-kuberay-worker-workergroup-4dfsb   1/1     Running   0          XXs
+
+Wait for the pods to reach the ``Running`` state.
+
+4. **Expose the Port on Host Machine**
+
+.. code-block:: bash
+
+   kubectl get service raycluster-kuberay-head-svc
+   # NAME                          TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                                         AGE
+   # raycluster-kuberay-head-svc   ClusterIP   10.96.1.92   <none>        8265/TCP,8080/TCP,8000/TCP,10001/TCP,6379/TCP   25m
+
+   # Execute this in a separate shell.
+   kubectl port-forward service/raycluster-kuberay-head-svc 8265:8265
+
+5. **Access the Ray Dashboard**
+
+Visit http://127.0.0.1:8265 in your browser.
+
+Setup Apache Airflow®
+=====================
+
+This section describes how to set up `Apache Airflow® `_ using the Astro CLI. For detailed instructions, please refer to the official guide: `Astro CLI Quick Start `_.
+
+Prerequisites
+-------------
+
+- `Docker `_
+- `Astro CLI `_
+
+We provide a `Makefile `_ that wraps the Astro CLI and installs the necessary packages into your image so you can run the example DAGs locally.
+
+1. **Start Airflow Instance**
+
+.. code-block:: bash
+
+   make docker-run
+
+To see other available Makefile targets, please run ``make help``.
+
+2. **Create Airflow Connection**
+
+- Visit http://localhost:8080/ in your browser.
+- Log in with username ``admin`` and password ``admin``.
+- Click Admin -> Connections -> Add a new record, and select the connection type ``Ray``.
+
+The most basic setup will look something like below:
+
+- Ray dashboard url: the Kind Ray cluster dashboard URL, reachable from the Airflow containers
+- Kube config path: the path to your Kubernetes config file; ensure it is accessible from the Airflow containers
+- Disable SSL: tick the Disable SSL checkbox if needed
+
+.. image:: ../_static/basic_local_kubernetes_conn.png
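+
+With the connection saved, you can confirm Airflow reads it from inside the scheduler container. The connection id ``ray_conn`` is illustrative; use whatever id you entered above:
+
+.. code-block:: bash
+
+   astro dev bash
+   airflow connections get ray_conn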
diff --git a/docs/getting_started/setup.rst b/docs/getting_started/setup.rst
index 4aea291..75bfc8a 100644
--- a/docs/getting_started/setup.rst
+++ b/docs/getting_started/setup.rst
@@ -28,7 +28,7 @@ See the `installing Helm `_ page for other
 
 Amazon Web Services (AWS)
 
-**3. Setting up the Airflow connection**
+**4. Setting up the Airflow connection**
 
 - Setup/Teardown a Ray cluster on Kubernetes
 
diff --git a/example_dags/dags b/example_dags/dags
new file mode 120000
index 0000000..314da8b
--- /dev/null
+++ b/example_dags/dags
@@ -0,0 +1 @@
+../dev/dags
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 005fac4..3a41512 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -34,6 +34,11 @@ dependencies = [
     "apache-airflow-providers-cncf-kubernetes"
 ]
 
+[project.optional-dependencies]
+tests = [
+    "build",
+]
+
 [project.urls]
 Homepage = "https://astronomer.io"
 Source = "https://github.com/astronomer/astro-provider-ray/"
@@ -68,6 +73,7 @@ docs = [
 
 [tool.hatch.envs.tests]
 dependencies = [
+    "astro-provider-ray[tests]",
     "types-PyYAML",
     "types-attrs",
    "types-requests",
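The new optional-dependencies block is what the Makefile's setup-dev target installs (pip install .[tests]) and what gives build-whl the build tool; the hatch tests environment now pulls in the same extra. If you want to skip make, the manual equivalent is a short sequence (venv name is illustrative):

    python3 -m venv venv
    . venv/bin/activate
    pip install '.[tests]'
    python3 -m build --outdir dev/include/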