Ray on spark implementation (#28771)
WeichenXu123 authored and AmeerHajAli committed Jan 12, 2023
1 parent 28c9d51 commit 9f8bd14
Showing 17 changed files with 1,757 additions and 1 deletion.
14 changes: 14 additions & 0 deletions .buildkite/pipeline.build.yml
@@ -484,6 +484,20 @@
--test_env=CONDA_DEFAULT_ENV
python/ray/tests/...

- label: ":python: Ray on Spark Test"
  conditions: ["RAY_CI_PYTHON_AFFECTED"]
  instance_size: medium
  commands:
    - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
    - pip uninstall -y ray
    - RAY_DEBUG_BUILD=debug ./ci/ci.sh build
    - ./ci/env/env_info.sh
    - bazel test --config=ci-debug $(./ci/run/bazel_export_options)
      --test_env=RAY_ON_SPARK_BACKGROUND_JOB_STARTUP_WAIT=1
      --test_env=RAY_ON_SPARK_RAY_WORKER_NODE_STARTUP_INTERVAL=5
      --test_tag_filters=-kubernetes,spark_plugin_tests
      python/ray/tests/...

# https://github.com/ray-project/ray/issues/22460
#- label: ":python: (Privileged test)"
#conditions: ["RAY_CI_PYTHON_AFFECTED"]
1 change: 1 addition & 0 deletions doc/source/cluster/vms/user-guides/community/index.rst
@@ -15,6 +15,7 @@ The following is a list of community supported cluster managers.
yarn.rst
slurm.rst
lsf.rst
spark.rst

.. _ref-additional-cloud-providers:

92 changes: 92 additions & 0 deletions doc/source/cluster/vms/user-guides/community/spark.rst
@@ -0,0 +1,92 @@
.. _ray-Spark-deploy:

Deploying on Spark Standalone cluster
=====================================

This document describes the high-level steps for running Ray clusters on a `Spark Standalone cluster <https://spark.apache.org/docs/latest/spark-standalone.html>`_.

Running a basic example
-----------------------

The following example Spark application starts a Ray cluster on Spark,
runs Ray application code on it, and then shuts down the Ray cluster.

1) Create a Python file containing the Spark application code.
   Assume the file is named ``ray-on-spark-example1.py``.

.. code-block:: python

    from pyspark.sql import SparkSession
    from ray.util.spark import init_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES

    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("Ray on spark example 1") \
            .config("spark.task.cpus", "4") \
            .getOrCreate()

        # Initiate a Ray cluster on this Spark application. This creates a background
        # Spark job in which each Spark task launches one Ray worker node.
        # The Ray head node is launched on the Spark application driver.
        # The resources (CPU / GPU / memory) allocated to each Ray worker node equal
        # the resources allocated to the corresponding Spark task.
        init_ray_cluster(num_worker_nodes=MAX_NUM_WORKER_NODES)

        # You can run any Ray application code here; it will be executed on the
        # Ray cluster set up above (see the sketch after step 2 below).
        # Note that you don't need to call `ray.init`.
        ...

        # Terminate the Ray cluster explicitly.
        # If you don't call this, the Ray cluster will also be terminated when the
        # Spark application terminates.
        shutdown_ray_cluster()

2) Submit the Spark application above to the Spark standalone cluster.

.. code-block:: bash

    #!/bin/bash
    spark-submit \
      --master spark://{spark_master_IP}:{spark_master_port} \
      path/to/ray-on-spark-example1.py

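For illustration, here is a minimal Ray workload that could stand in for the
``...`` placeholder in step 1. This is a hypothetical sketch, not part of the
original example; it only uses the core Ray task API (``ray.remote`` / ``ray.get``).

.. code-block:: python

    import ray

    @ray.remote
    def square(x):
        # A trivial Ray task; it runs on the Ray worker nodes started on Spark.
        return x * x

    # Dispatch tasks to the Ray-on-Spark cluster and collect the results.
    results = ray.get([square.remote(i) for i in range(10)])
    print(results)

Because ``init_ray_cluster`` already connected this process to the cluster, no
``ray.init`` call is needed before submitting tasks.
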
Creating a long-running Ray cluster on a Spark cluster
--------------------------------------------------------

The following example Spark application starts a long-running Ray cluster on Spark.
The created Ray cluster can be accessed by remote Python processes.

1) Create a Python file containing the Spark application code.
   Assume the file is named ``long-running-ray-cluster-on-spark.py``.

.. code-block:: python

    from pyspark.sql import SparkSession
    import time
    from ray.util.spark import init_ray_cluster, MAX_NUM_WORKER_NODES

    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("long running ray cluster on spark") \
            .config("spark.task.cpus", "4") \
            .getOrCreate()

        cluster_address = init_ray_cluster(num_worker_nodes=MAX_NUM_WORKER_NODES)
        print("Ray cluster is initiated, you can connect to this ray cluster via address "
              f"ray://{cluster_address}")

        # Sleep forever until the Spark application is terminated; at that point,
        # the Ray cluster will also be terminated.
        while True:
            time.sleep(10)

2) Submit the Spark application above to the Spark standalone cluster.

.. code-block:: bash

    #!/bin/bash
    spark-submit \
      --master spark://{spark_master_IP}:{spark_master_port} \
      path/to/long-running-ray-cluster-on-spark.py

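Once this application is running, a separate Python process can attach to the
cluster through the ``ray://...`` address printed by the driver, using the Ray
client. A minimal sketch, assuming you substitute the placeholder below with the
address the driver actually printed:

.. code-block:: python

    import ray

    # Replace with the address printed by the long-running Spark application,
    # e.g. "ray://<head-node-ip>:<ray-client-server-port>".
    ray.init(address="ray://<cluster_address>")

    @ray.remote
    def hello():
        return "hello from the Ray-on-Spark cluster"

    print(ray.get(hello.remote()))
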
16 changes: 16 additions & 0 deletions python/ray/tests/BUILD
@@ -518,3 +518,19 @@ py_test_module_list(
    tags = ["exclusive", "asan_tests", "team:core"],
    deps = ["//:ray_lib", ":conftest"],
)

py_test_module_list(
    files = [
        "spark/test_basic.py",
        "spark/test_GPU.py",
        "spark/test_multicores_per_task.py",
        "spark/test_utils.py",
    ],
    size = "large",
    tags = ["exclusive", "spark_plugin_tests", "team:serverless"],
    deps = ["//:ray_lib", ":conftest"],
    data = [
        "spark/discover_2_gpu.sh",
        "spark/discover_4_gpu.sh",
    ],
)
4 changes: 4 additions & 0 deletions python/ray/tests/spark/discover_2_gpu.sh
@@ -0,0 +1,4 @@
#!/bin/bash

# This script is used in the Spark GPU cluster config for discovering available GPUs.
echo "{\"name\":\"gpu\",\"addresses\":[\"0\",\"1\"]}"
4 changes: 4 additions & 0 deletions python/ray/tests/spark/discover_4_gpu.sh
@@ -0,0 +1,4 @@
#!/bin/bash

# This script is used in the Spark GPU cluster config for discovering available GPUs.
echo "{\"name\":\"gpu\",\"addresses\":[\"0\",\"1\",\"2\",\"3\"]}"
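These scripts emit the resource JSON that Spark's GPU discovery mechanism expects.
As a hedged illustration of how such a script is wired into a Spark session
(mirroring the configuration used in test_GPU.py below, with a placeholder script path):

    from pyspark.sql import SparkSession

    # Sketch only: point Spark's worker GPU discovery at one of the scripts above.
    spark = (
        SparkSession.builder.master("local-cluster[1, 2, 1024]")
        .config("spark.task.resource.gpu.amount", "1")
        .config("spark.worker.resource.gpu.amount", "2")
        .config("spark.executor.resource.gpu.amount", "2")
        .config("spark.worker.resource.gpu.discoveryScript", "/path/to/discover_2_gpu.sh")
        .getOrCreate()
    )
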
87 changes: 87 additions & 0 deletions python/ray/tests/spark/test_GPU.py
@@ -0,0 +1,87 @@
import sys
import pytest
import os
import time
import functools
from abc import ABC
from pyspark.sql import SparkSession
from ray.tests.spark.test_basic import RayOnSparkCPUClusterTestBase

import ray
from ray.util.spark.cluster_init import _init_ray_cluster

pytestmark = pytest.mark.skipif(
    not sys.platform.startswith("linux"),
    reason="Ray on spark only supports running on Linux.",
)


class RayOnSparkGPUClusterTestBase(RayOnSparkCPUClusterTestBase, ABC):

    num_total_gpus = None
    num_gpus_per_spark_task = None

    def test_gpu_allocation(self):

        for num_spark_tasks in [self.max_spark_tasks // 2, self.max_spark_tasks]:
            with _init_ray_cluster(num_worker_nodes=num_spark_tasks, safe_mode=False):
                worker_res_list = self.get_ray_worker_resources_list()
                assert len(worker_res_list) == num_spark_tasks
                for worker_res in worker_res_list:
                    assert worker_res["GPU"] == self.num_gpus_per_spark_task

    def test_basic_ray_app_using_gpu(self):

        with _init_ray_cluster(num_worker_nodes=self.max_spark_tasks, safe_mode=False):

            @ray.remote(num_cpus=1, num_gpus=1)
            def f(_):
                # Sleep briefly so the task does not finish too fast; this lets
                # all Ray tasks run concurrently, filling every idle task slot.
                time.sleep(5)
                return [
                    int(gpu_id)
                    for gpu_id in os.environ["CUDA_VISIBLE_DEVICES"].split(",")
                ]

            futures = [f.remote(i) for i in range(self.num_total_gpus)]
            results = ray.get(futures)
            merged_results = functools.reduce(lambda x, y: x + y, results)
            # Check that all Ray tasks are assigned different GPUs.
            assert sorted(merged_results) == list(range(self.num_total_gpus))


class TestBasicSparkGPUCluster(RayOnSparkGPUClusterTestBase):
    @classmethod
    def setup_class(cls):
        super().setup_class()
        cls.num_total_cpus = 2
        cls.num_total_gpus = 2
        cls.num_cpus_per_spark_task = 1
        cls.num_gpus_per_spark_task = 1
        cls.max_spark_tasks = 2
        gpu_discovery_script_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "discover_2_gpu.sh"
        )
        os.environ["SPARK_WORKER_CORES"] = "4"
        cls.spark = (
            SparkSession.builder.master("local-cluster[1, 2, 1024]")
            .config("spark.task.cpus", "1")
            .config("spark.task.resource.gpu.amount", "1")
            .config("spark.executor.cores", "2")
            .config("spark.worker.resource.gpu.amount", "2")
            .config("spark.executor.resource.gpu.amount", "2")
            .config("spark.task.maxFailures", "1")
            .config(
                "spark.worker.resource.gpu.discoveryScript", gpu_discovery_script_path
            )
            .getOrCreate()
        )


if __name__ == "__main__":
    if os.environ.get("PARALLEL_CI"):
        sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
    else:
        sys.exit(pytest.main(["-sv", __file__]))
157 changes: 157 additions & 0 deletions python/ray/tests/spark/test_basic.py
@@ -0,0 +1,157 @@
import os
import shutil
import tempfile
import socket
import pytest
import sys

from abc import ABC

import ray

import ray.util.spark.cluster_init
from ray.util.spark import init_ray_cluster, shutdown_ray_cluster
from ray.util.spark.cluster_init import _init_ray_cluster
from ray.util.spark.utils import check_port_open
from pyspark.sql import SparkSession
import time
import logging

pytestmark = pytest.mark.skipif(
    not sys.platform.startswith("linux"),
    reason="Ray on spark only supports running on Linux.",
)

_logger = logging.getLogger(__name__)


class RayOnSparkCPUClusterTestBase(ABC):

    spark = None
    num_total_cpus = None
    num_cpus_per_spark_task = None
    max_spark_tasks = None

    @classmethod
    def setup_class(cls):
        pass

    @classmethod
    def teardown_class(cls):
        time.sleep(10)  # Wait for all background Spark jobs to be canceled.
        cls.spark.stop()

    @staticmethod
    def get_ray_worker_resources_list():
        wr_list = []
        for node in ray.nodes():
            # Exclude dead nodes and the head node (which has 0 CPU resources).
            if node["Alive"] and node["Resources"].get("CPU", 0) > 0:
                wr_list.append(node["Resources"])
        return wr_list

    def test_cpu_allocation(self):
        for num_spark_tasks in [self.max_spark_tasks // 2, self.max_spark_tasks]:
            with _init_ray_cluster(num_worker_nodes=num_spark_tasks, safe_mode=False):
                worker_res_list = self.get_ray_worker_resources_list()
                assert len(worker_res_list) == num_spark_tasks
                for worker_res in worker_res_list:
                    assert worker_res["CPU"] == self.num_cpus_per_spark_task

    def test_public_api(self):
        try:
            ray_temp_root_dir = tempfile.mkdtemp()
            collect_log_to_path = tempfile.mkdtemp()
            init_ray_cluster(
                num_worker_nodes=self.max_spark_tasks,
                safe_mode=False,
                collect_log_to_path=collect_log_to_path,
                ray_temp_root_dir=ray_temp_root_dir,
            )

            @ray.remote
            def f(x):
                return x * x

            futures = [f.remote(i) for i in range(32)]
            results = ray.get(futures)
            assert results == [i * i for i in range(32)]

            shutdown_ray_cluster()

            time.sleep(7)
            # Assert the temp dir is removed.
            assert len(os.listdir(ray_temp_root_dir)) == 1 and os.listdir(
                ray_temp_root_dir
            )[0].endswith(".lock")

            # Assert logs are copied to the specified path.
            listed_items = os.listdir(collect_log_to_path)
            assert len(listed_items) == 1 and listed_items[0].startswith("ray-")
            log_dest_dir = os.path.join(
                collect_log_to_path, listed_items[0], socket.gethostname()
            )
            assert os.path.exists(log_dest_dir) and len(os.listdir(log_dest_dir)) > 0
        finally:
            if ray.util.spark.cluster_init._active_ray_cluster is not None:
                # If the test raised an error and did not destroy the cluster,
                # destroy it here.
                ray.util.spark.cluster_init._active_ray_cluster.shutdown()
                time.sleep(5)
            shutil.rmtree(ray_temp_root_dir, ignore_errors=True)
            shutil.rmtree(collect_log_to_path, ignore_errors=True)

    def test_ray_cluster_shutdown(self):
        with _init_ray_cluster(
            num_worker_nodes=self.max_spark_tasks, safe_mode=False
        ) as cluster:
            assert len(self.get_ray_worker_resources_list()) == self.max_spark_tasks

            # Test: canceling the background Spark job causes all Ray worker nodes to exit.
            cluster._cancel_background_spark_job()
            time.sleep(8)

            assert len(self.get_ray_worker_resources_list()) == 0

        time.sleep(2)  # Wait for the Ray head node to exit.
        # Assert the Ray head node exited by checking that the head port is closed.
        hostname, port = cluster.address.split(":")
        assert not check_port_open(hostname, int(port))

    def test_background_spark_job_exit_trigger_ray_head_exit(self):
        with _init_ray_cluster(
            num_worker_nodes=self.max_spark_tasks, safe_mode=False
        ) as cluster:
            # Mimic the case where the background job fails unexpectedly.
            cluster._cancel_background_spark_job()
            cluster.spark_job_is_canceled = False
            time.sleep(5)

            # Assert the Ray head node exited by checking that the head port is closed.
            hostname, port = cluster.address.split(":")
            assert not check_port_open(hostname, int(port))


class TestBasicSparkCluster(RayOnSparkCPUClusterTestBase):
    @classmethod
    def setup_class(cls):
        super().setup_class()
        cls.num_total_cpus = 2
        cls.num_total_gpus = 0
        cls.num_cpus_per_spark_task = 1
        cls.num_gpus_per_spark_task = 0
        cls.max_spark_tasks = 2
        os.environ["SPARK_WORKER_CORES"] = "2"
        cls.spark = (
            SparkSession.builder.master("local-cluster[1, 2, 1024]")
            .config("spark.task.cpus", "1")
            .config("spark.task.maxFailures", "1")
            .getOrCreate()
        )


if __name__ == "__main__":
    if os.environ.get("PARALLEL_CI"):
        sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
    else:
        sys.exit(pytest.main(["-sv", __file__]))