From e76ccee69aaa7583be1a9d81cf7b2aa72cf25647 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Tue, 20 Dec 2022 17:16:22 +0800 Subject: [PATCH] Ray on spark implementation (#28771) REP: ray-project/enhancements#14 --- .buildkite/pipeline.build.yml | 14 + .../vms/user-guides/community/index.rst | 1 + .../vms/user-guides/community/spark.rst | 92 ++ python/ray/tests/BUILD | 16 + python/ray/tests/spark/discover_2_gpu.sh | 4 + python/ray/tests/spark/discover_4_gpu.sh | 4 + python/ray/tests/spark/test_GPU.py | 87 ++ python/ray/tests/spark/test_basic.py | 157 ++++ .../tests/spark/test_multicores_per_task.py | 45 + python/ray/tests/spark/test_utils.py | 36 + python/ray/util/spark/__init__.py | 7 + python/ray/util/spark/cluster_init.py | 822 ++++++++++++++++++ python/ray/util/spark/databricks_hook.py | 74 ++ python/ray/util/spark/start_hook_base.py | 9 + python/ray/util/spark/start_ray_node.py | 125 +++ python/ray/util/spark/utils.py | 263 ++++++ python/requirements_test.txt | 2 +- 17 files changed, 1757 insertions(+), 1 deletion(-) create mode 100644 doc/source/cluster/vms/user-guides/community/spark.rst create mode 100755 python/ray/tests/spark/discover_2_gpu.sh create mode 100755 python/ray/tests/spark/discover_4_gpu.sh create mode 100644 python/ray/tests/spark/test_GPU.py create mode 100644 python/ray/tests/spark/test_basic.py create mode 100644 python/ray/tests/spark/test_multicores_per_task.py create mode 100644 python/ray/tests/spark/test_utils.py create mode 100644 python/ray/util/spark/__init__.py create mode 100644 python/ray/util/spark/cluster_init.py create mode 100644 python/ray/util/spark/databricks_hook.py create mode 100644 python/ray/util/spark/start_hook_base.py create mode 100644 python/ray/util/spark/start_ray_node.py create mode 100644 python/ray/util/spark/utils.py diff --git a/.buildkite/pipeline.build.yml b/.buildkite/pipeline.build.yml index c51cc2c27655..924b1592aa6e 100644 --- a/.buildkite/pipeline.build.yml +++ b/.buildkite/pipeline.build.yml @@ -484,6 +484,20 @@ --test_env=CONDA_DEFAULT_ENV python/ray/tests/... +- label: ":python: Ray on Spark Test" + conditions: ["RAY_CI_PYTHON_AFFECTED"] + instance_size: medium + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - pip uninstall -y ray + - RAY_DEBUG_BUILD=debug ./ci/ci.sh build + - ./ci/env/env_info.sh + - bazel test --config=ci-debug $(./ci/run/bazel_export_options) + --test_env=RAY_ON_SPARK_BACKGROUND_JOB_STARTUP_WAIT=1 + --test_env=RAY_ON_SPARK_RAY_WORKER_NODE_STARTUP_INTERVAL=5 + --test_tag_filters=-kubernetes,spark_plugin_tests + python/ray/tests/... + # https://github.com/ray-project/ray/issues/22460 #- label: ":python: (Privileged test)" #conditions: ["RAY_CI_PYTHON_AFFECTED"] diff --git a/doc/source/cluster/vms/user-guides/community/index.rst b/doc/source/cluster/vms/user-guides/community/index.rst index d3f354b39b59..5f72eedd6db9 100644 --- a/doc/source/cluster/vms/user-guides/community/index.rst +++ b/doc/source/cluster/vms/user-guides/community/index.rst @@ -15,6 +15,7 @@ The following is a list of community supported cluster managers. yarn.rst slurm.rst lsf.rst + spark.rst .. _ref-additional-cloud-providers: diff --git a/doc/source/cluster/vms/user-guides/community/spark.rst b/doc/source/cluster/vms/user-guides/community/spark.rst new file mode 100644 index 000000000000..9192afed1ace --- /dev/null +++ b/doc/source/cluster/vms/user-guides/community/spark.rst @@ -0,0 +1,92 @@ +.. 
_ray-Spark-deploy:
+
+Deploying on Spark Standalone cluster
+=====================================
+
+This document describes the high-level steps to run Ray clusters on a `Spark Standalone cluster <https://spark.apache.org/docs/latest/spark-standalone.html>`_.
+
+Running a basic example
+-----------------------
+
+This example is a Spark application that starts a Ray cluster on Spark,
+executes Ray application code on it, and then shuts the Ray cluster down.
+
+1) Create a python file that contains the Spark application code.
+Assume the python file name is 'ray-on-spark-example1.py'.
+
+.. code-block:: python
+
+    from pyspark.sql import SparkSession
+    from ray.util.spark import init_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES
+
+    if __name__ == "__main__":
+        spark = SparkSession \
+            .builder \
+            .appName("Ray on spark example 1") \
+            .config("spark.task.cpus", "4") \
+            .getOrCreate()
+
+        # Start a Ray cluster on this Spark application. This creates a background
+        # Spark job in which each Spark task launches one Ray worker node.
+        # The Ray head node is launched on the Spark application driver side.
+        # The resources (CPU / GPU / memory) allocated to each Ray worker node
+        # equal the resources allocated to the corresponding Spark task.
+        init_ray_cluster(num_worker_nodes=MAX_NUM_WORKER_NODES)
+
+        # Put your Ray application code here; it is executed on the Ray cluster
+        # set up above.
+        # Note that you don't need to call `ray.init`.
+        ...
+
+        # Terminate the Ray cluster explicitly.
+        # If you don't call it, the Ray cluster is terminated when the Spark
+        # application terminates.
+        shutdown_ray_cluster()
+
+2) Submit the Spark application above to the Spark standalone cluster.
+
+.. code-block:: bash
+
+    #!/bin/bash
+    spark-submit \
+      --master spark://{spark_master_IP}:{spark_master_port} \
+      path/to/ray-on-spark-example1.py
+
+Creating a long running Ray cluster on Spark cluster
+------------------------------------------------------
+
+This example is a Spark application that starts a long-running Ray cluster on
+Spark. The created Ray cluster can be accessed by remote python processes.
+
+1) Create a python file that contains the Spark application code.
+Assume the python file name is 'long-running-ray-cluster-on-spark.py'.
+
+.. code-block:: python
+
+    from pyspark.sql import SparkSession
+    import time
+    from ray.util.spark import init_ray_cluster, MAX_NUM_WORKER_NODES
+
+    if __name__ == "__main__":
+        spark = SparkSession \
+            .builder \
+            .appName("long running ray cluster on spark") \
+            .config("spark.task.cpus", "4") \
+            .getOrCreate()
+
+        cluster_address = init_ray_cluster(num_worker_nodes=MAX_NUM_WORKER_NODES)
+        print("Ray cluster is initiated, you can connect to this ray cluster "
+              f"via address ray://{cluster_address}")
+
+        # Sleep forever until the Spark application is terminated; at that point,
+        # the Ray cluster is also terminated.
+        while True:
+            time.sleep(10)
+
+2) Submit the Spark application above to the Spark standalone cluster.
+
+..
code-block:: bash + + #!/bin/bash + spark-submit \ + --master spark://{spark_master_IP}:{spark_master_port} \ + path/to/long-running-ray-cluster-on-spark.py diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index fc5d2f6f08dc..4fef7a7061ae 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -518,3 +518,19 @@ py_test_module_list( tags = ["exclusive", "asan_tests", "team:core"], deps = ["//:ray_lib", ":conftest"], ) + +py_test_module_list( + files = [ + "spark/test_basic.py", + "spark/test_GPU.py", + "spark/test_multicores_per_task.py", + "spark/test_utils.py", + ], + size = "large", + tags = ["exclusive", "spark_plugin_tests", "team:serverless"], + deps = ["//:ray_lib", ":conftest"], + data = [ + "spark/discover_2_gpu.sh", + "spark/discover_4_gpu.sh" + ], +) diff --git a/python/ray/tests/spark/discover_2_gpu.sh b/python/ray/tests/spark/discover_2_gpu.sh new file mode 100755 index 000000000000..6491bacaecb1 --- /dev/null +++ b/python/ray/tests/spark/discover_2_gpu.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +# This script is used in spark GPU cluster config for discovering available GPU. +echo "{\"name\":\"gpu\",\"addresses\":[\"0\",\"1\"]}" diff --git a/python/ray/tests/spark/discover_4_gpu.sh b/python/ray/tests/spark/discover_4_gpu.sh new file mode 100755 index 000000000000..3a111e1f577b --- /dev/null +++ b/python/ray/tests/spark/discover_4_gpu.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +# This script is used in spark GPU cluster config for discovering available GPU. +echo "{\"name\":\"gpu\",\"addresses\":[\"0\",\"1\",\"2\",\"3\"]}" diff --git a/python/ray/tests/spark/test_GPU.py b/python/ray/tests/spark/test_GPU.py new file mode 100644 index 000000000000..8de58bc2e2ec --- /dev/null +++ b/python/ray/tests/spark/test_GPU.py @@ -0,0 +1,87 @@ +import sys +import pytest +import os +import time +import functools +from abc import ABC +from pyspark.sql import SparkSession +from ray.tests.spark.test_basic import RayOnSparkCPUClusterTestBase + +import ray +from ray.util.spark.cluster_init import _init_ray_cluster + +pytestmark = pytest.mark.skipif( + not sys.platform.startswith("linux"), + reason="Ray on spark only supports running on Linux.", +) + + +class RayOnSparkGPUClusterTestBase(RayOnSparkCPUClusterTestBase, ABC): + + num_total_gpus = None + num_gpus_per_spark_task = None + + def test_gpu_allocation(self): + + for num_spark_tasks in [self.max_spark_tasks // 2, self.max_spark_tasks]: + with _init_ray_cluster(num_worker_nodes=num_spark_tasks, safe_mode=False): + worker_res_list = self.get_ray_worker_resources_list() + assert len(worker_res_list) == num_spark_tasks + for worker_res in worker_res_list: + assert worker_res["GPU"] == self.num_gpus_per_spark_task + + def test_basic_ray_app_using_gpu(self): + + with _init_ray_cluster(num_worker_nodes=self.max_spark_tasks, safe_mode=False): + + @ray.remote(num_cpus=1, num_gpus=1) + def f(_): + # Add a sleep to avoid the task finishing too fast, + # so that it can make all ray tasks concurrently running in all idle + # task slots. + time.sleep(5) + return [ + int(gpu_id) + for gpu_id in os.environ["CUDA_VISIBLE_DEVICES"].split(",") + ] + + futures = [f.remote(i) for i in range(self.num_total_gpus)] + results = ray.get(futures) + merged_results = functools.reduce(lambda x, y: x + y, results) + # Test all ray tasks are assigned with different GPUs. 
+ assert sorted(merged_results) == list(range(self.num_total_gpus)) + + +class TestBasicSparkGPUCluster(RayOnSparkGPUClusterTestBase): + @classmethod + def setup_class(cls): + super().setup_class() + cls.num_total_cpus = 2 + cls.num_total_gpus = 2 + cls.num_cpus_per_spark_task = 1 + cls.num_gpus_per_spark_task = 1 + cls.max_spark_tasks = 2 + gpu_discovery_script_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "discover_2_gpu.sh" + ) + os.environ["SPARK_WORKER_CORES"] = "4" + cls.spark = ( + SparkSession.builder.master("local-cluster[1, 2, 1024]") + .config("spark.task.cpus", "1") + .config("spark.task.resource.gpu.amount", "1") + .config("spark.executor.cores", "2") + .config("spark.worker.resource.gpu.amount", "2") + .config("spark.executor.resource.gpu.amount", "2") + .config("spark.task.maxFailures", "1") + .config( + "spark.worker.resource.gpu.discoveryScript", gpu_discovery_script_path + ) + .getOrCreate() + ) + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/spark/test_basic.py b/python/ray/tests/spark/test_basic.py new file mode 100644 index 000000000000..9b3d9c6a6e13 --- /dev/null +++ b/python/ray/tests/spark/test_basic.py @@ -0,0 +1,157 @@ +import os +import shutil +import tempfile +import socket +import pytest +import sys + +from abc import ABC + +import ray + +import ray.util.spark.cluster_init +from ray.util.spark import init_ray_cluster, shutdown_ray_cluster +from ray.util.spark.cluster_init import _init_ray_cluster +from ray.util.spark.utils import check_port_open +from pyspark.sql import SparkSession +import time +import logging + +pytestmark = pytest.mark.skipif( + not sys.platform.startswith("linux"), + reason="Ray on spark only supports running on Linux.", +) + +_logger = logging.getLogger(__name__) + + +class RayOnSparkCPUClusterTestBase(ABC): + + spark = None + num_total_cpus = None + num_cpus_per_spark_task = None + max_spark_tasks = None + + @classmethod + def setup_class(cls): + pass + + @classmethod + def teardown_class(cls): + time.sleep(10) # Wait all background spark job canceled. + cls.spark.stop() + + @staticmethod + def get_ray_worker_resources_list(): + wr_list = [] + for node in ray.nodes(): + # exclude dead node and head node (with 0 CPU resource) + if node["Alive"] and node["Resources"].get("CPU", 0) > 0: + wr_list.append(node["Resources"]) + return wr_list + + def test_cpu_allocation(self): + for num_spark_tasks in [self.max_spark_tasks // 2, self.max_spark_tasks]: + with _init_ray_cluster(num_worker_nodes=num_spark_tasks, safe_mode=False): + worker_res_list = self.get_ray_worker_resources_list() + assert len(worker_res_list) == num_spark_tasks + for worker_res in worker_res_list: + assert worker_res["CPU"] == self.num_cpus_per_spark_task + + def test_public_api(self): + try: + ray_temp_root_dir = tempfile.mkdtemp() + collect_log_to_path = tempfile.mkdtemp() + init_ray_cluster( + num_worker_nodes=self.max_spark_tasks, + safe_mode=False, + collect_log_to_path=collect_log_to_path, + ray_temp_root_dir=ray_temp_root_dir, + ) + + @ray.remote + def f(x): + return x * x + + futures = [f.remote(i) for i in range(32)] + results = ray.get(futures) + assert results == [i * i for i in range(32)] + + shutdown_ray_cluster() + + time.sleep(7) + # assert temp dir is removed. 
+ assert len(os.listdir(ray_temp_root_dir)) == 1 and os.listdir( + ray_temp_root_dir + )[0].endswith(".lock") + + # assert logs are copied to specified path + listed_items = os.listdir(collect_log_to_path) + assert len(listed_items) == 1 and listed_items[0].startswith("ray-") + log_dest_dir = os.path.join( + collect_log_to_path, listed_items[0], socket.gethostname() + ) + assert os.path.exists(log_dest_dir) and len(os.listdir(log_dest_dir)) > 0 + finally: + if ray.util.spark.cluster_init._active_ray_cluster is not None: + # if the test raised error and does not destroy cluster, + # destroy it here. + ray.util.spark._active_ray_cluster.shutdown() + time.sleep(5) + shutil.rmtree(ray_temp_root_dir, ignore_errors=True) + shutil.rmtree(collect_log_to_path, ignore_errors=True) + + def test_ray_cluster_shutdown(self): + with _init_ray_cluster( + num_worker_nodes=self.max_spark_tasks, safe_mode=False + ) as cluster: + assert len(self.get_ray_worker_resources_list()) == self.max_spark_tasks + + # Test: cancel background spark job will cause all ray worker nodes exit. + cluster._cancel_background_spark_job() + time.sleep(8) + + assert len(self.get_ray_worker_resources_list()) == 0 + + time.sleep(2) # wait ray head node exit. + # assert ray head node exit by checking head port being closed. + hostname, port = cluster.address.split(":") + assert not check_port_open(hostname, int(port)) + + def test_background_spark_job_exit_trigger_ray_head_exit(self): + with _init_ray_cluster( + num_worker_nodes=self.max_spark_tasks, safe_mode=False + ) as cluster: + # Mimic the case the job failed unexpectedly. + cluster._cancel_background_spark_job() + cluster.spark_job_is_canceled = False + time.sleep(5) + + # assert ray head node exit by checking head port being closed. + hostname, port = cluster.address.split(":") + assert not check_port_open(hostname, int(port)) + + +class TestBasicSparkCluster(RayOnSparkCPUClusterTestBase): + @classmethod + def setup_class(cls): + super().setup_class() + cls.num_total_cpus = 2 + cls.num_total_gpus = 0 + cls.num_cpus_per_spark_task = 1 + cls.num_gpus_per_spark_task = 0 + cls.max_spark_tasks = 2 + os.environ["SPARK_WORKER_CORES"] = "2" + cls.spark = ( + SparkSession.builder.master("local-cluster[1, 2, 1024]") + .config("spark.task.cpus", "1") + .config("spark.task.maxFailures", "1") + .getOrCreate() + ) + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/spark/test_multicores_per_task.py b/python/ray/tests/spark/test_multicores_per_task.py new file mode 100644 index 000000000000..95bdee432c89 --- /dev/null +++ b/python/ray/tests/spark/test_multicores_per_task.py @@ -0,0 +1,45 @@ +import sys +import pytest +import os +from pyspark.sql import SparkSession +from ray.tests.spark.test_GPU import RayOnSparkGPUClusterTestBase + +pytestmark = pytest.mark.skipif( + not sys.platform.startswith("linux"), + reason="Ray on spark only supports running on Linux.", +) + + +class TestMultiCoresPerTaskCluster(RayOnSparkGPUClusterTestBase): + @classmethod + def setup_class(cls): + super().setup_class() + cls.num_total_cpus = 4 + cls.num_total_gpus = 4 + cls.num_cpus_per_spark_task = 2 + cls.num_gpus_per_spark_task = 2 + cls.max_spark_tasks = 2 + gpu_discovery_script_path = os.path.join( + os.path.dirname(os.path.abspath(__file__)), "discover_4_gpu.sh" + ) + os.environ["SPARK_WORKER_CORES"] = "4" + cls.spark = ( + 
SparkSession.builder.master("local-cluster[1, 4, 1024]") + .config("spark.task.cpus", "2") + .config("spark.task.resource.gpu.amount", "2") + .config("spark.executor.cores", "4") + .config("spark.worker.resource.gpu.amount", "4") + .config("spark.executor.resource.gpu.amount", "4") + .config("spark.task.maxFailures", "1") + .config( + "spark.worker.resource.gpu.discoveryScript", gpu_discovery_script_path + ) + .getOrCreate() + ) + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/tests/spark/test_utils.py b/python/ray/tests/spark/test_utils.py new file mode 100644 index 000000000000..fa7fe1b93942 --- /dev/null +++ b/python/ray/tests/spark/test_utils.py @@ -0,0 +1,36 @@ +from unittest.mock import patch +import os +import sys + +import pytest +from ray.util.spark.utils import ( + get_spark_task_assigned_physical_gpus, + _calc_mem_per_ray_worker_node, +) + +pytestmark = pytest.mark.skipif( + not sys.platform.startswith("linux"), + reason="Ray on spark only supports running on Linux.", +) + + +def test_get_spark_task_assigned_physical_gpus(): + with patch.dict(os.environ, {}, clear=True): + assert get_spark_task_assigned_physical_gpus([2, 5]) == [2, 5] + + with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "2,3,6"}, clear=True): + assert get_spark_task_assigned_physical_gpus([0, 1]) == [2, 3] + assert get_spark_task_assigned_physical_gpus([0, 2]) == [2, 6] + + +def test_calc_mem_per_ray_worker_node(): + assert _calc_mem_per_ray_worker_node(4, 1000000, 400000, 100000) == (120000, 80000) + assert _calc_mem_per_ray_worker_node(4, 1000000, 400000, 70000) == (130000, 70000) + assert _calc_mem_per_ray_worker_node(4, 1000000, 400000, None) == (120000, 80000) + + +if __name__ == "__main__": + if os.environ.get("PARALLEL_CI"): + sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) + else: + sys.exit(pytest.main(["-sv", __file__])) diff --git a/python/ray/util/spark/__init__.py b/python/ray/util/spark/__init__.py new file mode 100644 index 000000000000..ddfd2de835bc --- /dev/null +++ b/python/ray/util/spark/__init__.py @@ -0,0 +1,7 @@ +from ray.util.spark.cluster_init import ( + init_ray_cluster, + shutdown_ray_cluster, + MAX_NUM_WORKER_NODES, +) + +__all__ = ["init_ray_cluster", "shutdown_ray_cluster", "MAX_NUM_WORKER_NODES"] diff --git a/python/ray/util/spark/cluster_init.py b/python/ray/util/spark/cluster_init.py new file mode 100644 index 000000000000..08fdb5c58243 --- /dev/null +++ b/python/ray/util/spark/cluster_init.py @@ -0,0 +1,822 @@ +import os +import socket +import sys +import time +import threading +import logging +import uuid +from packaging.version import Version +from typing import Optional, Dict + +import ray +from ray.util.annotations import PublicAPI +from ray._private.storage import _load_class + +from .utils import ( + exec_cmd, + check_port_open, + get_random_unused_port, + get_spark_session, + get_spark_application_driver_host, + is_in_databricks_runtime, + get_spark_task_assigned_physical_gpus, + get_avail_mem_per_ray_worker_node, + get_max_num_concurrent_tasks, + gen_cmd_exec_failure_msg, + setup_sigterm_on_parent_death, +) +from .start_hook_base import RayOnSparkStartHook +from .databricks_hook import DefaultDatabricksRayOnSparkStartHook + + +_logger = logging.getLogger("ray.util.spark") +_logger.setLevel(logging.INFO) + + +RAY_ON_SPARK_START_HOOK = "RAY_ON_SPARK_START_HOOK" + +MAX_NUM_WORKER_NODES = -1 + 
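+# NB (descriptive comment added by the editor, based on the `init_ray_cluster`
+# docstring below): passing `MAX_NUM_WORKER_NODES` (i.e. -1) as `num_worker_nodes`
+# requests one Ray worker node per available Spark task slot in the application.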
+RAY_ON_SPARK_COLLECT_LOG_TO_PATH = "RAY_ON_SPARK_COLLECT_LOG_TO_PATH" + + +def _check_system_environment(): + if not sys.platform.startswith("linux"): + raise RuntimeError("Ray on spark only supports running on Linux.") + + spark_dependency_error = "ray.util.spark module requires pyspark >= 3.3" + try: + import pyspark + + if Version(pyspark.__version__) < Version("3.3"): + raise RuntimeError(spark_dependency_error) + except ImportError: + raise RuntimeError(spark_dependency_error) + + +class RayClusterOnSpark: + """ + This class is the type of instance returned by the `init_ray_cluster` API. + Its main functionality is to: + Connect to, disconnect from, and shutdown the Ray cluster running on Apache Spark. + Serve as a Python context manager for the `RayClusterOnSpark` instance. + + Args + address: The url for the ray head node (defined as the hostname and unused + port on Spark driver node) + head_proc: Ray head process + spark_job_group_id: The Spark job id for a submitted ray job + num_workers_node: The number of workers in the ray cluster. + """ + + def __init__( + self, + address, + head_proc, + spark_job_group_id, + num_workers_node, + temp_dir, + ): + self.address = address + self.head_proc = head_proc + self.spark_job_group_id = spark_job_group_id + self.num_worker_nodes = num_workers_node + self.temp_dir = temp_dir + + self.ray_context = None + self.is_shutdown = False + self.spark_job_is_canceled = False + self.background_job_exception = None + + def _cancel_background_spark_job(self): + self.spark_job_is_canceled = True + get_spark_session().sparkContext.cancelJobGroup(self.spark_job_group_id) + + def connect(self): + import ray + + if self.background_job_exception is not None: + raise RuntimeError( + "Ray workers has exited." + ) from self.background_job_exception + + if self.is_shutdown: + raise RuntimeError( + "The ray cluster has been shut down or it failed to start." + ) + if self.ray_context is None: + try: + # connect to the ray cluster. + self.ray_context = ray.init(address=self.address) + except Exception: + self.shutdown() + raise + + last_alive_worker_count = 0 + last_progress_move_time = time.time() + while True: + time.sleep(_RAY_CLUSTER_STARTUP_PROGRESS_CHECKING_INTERVAL) + cur_alive_worker_count = ( + len([node for node in ray.nodes() if node["Alive"]]) - 1 + ) # Minus 1 means excluding the head node. + + if cur_alive_worker_count >= self.num_worker_nodes: + return + + if cur_alive_worker_count > last_alive_worker_count: + last_alive_worker_count = cur_alive_worker_count + last_progress_move_time = time.time() + _logger.info( + "Ray worker nodes are starting. Progress: " + f"({cur_alive_worker_count} / {self.num_worker_nodes})" + ) + else: + if ( + time.time() - last_progress_move_time + > _RAY_CONNECT_CLUSTER_POLL_PROGRESS_TIMEOUT + ): + _logger.warning( + "Timeout in waiting for all ray workers to start. " + "Started / Total requested: " + f"({cur_alive_worker_count} / {self.num_worker_nodes}). " + "Please check ray logs to see why some ray workers " + "failed to start." + ) + return + else: + _logger.warning("Already connected to this ray cluster.") + + def disconnect(self): + if self.ray_context is not None: + try: + self.ray_context.disconnect() + except Exception as e: + # swallow exception. 
+ _logger.warning( + f"An error occurred while disconnecting from the ray cluster: " + f"{repr(e)}" + ) + self.ray_context = None + else: + _logger.warning("Already disconnected from this ray cluster.") + + def shutdown(self, cancel_background_job=True): + """ + Shutdown the ray cluster created by the `init_ray_cluster` API. + NB: In the background thread that runs the background spark job, if spark job + raise unexpected error, its exception handler will also call this method, in + the case, it will set cancel_background_job=False to avoid recursive call. + """ + if not self.is_shutdown: + if self.ray_context is not None: + self.disconnect() + if cancel_background_job: + try: + self._cancel_background_spark_job() + except Exception as e: + # swallow exception. + _logger.warning( + f"An error occurred while cancelling the ray cluster " + f"background spark job: {repr(e)}" + ) + try: + self.head_proc.terminate() + except Exception as e: + # swallow exception. + _logger.warning( + "An Error occurred during shutdown of ray head node: " f"{repr(e)}" + ) + self.is_shutdown = True + + def __enter__(self): + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown() + + +def _convert_ray_node_options(options): + return [f"--{k.replace('_', '-')}={str(v)}" for k, v in options.items()] + + +_RAY_HEAD_STARTUP_TIMEOUT = 5 +_BACKGROUND_JOB_STARTUP_WAIT = int( + os.environ.get("RAY_ON_SPARK_BACKGROUND_JOB_STARTUP_WAIT", "30") +) +_RAY_CLUSTER_STARTUP_PROGRESS_CHECKING_INTERVAL = 3 +_RAY_WORKER_NODE_STARTUP_INTERVAL = int( + os.environ.get("RAY_ON_SPARK_RAY_WORKER_NODE_STARTUP_INTERVAL", "10") +) +_RAY_CONNECT_CLUSTER_POLL_PROGRESS_TIMEOUT = 120 + + +def _prepare_for_ray_worker_node_startup(): + """ + If we start multiple ray workers on a machine concurrently, some ray worker + processes might fail due to ray port conflicts, this is because race condition + on getting free port and opening the free port. + To address the issue, this function use an exclusive file lock to delay the + worker processes to ensure that port acquisition does not create a resource + contention issue due to a race condition. + + After acquiring lock, it will allocate port range for worker ports + (for ray node config --min-worker-port and --max-worker-port). + Because on a spark cluster, multiple ray cluster might be created, so on one spark + worker machine, there might be multiple ray worker nodes running, these worker + nodes might belong to different ray cluster, and we must ensure these ray nodes on + the same machine using non-overlapping worker port range, to achieve this, in this + function, it creates a file `/tmp/ray_on_spark_worker_port_allocation.txt` file, + the file format is composed of multiple lines, each line contains 2 number: `pid` + and `port_range_slot_index`, each port range slot allocates 1000 ports, and + corresponding port range is: + - range_begin (inclusive): 20000 + port_range_slot_index * 1000 + - range_end (exclusive): range_begin + 1000 + In this function, it first scans `/tmp/ray_on_spark_worker_port_allocation.txt` + file, removing lines that containing dead process pid, then find the first unused + port_range_slot_index, then regenerate this file, and return the allocated port + range. + + Returns: Allocated port range for current worker ports + """ + import psutil + import fcntl + + def acquire_lock(file_path): + mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC + try: + fd = os.open(file_path, mode) + # The lock file must be readable / writable to all users. 
+ os.chmod(file_path, 0o0777) + # Allow for retrying getting a file lock a maximum number of seconds + max_lock_iter = 600 + for _ in range(max_lock_iter): + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except BlockingIOError: + # Lock is used by other processes, continue loop to wait for lock + # available + pass + else: + # Acquire lock successfully. + return fd + time.sleep(10) + raise TimeoutError(f"Acquiring lock on file {file_path} timeout.") + except Exception: + os.close(fd) + + lock_file_path = "/tmp/ray_on_spark_worker_startup_barrier_lock.lock" + try: + lock_fd = acquire_lock(lock_file_path) + except TimeoutError: + # If timeout happens, the file lock might be hold by another process and that + # process does not release the lock in time by some unexpected reason. + # In this case, remove the existing lock file and create the file again, and + # then acquire file lock on the new file. + try: + os.remove(lock_file_path) + except Exception: + pass + lock_fd = acquire_lock(lock_file_path) + + def release_lock(): + fcntl.flock(lock_fd, fcntl.LOCK_UN) + os.close(lock_fd) + + try: + port_alloc_file = "/tmp/ray_on_spark_worker_port_allocation.txt" + + # NB: reading / writing `port_alloc_file` is protected by exclusive lock + # on file `lock_file_path` + if os.path.exists(port_alloc_file): + with open(port_alloc_file, mode="r") as fp: + port_alloc_data = fp.read() + port_alloc_table = [ + line.split(" ") for line in port_alloc_data.strip().split("\n") + ] + port_alloc_table = [ + (int(pid_str), int(slot_index_str)) + for pid_str, slot_index_str in port_alloc_table + ] + else: + port_alloc_table = [] + with open(port_alloc_file, mode="w"): + pass + # The port range allocation file must be readable / writable to all users. + os.chmod(port_alloc_file, 0o0777) + + port_alloc_map = { + pid: slot_index + for pid, slot_index in port_alloc_table + if psutil.pid_exists(pid) # remove slot used by dead process + } + + allocated_slot_set = set(port_alloc_map.values()) + + if len(allocated_slot_set) == 0: + new_slot_index = 0 + else: + new_slot_index = max(allocated_slot_set) + 1 + for index in range(new_slot_index): + if index not in allocated_slot_set: + new_slot_index = index + break + + port_alloc_map[os.getpid()] = new_slot_index + + with open(port_alloc_file, mode="w") as fp: + for pid, slot_index in port_alloc_map.items(): + fp.write(f"{pid} {slot_index}\n") + + worker_port_range_begin = 20000 + new_slot_index * 1000 + worker_port_range_end = worker_port_range_begin + 1000 + + if worker_port_range_end > 65536: + raise RuntimeError( + "Too many ray worker nodes are running on this machine, cannot " + "allocate worker port range for new ray worker node." + ) + except Exception: + release_lock() + raise + + def hold_lock(): + time.sleep(_RAY_WORKER_NODE_STARTUP_INTERVAL) + release_lock() + + threading.Thread(target=hold_lock, args=()).start() + + return worker_port_range_begin, worker_port_range_end + + +def _init_ray_cluster( + num_worker_nodes, + object_store_memory_per_node=None, + head_options=None, + worker_options=None, + ray_temp_root_dir=None, + safe_mode=False, + collect_log_to_path=None, +): + """ + This function is used in testing, it has the same arguments with + `ray.util.spark.init_ray_cluster` API, but it returns a `RayClusterOnSpark` + instance instead. + + The returned instance can be used to connect to, disconnect from and shutdown the + ray cluster. 
This instance can also be used as a context manager (used by + encapsulating operations within `with init_ray_cluster(...):`). Upon entering the + managed scope, the ray cluster is initiated and connected to. When exiting the + scope, the ray cluster is disconnected and shut down. + """ + from pyspark.util import inheritable_thread_target + + if RAY_ON_SPARK_START_HOOK in os.environ: + start_hook = _load_class(os.environ[RAY_ON_SPARK_START_HOOK])() + elif is_in_databricks_runtime(): + start_hook = DefaultDatabricksRayOnSparkStartHook() + else: + start_hook = RayOnSparkStartHook() + + head_options = head_options or {} + worker_options = worker_options or {} + + spark = get_spark_session() + + # Environment configurations within the Spark Session that dictate how many cpus + # and gpus to use for each submitted spark task. + num_spark_task_cpus = int(spark.sparkContext.getConf().get("spark.task.cpus", "1")) + num_spark_task_gpus = int( + spark.sparkContext.getConf().get("spark.task.resource.gpu.amount", "0") + ) + + ( + ray_worker_node_heap_mem_bytes, + ray_worker_node_object_store_mem_bytes, + ) = get_avail_mem_per_ray_worker_node(spark, object_store_memory_per_node) + + max_concurrent_tasks = get_max_num_concurrent_tasks(spark.sparkContext) + if num_worker_nodes == -1: + # num_worker_nodes=-1 represents using all available spark task slots + num_worker_nodes = max_concurrent_tasks + elif num_worker_nodes <= 0: + raise ValueError( + "The value of 'num_worker_nodes' argument must be either a positive " + "integer or 'ray.util.spark.MAX_NUM_WORKER_NODES'." + ) + + insufficient_resources = [] + + if num_spark_task_cpus < 4: + insufficient_resources.append( + "The provided CPU resources for each ray worker are inadequate to start " + "a ray cluster. Based on the total cpu resources available and the " + "configured task sizing, each ray worker would start with " + f"{num_spark_task_cpus} CPU cores. This is less than the recommended " + "value of `4` CPUs per worker. Increasing the spark configuration " + "'spark.task.cpus' to a minimum of `4` addresses it." + ) + + if ray_worker_node_heap_mem_bytes < 10 * 1024 * 1024 * 1024: + insufficient_resources.append( + "The provided memory resources for each ray worker are inadequate. Based " + "on the total memory available on the spark cluster and the configured " + "task sizing, each ray worker would start with " + f"{ray_worker_node_heap_mem_bytes} bytes heap memory. This is less than " + "the recommended value of 10GB. The ray worker node heap memory size is " + "calculated by " + "(SPARK_WORKER_NODE_PHYSICAL_MEMORY / num_local_spark_task_slots * 0.8) - " + "object_store_memory_per_node. To increase the heap space available, " + "increase the memory in the spark cluster by changing instance types or " + "worker count, reduce the target `num_worker_nodes`, or apply a lower " + "`object_store_memory_per_node`." 
+ ) + if insufficient_resources: + if safe_mode: + raise ValueError( + "You are creating ray cluster on spark with safe mode (it can be " + "disabled by setting argument 'safe_mode=False' when calling API " + "'init_ray_cluster'), safe mode requires the spark cluster config " + "satisfying following criterion: " + "\n".join(insufficient_resources) + ) + else: + _logger.warning("\n".join(insufficient_resources)) + + ray_head_ip = socket.gethostbyname(get_spark_application_driver_host(spark)) + + ray_head_port = get_random_unused_port(ray_head_ip, min_port=9000, max_port=10000) + ray_dashboard_port = get_random_unused_port( + ray_head_ip, min_port=9000, max_port=10000, exclude_list=[ray_head_port] + ) + ray_dashboard_agent_port = get_random_unused_port( + ray_head_ip, + min_port=9000, + max_port=10000, + exclude_list=[ray_head_port, ray_dashboard_port], + ) + + _logger.info(f"Ray head hostname {ray_head_ip}, port {ray_head_port}") + + cluster_unique_suffix = uuid.uuid4().hex[:8] + + if ray_temp_root_dir is None: + ray_temp_root_dir = start_hook.get_default_temp_dir() + ray_temp_dir = os.path.join( + ray_temp_root_dir, f"ray-{ray_head_port}-{cluster_unique_suffix}" + ) + os.makedirs(ray_temp_dir, exist_ok=True) + + ray_head_node_cmd = [ + sys.executable, + "-m", + "ray.util.spark.start_ray_node", + f"--temp-dir={ray_temp_dir}", + "--block", + "--head", + f"--node-ip-address={ray_head_ip}", + f"--port={ray_head_port}", + "--include-dashboard=true", + "--dashboard-host=0.0.0.0", + f"--dashboard-port={ray_dashboard_port}", + f"--dashboard-agent-listen-port={ray_dashboard_agent_port}", + # disallow ray tasks with cpu requirements from being scheduled on the head + # node. + "--num-cpus=0", + # limit the memory allocation to the head node (actual usage may increase + # beyond this for processing of tasks and actors). + f"--memory={128 * 1024 * 1024}", + # limit the object store memory allocation to the head node (actual usage + # may increase beyond this for processing of tasks and actors). + f"--object-store-memory={128 * 1024 * 1024}", + *_convert_ray_node_options(head_options), + ] + + _logger.info(f"Starting Ray head, command: {' '.join(ray_head_node_cmd)}") + + # `preexec_fn=setup_sigterm_on_parent_death` ensures the ray head node being + # killed if parent process died unexpectedly. + ray_head_proc, tail_output_deque = exec_cmd( + ray_head_node_cmd, + synchronous=False, + preexec_fn=setup_sigterm_on_parent_death, + extra_env={RAY_ON_SPARK_COLLECT_LOG_TO_PATH: collect_log_to_path or ""}, + ) + + # wait ray head node spin up. + time.sleep(_RAY_HEAD_STARTUP_TIMEOUT) + + if not check_port_open(ray_head_ip, ray_head_port): + if ray_head_proc.poll() is None: + # Ray head GCS service is down. Kill ray head node. + ray_head_proc.terminate() + # wait killing complete. + time.sleep(0.5) + + cmd_exec_failure_msg = gen_cmd_exec_failure_msg( + ray_head_node_cmd, ray_head_proc.returncode, tail_output_deque + ) + raise RuntimeError("Start Ray head node failed!\n" + cmd_exec_failure_msg) + + _logger.info("Ray head node started.") + + start_hook.on_ray_dashboard_created(ray_dashboard_port) + + # NB: + # In order to start ray worker nodes on spark cluster worker machines, + # We launch a background spark job: + # 1. Each spark task launches one ray worker node. This design ensures all ray + # worker nodes have the same shape (same cpus / gpus / memory configuration). + # If ray worker nodes have a non-uniform shape, the Ray cluster setup will + # be non-deterministic and could create issues with node sizing. 
+ # 2. A ray worker node is started via the `ray start` CLI. In each spark task, + # a child process is started and will execute a `ray start ...` command in + # blocking mode. + # 3. Each task will acquire a file lock for 10s to ensure that the ray worker + # init will acquire a port connection to the ray head node that does not + # contend with other worker processes on the same Spark worker node. + # 4. When the ray cluster is shutdown, killing ray worker nodes is implemented by: + # Installing a PR_SET_PDEATHSIG signal for the `ray start ...` child processes + # so that when parent process (pyspark task) is killed, the child processes + # (`ray start ...` processes) will receive a SIGTERM signal, killing it. + # Shutting down the ray cluster is performed by calling + # `sparkContext.cancelJobGroup` to cancel the background spark job, sending a + # SIGKILL signal to all spark tasks. Once the spark tasks are killed, this + # triggers the sending of a SIGTERM to the child processes spawned by the + # `ray_start ...` process. + + def ray_cluster_job_mapper(_): + from pyspark.taskcontext import TaskContext + + _worker_logger = logging.getLogger("ray.util.spark.worker") + + context = TaskContext.get() + + ( + worker_port_range_begin, + worker_port_range_end, + ) = _prepare_for_ray_worker_node_startup() + + # Ray worker might run on a machine different with the head node, so create the + # local log dir and temp dir again. + os.makedirs(ray_temp_dir, exist_ok=True) + + ray_worker_node_dashboard_agent_port = get_random_unused_port( + ray_head_ip, min_port=10000, max_port=20000 + ) + ray_worker_node_cmd = [ + sys.executable, + "-m", + "ray.util.spark.start_ray_node", + f"--temp-dir={ray_temp_dir}", + f"--num-cpus={num_spark_task_cpus}", + "--block", + f"--address={ray_head_ip}:{ray_head_port}", + f"--memory={ray_worker_node_heap_mem_bytes}", + f"--object-store-memory={ray_worker_node_object_store_mem_bytes}", + f"--min-worker-port={worker_port_range_begin}", + f"--max-worker-port={worker_port_range_end - 1}", + f"--dashboard-agent-listen-port={ray_worker_node_dashboard_agent_port}", + *_convert_ray_node_options(worker_options), + ] + + ray_worker_node_extra_envs = { + RAY_ON_SPARK_COLLECT_LOG_TO_PATH: collect_log_to_path or "" + } + + if num_spark_task_gpus > 0: + task_resources = context.resources() + + if "gpu" not in task_resources: + raise RuntimeError( + "Couldn't get the gpu id, Please check the GPU resource " + "configuration" + ) + gpu_addr_list = [ + int(addr.strip()) for addr in task_resources["gpu"].addresses + ] + + available_physical_gpus = get_spark_task_assigned_physical_gpus( + gpu_addr_list + ) + ray_worker_node_cmd.append( + f"--num-gpus={len(available_physical_gpus)}", + ) + ray_worker_node_extra_envs["CUDA_VISIBLE_DEVICES"] = ",".join( + [str(gpu_id) for gpu_id in available_physical_gpus] + ) + + _worker_logger.info( + f"Start Ray worker, command: {' '.join(ray_worker_node_cmd)}" + ) + + # `preexec_fn=setup_sigterm_on_parent_death` handles the case: + # If a user cancels the PySpark job, the worker process gets killed, regardless + # of PySpark daemon and worker reuse settings. + # We use prctl to ensure the command process receives SIGTERM after spark job + # cancellation. + # Note: + # When a pyspark job cancelled, the UDF python process are killed by signal + # "SIGKILL", This case neither "atexit" nor signal handler can capture SIGKILL + # signal. prctl is the only way to capture SIGKILL signal. 
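+        # Editor's note: `exec_cmd` below runs the `ray start ...` command
+        # synchronously, so this Spark task blocks for the entire lifetime of the
+        # Ray worker node and only finishes when the node process exits or the
+        # background Spark job is cancelled.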
+ exec_cmd( + ray_worker_node_cmd, + synchronous=True, + extra_env=ray_worker_node_extra_envs, + preexec_fn=setup_sigterm_on_parent_death, + ) + + # NB: Not reachable. + yield 0 + + spark_job_group_id = f"ray-cluster-{ray_head_port}-{cluster_unique_suffix}" + + ray_cluster_handler = RayClusterOnSpark( + address=f"{ray_head_ip}:{ray_head_port}", + head_proc=ray_head_proc, + spark_job_group_id=spark_job_group_id, + num_workers_node=num_worker_nodes, + temp_dir=ray_temp_dir, + ) + + def background_job_thread_fn(): + + try: + spark.sparkContext.setJobGroup( + spark_job_group_id, + "This job group is for spark job which runs the Ray cluster with ray " + f"head node {ray_head_ip}:{ray_head_port}", + ) + + # Starting a normal spark job (not barrier spark job) to run ray worker + # nodes, the design purpose is: + # 1. Using normal spark job, spark tasks can automatically retry + # individually, we don't need to write additional retry logic, But, in + # barrier mode, if one spark task fails, it will cause all other spark + # tasks killed. + # 2. Using normal spark job, we can support failover when a spark worker + # physical machine crashes. (spark will try to re-schedule the spark task + # to other spark worker nodes) + # 3. Using barrier mode job, if the cluster resources does not satisfy + # "idle spark task slots >= argument num_spark_task", then the barrier + # job gets stuck and waits until enough idle task slots available, this + # behavior is not user-friendly, on a shared spark cluster, user is hard + # to estimate how many idle tasks available at a time, But, if using normal + # spark job, it can launch job with less spark tasks (i.e. user will see a + # ray cluster setup with less worker number initially), and when more task + # slots become available, it continues to launch tasks on new available + # slots, and user can see the ray cluster worker number increases when more + # slots available. + spark.sparkContext.parallelize( + list(range(num_worker_nodes)), num_worker_nodes + ).mapPartitions(ray_cluster_job_mapper).collect() + except Exception as e: + # NB: + # The background spark job is designed to running forever until it is + # killed, The exception might be raised in following cases: + # 1. The background job raises unexpected exception (i.e. ray cluster dies + # unexpectedly) + # 2. User explicitly orders shutting down the ray cluster. + # 3. On Databricks runtime, when a notebook is detached, it triggers + # python REPL `onCancel` event, cancelling the background running spark + # job. + # For case 1 and 3, only ray workers are killed, but driver side ray head + # might still be running and the ray context might be in connected status. + # In order to disconnect and kill the ray head node, a call to + # `ray_cluster_handler.shutdown()` is performed. + if not ray_cluster_handler.spark_job_is_canceled: + ray_cluster_handler.shutdown(cancel_background_job=False) + ray_cluster_handler.background_job_exception = e + + try: + threading.Thread( + target=inheritable_thread_target(background_job_thread_fn), args=() + ).start() + + # wait background spark task starting. + for _ in range(_BACKGROUND_JOB_STARTUP_WAIT): + time.sleep(1) + if ray_cluster_handler.background_job_exception is not None: + raise RuntimeError( + "Ray workers failed to start." 
+ ) from ray_cluster_handler.background_job_exception + + start_hook.on_spark_background_job_created(spark_job_group_id) + return ray_cluster_handler + except Exception: + # If driver side setup ray-cluster routine raises exception, it might result + # in part of ray processes has been launched (e.g. ray head or some ray workers + # have been launched), calling `ray_cluster_handler.shutdown()` to kill them + # and clean status. + ray_cluster_handler.shutdown() + raise + + +_active_ray_cluster = None + + +@PublicAPI(stability="alpha") +def init_ray_cluster( + num_worker_nodes: int, + object_store_memory_per_node: Optional[int] = None, + head_options: Optional[Dict] = None, + worker_options: Optional[Dict] = None, + ray_temp_root_dir: Optional[str] = None, + safe_mode: Optional[bool] = False, + collect_log_to_path: Optional[str] = None, +) -> str: + """ + Initialize a ray cluster on the spark cluster by starting a ray head node in the + spark application's driver side node. + After creating the head node, a background spark job is created that + generates an instance of `RayClusterOnSpark` that contains configuration for the + ray cluster that will run on the Spark cluster's worker nodes. + After a ray cluster initialized, your python process automatically connect to the + ray cluster, you can call `ray.util.spark.shutdown_ray_cluster` to shut down the + ray cluster. + Note: If the active ray cluster haven't shut down, you cannot create a new ray + cluster. + + Args + num_worker_nodes: The number of spark worker nodes that the spark job will be + submitted to. This argument represents how many concurrent spark tasks will + be available in the creation of the ray cluster. The ray cluster's total + available resources (memory, CPU and/or GPU) is equal to the quantity of + resources allocated within these spark tasks. + Specifying the `num_worker_nodes` as `-1` represents a ray cluster + configuration that will use all available spark tasks slots (and resources + allocated to the spark application) on the spark cluster. + To create a spark cluster that is intended to be used exclusively as a + shared ray cluster, it is recommended to set this argument to + `ray.spark.utils.MAX_NUM_WORKER_NODES`. + object_store_memory_per_node: Object store memory available to per-ray worker + node, but it is capped by + "dev_shm_available_size * 0.8 / num_tasks_per_spark_worker". + The default value equals to + "dev_shm_available_size * 0.8 / num_tasks_per_spark_worker". + head_options: A dict representing Ray head node options. + worker_options: A dict representing Ray worker node options. + ray_temp_root_dir: A local disk path to store the ray temporary data. The + created cluster will create a subdirectory + "ray-{head_port}-{random_suffix}" beneath this path. + safe_mode: Boolean flag to fast-fail initialization of the ray cluster if + the available spark cluster does not have sufficient resources to fulfill + the resource allocation for memory, cpu and gpu. When set to true, if the + requested resources are not available for minimum recommended + functionality, an exception will be raised that details the inadequate + spark cluster configuration settings. If overridden as `False`, + a warning is raised. + collect_log_to_path: If specified, after ray head / worker nodes terminated, + collect their logs to the specified path. 
On Databricks Runtime, we + recommend you to specify a local path starts with '/dbfs/', because the + path mounts with a centralized storage device and stored data is persisted + after databricks spark cluster terminated. + + Returns: + The address of the initiated Ray cluster on spark. + """ + global _active_ray_cluster + + _check_system_environment() + + if _active_ray_cluster is not None: + raise RuntimeError( + "Current active ray cluster on spark haven't shut down. Please call " + "`ray.util.spark.shutdown_ray_cluster()` before initiating a new Ray " + "cluster on spark." + ) + + if ray.is_initialized(): + raise RuntimeError( + "Current python process already initialized Ray, Please shut down it " + "by `ray.shutdown()` before initiating a Ray cluster on spark." + ) + + cluster = _init_ray_cluster( + num_worker_nodes=num_worker_nodes, + object_store_memory_per_node=object_store_memory_per_node, + head_options=head_options, + worker_options=worker_options, + ray_temp_root_dir=ray_temp_root_dir, + safe_mode=safe_mode, + collect_log_to_path=collect_log_to_path, + ) + cluster.connect() # NB: this line might raise error. + + # If connect cluster successfully, set global _active_ray_cluster to be the started + # cluster. + _active_ray_cluster = cluster + return cluster.address + + +@PublicAPI(stability="alpha") +def shutdown_ray_cluster() -> None: + """ + Shut down the active ray cluster. + """ + global _active_ray_cluster + if _active_ray_cluster is None: + raise RuntimeError("No active ray cluster to shut down.") + + _active_ray_cluster.shutdown() + _active_ray_cluster = None diff --git a/python/ray/util/spark/databricks_hook.py b/python/ray/util/spark/databricks_hook.py new file mode 100644 index 000000000000..b55c5b1b9db6 --- /dev/null +++ b/python/ray/util/spark/databricks_hook.py @@ -0,0 +1,74 @@ +from .start_hook_base import RayOnSparkStartHook +from .utils import get_spark_session +import logging + +_logger = logging.getLogger(__name__) + + +class _NoDbutilsError(Exception): + pass + + +def get_dbutils(): + """ + Get databricks runtime dbutils module. + """ + try: + import IPython + + ip_shell = IPython.get_ipython() + if ip_shell is None: + raise _NoDbutilsError + return ip_shell.ns_table["user_global"]["dbutils"] + except ImportError: + raise _NoDbutilsError + except KeyError: + raise _NoDbutilsError + + +def display_databricks_driver_proxy_url(spark_context, port, title): + """ + This helper function create a proxy URL for databricks driver webapp forwarding. + In databricks runtime, user does not have permission to directly access web + service binding on driver machine port, but user can visit it by a proxy URL with + following format: "/driver-proxy/o/{orgId}/{clusterId}/{port}/". + """ + from dbruntime.display import displayHTML + + driverLocal = spark_context._jvm.com.databricks.backend.daemon.driver.DriverLocal + commandContextTags = driverLocal.commandContext().get().toStringMap().apply("tags") + orgId = commandContextTags.apply("orgId") + clusterId = commandContextTags.apply("clusterId") + + template = "/driver-proxy/o/{orgId}/{clusterId}/{port}/" + proxy_url = template.format(orgId=orgId, clusterId=clusterId, port=port) + + displayHTML( + f""" +
+      <a href="{proxy_url}" target="_blank">
+        Open {title} in a new tab
+      </a>
+ """ + ) + + +class DefaultDatabricksRayOnSparkStartHook(RayOnSparkStartHook): + def get_default_temp_dir(self): + return "/local_disk0/tmp" + + def on_ray_dashboard_created(self, port): + display_databricks_driver_proxy_url( + get_spark_session().sparkContext, port, "Ray Cluster Dashboard" + ) + + def on_spark_background_job_created(self, job_group_id): + try: + get_dbutils().entry_point.registerBackgroundSparkJobGroup(job_group_id) + except Exception: + _logger.warning( + "Register ray cluster spark job as background job failed. You need to " + "manually call `ray_cluster_on_spark.shutdown()` before detaching " + "your databricks python REPL." + ) diff --git a/python/ray/util/spark/start_hook_base.py b/python/ray/util/spark/start_hook_base.py new file mode 100644 index 000000000000..6421c0fc3c58 --- /dev/null +++ b/python/ray/util/spark/start_hook_base.py @@ -0,0 +1,9 @@ +class RayOnSparkStartHook: + def get_default_temp_dir(self): + return "/tmp" + + def on_ray_dashboard_created(self, port): + pass + + def on_spark_background_job_created(self, job_group): + pass diff --git a/python/ray/util/spark/start_ray_node.py b/python/ray/util/spark/start_ray_node.py new file mode 100644 index 000000000000..cedd2c786550 --- /dev/null +++ b/python/ray/util/spark/start_ray_node.py @@ -0,0 +1,125 @@ +import os.path +import subprocess +import sys +import time +import shutil +import fcntl +import signal +import socket +import logging +from ray.util.spark.cluster_init import RAY_ON_SPARK_COLLECT_LOG_TO_PATH +from ray._private.ray_process_reaper import SIGTERM_GRACE_PERIOD_SECONDS + + +# Spark on ray implementation does not directly invoke `ray start ...` script to create +# ray node subprocess, instead, it creates a subprocess to run this +# `ray.util.spark.start_ray_node` module, and in this module it invokes `ray start ...` +# script to start ray node, the purpose of `start_ray_node` module is to set up a +# SIGTERM handler for cleaning ray temp directory when ray node exits. +# When spark driver python process dies, or spark python worker dies, because they +# registered the PR_SET_PDEATHSIG signal, so OS will send a SIGTERM signal to its +# children processes, so `start_ray_node` subprocess will receive a SIGTERM signal and +# the SIGTERM handler will do cleanup work. + + +_logger = logging.getLogger(__name__) + + +if __name__ == "__main__": + arg_list = sys.argv[1:] + + collect_log_to_path = os.environ[RAY_ON_SPARK_COLLECT_LOG_TO_PATH] + + temp_dir_arg_prefix = "--temp-dir=" + temp_dir = None + + for arg in arg_list: + if arg.startswith(temp_dir_arg_prefix): + temp_dir = arg[len(temp_dir_arg_prefix) :] + + if temp_dir is None: + raise ValueError("Please explicitly set --temp-dir option.") + + temp_dir = os.path.normpath(temp_dir) + + ray_exec_path = os.path.join(os.path.dirname(sys.executable), "ray") + + lock_file = temp_dir + ".lock" + lock_fd = os.open(lock_file, os.O_RDWR | os.O_CREAT | os.O_TRUNC) + + # Mutilple ray nodes might start on the same machine, and they are using the + # same temp directory, adding a shared lock representing current ray node is + # using the temp directory. + fcntl.flock(lock_fd, fcntl.LOCK_SH) + process = subprocess.Popen([ray_exec_path, "start", *arg_list], text=True) + + def try_clean_temp_dir_at_exit(): + try: + # Wait for a while to ensure the children processes of the ray node all + # exited. + time.sleep(SIGTERM_GRACE_PERIOD_SECONDS + 0.5) + if process.poll() is None: + # "ray start ..." command process is still alive. Force to kill it. 
+ process.kill() + + # Release the shared lock, representing current ray node does not use the + # temp dir. + fcntl.flock(lock_fd, fcntl.LOCK_UN) + + try: + # acquiring exclusive lock to ensure copy logs and removing dir safely. + fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + lock_acquired = True + except BlockingIOError: + # The file has active shared lock or exclusive lock, representing there + # are other ray nodes running, or other node running cleanup temp-dir + # routine. skip cleaning temp-dir, and skip copy logs to destination + # directory as well. + lock_acquired = False + + if lock_acquired: + # This is the final terminated ray node on current spark worker, + # start copy logs (including all local ray nodes logs) to destination. + if collect_log_to_path: + try: + copy_log_dest_path = os.path.join( + collect_log_to_path, + os.path.basename(temp_dir) + "-logs", + socket.gethostname(), + ) + ray_session_dir = os.readlink( + os.path.join(temp_dir, "session_latest") + ) + shutil.copytree( + os.path.join(ray_session_dir, "logs"), + copy_log_dest_path, + ) + except Exception as e: + _logger.warning( + "Collect logs to destination directory failed, " + f"error: {repr(e)}." + ) + + # Start cleaning the temp-dir, + shutil.rmtree(temp_dir, ignore_errors=True) + except Exception: + # swallow any exception. + pass + finally: + fcntl.flock(lock_fd, fcntl.LOCK_UN) + os.close(lock_fd) + + try: + + def sigterm_handler(*args): + process.terminate() + try_clean_temp_dir_at_exit() + os._exit(143) + + signal.signal(signal.SIGTERM, sigterm_handler) + ret_code = process.wait() + try_clean_temp_dir_at_exit() + sys.exit(ret_code) + except Exception: + try_clean_temp_dir_at_exit() + raise diff --git a/python/ray/util/spark/utils.py b/python/ray/util/spark/utils.py new file mode 100644 index 000000000000..56413abfa50b --- /dev/null +++ b/python/ray/util/spark/utils.py @@ -0,0 +1,263 @@ +import subprocess +import os +import sys +import random +import threading +import collections +import logging + + +_MEMORY_BUFFER_OFFSET = 0.8 + + +_logger = logging.getLogger("ray.util.spark.utils") + + +def is_in_databricks_runtime(): + return "DATABRICKS_RUNTIME_VERSION" in os.environ + + +def gen_cmd_exec_failure_msg(cmd, return_code, tail_output_deque): + cmd_str = " ".join(cmd) + tail_output = "".join(tail_output_deque) + return ( + f"Command {cmd_str} failed with return code {return_code}, tail output are " + f"included below.\n{tail_output}\n" + ) + + +def exec_cmd( + cmd, + *, + extra_env=None, + synchronous=True, + **kwargs, +): + """ + A convenience wrapper of `subprocess.Popen` for running a command from a Python + script. + If `synchronous` is True, wait until the process terminated and if subprocess + return code is not 0, raise error containing last 100 lines output. + If `synchronous` is False, return an `Popen` instance and a deque instance holding + tail outputs. + The subprocess stdout / stderr output will be streamly redirected to current + process stdout. 
+ """ + illegal_kwargs = set(kwargs.keys()).intersection({"text", "stdout", "stderr"}) + if illegal_kwargs: + raise ValueError(f"`kwargs` cannot contain {list(illegal_kwargs)}") + + env = kwargs.pop("env", None) + if extra_env is not None and env is not None: + raise ValueError("`extra_env` and `env` cannot be used at the same time") + + env = env if extra_env is None else {**os.environ, **extra_env} + + process = subprocess.Popen( + cmd, + env=env, + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + **kwargs, + ) + + tail_output_deque = collections.deque(maxlen=100) + + def redirect_log_thread_fn(): + for line in process.stdout: + # collect tail logs by `tail_output_deque` + tail_output_deque.append(line) + + # redirect to stdout. + sys.stdout.write(line) + + threading.Thread(target=redirect_log_thread_fn, args=()).start() + + if not synchronous: + return process, tail_output_deque + + return_code = process.wait() + if return_code != 0: + raise RuntimeError( + gen_cmd_exec_failure_msg(cmd, return_code, tail_output_deque) + ) + + +def check_port_open(host, port): + import socket + from contextlib import closing + + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: + return sock.connect_ex((host, port)) == 0 + + +def get_random_unused_port( + host, min_port=1024, max_port=65535, max_retries=100, exclude_list=None +): + """ + Get random unused port. + """ + # Use true random generator + rng = random.SystemRandom() + + exclude_list = exclude_list or [] + for _ in range(max_retries): + port = rng.randint(min_port, max_port) + if port in exclude_list: + continue + if not check_port_open(host, port): + return port + raise RuntimeError( + f"Get available port between range {min_port} and {max_port} failed." + ) + + +def get_spark_session(): + from pyspark.sql import SparkSession + + spark_session = SparkSession.getActiveSession() + if spark_session is None: + raise RuntimeError( + "Spark session haven't been initiated yet. Please use " + "`SparkSession.builder` to create a spark session and connect to a spark " + "cluster." 
+ ) + return spark_session + + +def get_spark_application_driver_host(spark): + return spark.conf.get("spark.driver.host") + + +def get_max_num_concurrent_tasks(spark_context): + """Gets the current max number of concurrent tasks.""" + # pylint: disable=protected-access + # spark version 3.1 and above have a different API for fetching max concurrent + # tasks + if spark_context._jsc.sc().version() >= "3.1": + return spark_context._jsc.sc().maxNumConcurrentTasks( + spark_context._jsc.sc().resourceProfileManager().resourceProfileFromId(0) + ) + return spark_context._jsc.sc().maxNumConcurrentTasks() + + +def _get_total_physical_memory(): + import psutil + + return psutil.virtual_memory().total + + +def _get_total_shared_memory(): + import shutil + + return shutil.disk_usage("/dev/shm").total + + +def _get_cpu_cores(): + import multiprocessing + + return multiprocessing.cpu_count() + + +def _calc_mem_per_ray_worker_node( + num_task_slots, physical_mem_bytes, shared_mem_bytes, object_store_memory_per_node +): + available_physical_mem_per_node = int( + physical_mem_bytes / num_task_slots * _MEMORY_BUFFER_OFFSET + ) + available_shared_mem_per_node = int( + shared_mem_bytes / num_task_slots * _MEMORY_BUFFER_OFFSET + ) + if object_store_memory_per_node is None: + object_store_bytes = available_shared_mem_per_node + else: + object_store_bytes = int( + min( + object_store_memory_per_node, + available_shared_mem_per_node, + ) + ) + heap_mem_bytes = available_physical_mem_per_node - object_store_bytes + return heap_mem_bytes, object_store_bytes + + +def get_avail_mem_per_ray_worker_node(spark, object_store_memory_per_node): + """ + Return the available heap memory and object store memory for each ray worker. + NB: We have one ray node per spark task. + """ + num_cpus_per_spark_task = int( + spark.sparkContext.getConf().get("spark.task.cpus", "1") + ) + + def mapper(_): + try: + num_cpus = _get_cpu_cores() + num_task_slots = num_cpus // num_cpus_per_spark_task + + physical_mem_bytes = _get_total_physical_memory() + shared_mem_bytes = _get_total_shared_memory() + + ( + ray_worker_node_heap_mem_bytes, + ray_worker_node_object_store_bytes, + ) = _calc_mem_per_ray_worker_node( + num_task_slots, + physical_mem_bytes, + shared_mem_bytes, + object_store_memory_per_node, + ) + return ( + ray_worker_node_heap_mem_bytes, + ray_worker_node_object_store_bytes, + None, + ) + except Exception as e: + return -1, -1, repr(e) + + # Running memory inference routine on spark executor side since the spark worker + # nodes may have a different machine configuration compared to the spark driver + # node. + ( + inferred_ray_worker_node_heap_mem_bytes, + inferred_ray_worker_node_object_store_bytes, + err, + ) = ( + spark.sparkContext.parallelize([1], 1).map(mapper).collect()[0] + ) + + if err is not None: + raise RuntimeError( + f"Inferring ray worker available memory failed, error: {err}" + ) + return ( + inferred_ray_worker_node_heap_mem_bytes, + inferred_ray_worker_node_object_store_bytes, + ) + + +def get_spark_task_assigned_physical_gpus(gpu_addr_list): + if "CUDA_VISIBLE_DEVICES" in os.environ: + visible_cuda_dev_list = [ + int(dev.strip()) for dev in os.environ["CUDA_VISIBLE_DEVICES"].split(",") + ] + return [visible_cuda_dev_list[addr] for addr in gpu_addr_list] + else: + return gpu_addr_list + + +def setup_sigterm_on_parent_death(): + """ + Uses prctl to automatically send SIGTERM to the child process when its parent is + dead. The child process itself should handle SIGTERM properly. 
+ """ + try: + import ctypes + import signal + + libc = ctypes.CDLL("libc.so.6") + # Set the parent process death signal of the command process to SIGTERM. + libc.prctl(1, signal.SIGTERM) # PR_SET_PDEATHSIG, see prctl.h + except OSError as e: + _logger.warning(f"Setup libc.prctl PR_SET_PDEATHSIG failed, error {repr(e)}.") diff --git a/python/requirements_test.txt b/python/requirements_test.txt index 14da27675e77..00cee27d23d4 100644 --- a/python/requirements_test.txt +++ b/python/requirements_test.txt @@ -54,7 +54,7 @@ PyOpenSSL==22.1.0 pygame==2.1.2 Pygments==2.13.0 pymongo==4.3.2 -pyspark==3.1.2 +pyspark==3.3.1 pytest==7.0.1 pytest-asyncio==0.16.0 pytest-rerunfailures==10.2