Ray on spark implementation #28771

Merged
126 commits merged on Dec 20, 2022
c4871d8
init
WeichenXu123 Sep 26, 2022
93f527a
memory control
WeichenXu123 Sep 27, 2022
06d4125
check spark version
WeichenXu123 Sep 29, 2022
3a010d0
db set background job
WeichenXu123 Sep 29, 2022
8e70636
setup extra dep
WeichenXu123 Sep 29, 2022
fb6712e
support ray node options
WeichenXu123 Sep 29, 2022
e6e443b
doc
WeichenXu123 Sep 29, 2022
45c5b37
add object store mem setting
WeichenXu123 Oct 8, 2022
97359ec
disconnect when shutdown
WeichenXu123 Oct 8, 2022
f1d5a84
misc updates
WeichenXu123 Oct 9, 2022
f4f73a4
update
WeichenXu123 Oct 9, 2022
29cd053
add test
WeichenXu123 Oct 9, 2022
dc63cc6
fix
WeichenXu123 Oct 9, 2022
6d00600
update test
WeichenXu123 Oct 9, 2022
3a5b01b
fix
WeichenXu123 Oct 9, 2022
c84733a
add tests
WeichenXu123 Oct 9, 2022
82b5735
fix
WeichenXu123 Oct 9, 2022
e3463ea
add param
WeichenXu123 Oct 10, 2022
ef8dbf2
update test
WeichenXu123 Oct 10, 2022
a5eb1e2
Merge branch 'master' into ray-on-spark
WeichenXu123 Oct 10, 2022
cebae2d
update setup-dev
WeichenXu123 Oct 10, 2022
d127e34
fix
WeichenXu123 Oct 10, 2022
02dbc15
add logger
WeichenXu123 Oct 10, 2022
5f5649c
add tests
WeichenXu123 Oct 10, 2022
df85ef1
update
WeichenXu123 Oct 10, 2022
0119fe5
add warning
WeichenXu123 Oct 10, 2022
cab7ff0
update
WeichenXu123 Oct 10, 2022
5869b82
update
WeichenXu123 Oct 10, 2022
98a7d62
update
WeichenXu123 Oct 11, 2022
e0d4621
update
WeichenXu123 Oct 11, 2022
779abc2
add connect / disconnect api
WeichenXu123 Oct 11, 2022
597059d
add comment
WeichenXu123 Oct 11, 2022
8fffcf2
add test
WeichenXu123 Oct 11, 2022
3ef3cf5
update comment
WeichenXu123 Oct 11, 2022
8eaecd6
use safe port
WeichenXu123 Oct 11, 2022
8d4ddd0
update
WeichenXu123 Oct 11, 2022
32a53a0
fix
WeichenXu123 Oct 11, 2022
a699c62
use random port
WeichenXu123 Oct 11, 2022
ce9635a
update
WeichenXu123 Oct 11, 2022
3f4634a
update
WeichenXu123 Oct 11, 2022
488a878
update
WeichenXu123 Oct 11, 2022
6b01329
update
WeichenXu123 Oct 11, 2022
7b66059
update
WeichenXu123 Oct 11, 2022
8054b85
debug
WeichenXu123 Oct 11, 2022
9c42646
add sleep
WeichenXu123 Oct 11, 2022
28f7cef
update
WeichenXu123 Oct 11, 2022
6ed296d
misc updates
WeichenXu123 Oct 12, 2022
884d913
add test
WeichenXu123 Oct 12, 2022
39692b0
update
WeichenXu123 Oct 12, 2022
6849e10
update
WeichenXu123 Oct 12, 2022
00fef7c
update
WeichenXu123 Oct 12, 2022
39d6831
update
WeichenXu123 Oct 12, 2022
c7ff65f
sizing logic change and test add
BenWilson2 Oct 18, 2022
824c4b3
update ray head host
WeichenXu123 Oct 19, 2022
260a0a7
use active spark session
WeichenXu123 Oct 19, 2022
7305b57
Merge branch 'ray-on-spark' of https://github.com/WeichenXu123/ray in…
BenWilson2 Oct 19, 2022
809c347
fix safe_mode logic
BenWilson2 Oct 19, 2022
da78fcb
add initial retry logic for failed barrier execution
BenWilson2 Oct 21, 2022
7c59952
switch to standard task context instead of barrier context execution
BenWilson2 Oct 24, 2022
81a7a94
use TaskContext and adjust port timing to use partition_id
BenWilson2 Oct 24, 2022
a990970
add worker file lock for race condition port acquisition
BenWilson2 Oct 25, 2022
d13392f
PR feedback
BenWilson2 Oct 28, 2022
74fed31
remove auto-added init
BenWilson2 Oct 28, 2022
23a54dd
update
WeichenXu123 Nov 21, 2022
be9a411
add comments
WeichenXu123 Nov 21, 2022
28083a4
updates
WeichenXu123 Nov 21, 2022
c600449
update
WeichenXu123 Nov 21, 2022
a513ce8
Merge branch 'master' into ray-on-spark
WeichenXu123 Nov 22, 2022
fef37d5
update
WeichenXu123 Nov 22, 2022
f280f77
dashboard
WeichenXu123 Nov 23, 2022
957fb86
fix
WeichenXu123 Nov 23, 2022
268bacc
update
WeichenXu123 Nov 24, 2022
7e8ff81
simplify api
WeichenXu123 Nov 25, 2022
d6859fd
update
WeichenXu123 Nov 26, 2022
0b52e64
set object manager port
WeichenXu123 Nov 26, 2022
7b1814c
dont capture output
WeichenXu123 Nov 26, 2022
f55723d
update
WeichenXu123 Nov 26, 2022
2dc2a67
update
WeichenXu123 Nov 26, 2022
48ace3d
worker port range
WeichenXu123 Nov 26, 2022
181b2c7
set ray head ip
WeichenXu123 Nov 26, 2022
a69f241
refactor
WeichenXu123 Nov 27, 2022
9a120d4
dynamic dashboard port
WeichenXu123 Nov 27, 2022
6a74ae1
update temp dir
WeichenXu123 Nov 27, 2022
fb8299c
exception includes ray node tail output
WeichenXu123 Nov 29, 2022
a813f32
improve worker node error exception
WeichenXu123 Nov 29, 2022
7583cc8
improve worker node error exception
WeichenXu123 Nov 29, 2022
e95e4eb
address comments
WeichenXu123 Nov 29, 2022
47381dd
worker port range allocation
WeichenXu123 Dec 1, 2022
8989f06
set dashboard agent port
WeichenXu123 Dec 2, 2022
2c7c0cd
fix
WeichenXu123 Dec 2, 2022
f2ab57d
update worker dashboard_agent_port
WeichenXu123 Dec 2, 2022
c32aeb4
auto clean temp dir
WeichenXu123 Dec 5, 2022
66569f8
fix
WeichenXu123 Dec 5, 2022
afc5047
object_store_memory_per_node
WeichenXu123 Dec 5, 2022
1c4b34b
update start node code
WeichenXu123 Dec 6, 2022
f424461
fix
WeichenXu123 Dec 6, 2022
613a336
improve start_ray_node code
WeichenXu123 Dec 9, 2022
10b22c0
update init_ray_cluster
WeichenXu123 Dec 9, 2022
c19cad1
Merge branch 'master' into ray-on-spark
WeichenXu123 Dec 9, 2022
6c32f42
head node PR_SET_PDEATHSIG
WeichenXu123 Dec 12, 2022
7c67d5c
start hook
WeichenXu123 Dec 12, 2022
1a4e5b7
update test and build
WeichenXu123 Dec 13, 2022
96e33e7
update ci pipeline
WeichenXu123 Dec 13, 2022
c136416
remove total_xx arguments
WeichenXu123 Dec 13, 2022
fca0325
fix
WeichenXu123 Dec 13, 2022
f488402
fix
WeichenXu123 Dec 13, 2022
4f8773a
address comments
WeichenXu123 Dec 15, 2022
596ae90
speedup test
WeichenXu123 Dec 15, 2022
ad6fb6b
fix
WeichenXu123 Dec 15, 2022
b777d83
split tests
WeichenXu123 Dec 16, 2022
1d04fc0
set test env
WeichenXu123 Dec 16, 2022
eb87eb2
fix ci
WeichenXu123 Dec 16, 2022
a2863b6
add param collect_log_to_path
WeichenXu123 Dec 16, 2022
92b8d40
add tests
WeichenXu123 Dec 16, 2022
0a03355
Merge branch 'master' into ray-on-spark
WeichenXu123 Dec 17, 2022
ddc0fd9
improve start script
WeichenXu123 Dec 17, 2022
49ab76e
fix linter
WeichenXu123 Dec 17, 2022
d2febe2
update tests
WeichenXu123 Dec 17, 2022
ecc0139
update tests
WeichenXu123 Dec 17, 2022
f2d0919
fix copy logs
WeichenXu123 Dec 18, 2022
105721a
skip tests on windows/macos
WeichenXu123 Dec 18, 2022
9fbf3de
minor updates
WeichenXu123 Dec 18, 2022
adbe42d
add user guide
WeichenXu123 Dec 19, 2022
bce13d7
minor update
WeichenXu123 Dec 19, 2022
303cf18
minor updates
WeichenXu123 Dec 19, 2022
19e0d16
update user-guide
WeichenXu123 Dec 20, 2022
14 changes: 14 additions & 0 deletions .buildkite/pipeline.build.yml
Original file line number Diff line number Diff line change
@@ -484,6 +484,20 @@
--test_env=CONDA_DEFAULT_ENV
python/ray/tests/...

- label: ":python: Ray on Spark Test"
Collaborator:

Do we need to create a separate test suite, or just be part of small/large python tests?

Contributor Author (reply):

I prefer to create a separate label, because this is an extension plugin, and it might run a bit longer than other tests.

  conditions: ["RAY_CI_PYTHON_AFFECTED"]
  instance_size: medium
  commands:
    - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
    - pip uninstall -y ray
    - RAY_DEBUG_BUILD=debug ./ci/ci.sh build
    - ./ci/env/env_info.sh
    - bazel test --config=ci-debug $(./ci/run/bazel_export_options)
      --test_env=RAY_ON_SPARK_BACKGROUND_JOB_STARTUP_WAIT=1
      --test_env=RAY_ON_SPARK_RAY_WORKER_NODE_STARTUP_INTERVAL=5
      --test_tag_filters=-kubernetes,spark_plugin_tests
      python/ray/tests/...

# https://github.com/ray-project/ray/issues/22460
#- label: ":python: (Privileged test)"
#conditions: ["RAY_CI_PYTHON_AFFECTED"]
1 change: 1 addition & 0 deletions doc/source/cluster/vms/user-guides/community/index.rst
@@ -15,6 +15,7 @@ The following is a list of community supported cluster managers.
yarn.rst
slurm.rst
lsf.rst
spark.rst

.. _ref-additional-cloud-providers:

92 changes: 92 additions & 0 deletions doc/source/cluster/vms/user-guides/community/spark.rst
@@ -0,0 +1,92 @@
.. _ray-Spark-deploy:

Deploying on Spark Standalone cluster
=====================================

This document describes the high-level steps for running Ray clusters on a `Spark standalone cluster <https://spark.apache.org/docs/latest/spark-standalone.html>`_.

Running a basic example
-----------------------

The following example Spark application starts a Ray cluster on Spark,
executes Ray application code on it, and then shuts the Ray cluster down.

1) Create a Python file containing the Spark application code;
   assume it is named ``ray-on-spark-example1.py``.

.. code-block:: python

    from pyspark.sql import SparkSession
    from ray.util.spark import init_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES

    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("Ray on spark example 1") \
            .config("spark.task.cpus", "4") \
            .getOrCreate()

        # Initiate a ray cluster on this spark application. This creates a
        # background spark job in which each spark task launches one ray worker
        # node. The ray head node is launched on the spark application driver side.
        # Resources (CPU / GPU / memory) allocated to each ray worker node equal
        # the resources allocated to the corresponding spark task.
        init_ray_cluster(num_worker_nodes=MAX_NUM_WORKER_NODES)

        # You can run any ray application code here; it will be executed on the
        # ray cluster set up above. Note that you don't need to call `ray.init`.
        ...

        # Terminate the ray cluster explicitly. If you don't call this, the ray
        # cluster is terminated when the spark application terminates.
        shutdown_ray_cluster()

2) Submit the Spark application above to the Spark standalone cluster.

.. code-block:: bash

    #!/bin/bash
    spark-submit \
      --master spark://{spark_master_IP}:{spark_master_port} \
      path/to/ray-on-spark-example1.py

Creating a long running ray cluster on spark cluster
----------------------------------------------------

The following example Spark application starts a long-running Ray cluster on Spark.
The created Ray cluster can be accessed by remote Python processes.

1) Create a Python file containing the Spark application code;
   assume it is named ``long-running-ray-cluster-on-spark.py``.

.. code-block:: python

    from pyspark.sql import SparkSession
    import time
    from ray.util.spark import init_ray_cluster, MAX_NUM_WORKER_NODES

    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("long running ray cluster on spark") \
            .config("spark.task.cpus", "4") \
            .getOrCreate()

        cluster_address = init_ray_cluster(num_worker_nodes=MAX_NUM_WORKER_NODES)
        print("Ray cluster is initiated, you can connect to this ray cluster via address "
              f"ray://{cluster_address}")

        # Sleep forever until the spark application is terminated; at that point
        # the ray cluster is also terminated.
        while True:
            time.sleep(10)

2) Submit the Spark application above to the Spark standalone cluster.

.. code-block:: bash

    #!/bin/bash
    spark-submit \
      --master spark://{spark_master_IP}:{spark_master_port} \
      path/to/long-running-ray-cluster-on-spark.py
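The long-running example prints an address of the form ``host:port`` that remote processes reach over the Ray client protocol. A minimal sketch of turning that address into a client URI — the helper name ``ray_client_uri`` is hypothetical, and it assumes the ``host:port`` shape shown above:

```python
def ray_client_uri(cluster_address: str) -> str:
    # cluster_address looks like "10.0.0.5:10001" (host:port); rsplit splits
    # on the last colon so the host part is kept intact.
    host, port = cluster_address.rsplit(":", 1)
    return f"ray://{host}:{port}"

print(ray_client_uri("10.0.0.5:10001"))  # → ray://10.0.0.5:10001
```

A remote process would then pass this URI to ``ray.init(address=...)`` to connect to the cluster.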
16 changes: 16 additions & 0 deletions python/ray/tests/BUILD
@@ -518,3 +518,19 @@ py_test_module_list(
    tags = ["exclusive", "asan_tests", "team:core"],
    deps = ["//:ray_lib", ":conftest"],
)

py_test_module_list(
    files = [
        "spark/test_basic.py",
        "spark/test_GPU.py",
        "spark/test_multicores_per_task.py",
        "spark/test_utils.py",
    ],
    size = "large",
    tags = ["exclusive", "spark_plugin_tests", "team:serverless"],
    deps = ["//:ray_lib", ":conftest"],
    data = [
        "spark/discover_2_gpu.sh",
        "spark/discover_4_gpu.sh",
    ],
)
4 changes: 4 additions & 0 deletions python/ray/tests/spark/discover_2_gpu.sh
@@ -0,0 +1,4 @@
#!/bin/bash

# This script is used in spark GPU cluster config for discovering available GPU.
echo "{\"name\":\"gpu\",\"addresses\":[\"0\",\"1\"]}"
4 changes: 4 additions & 0 deletions python/ray/tests/spark/discover_4_gpu.sh
@@ -0,0 +1,4 @@
#!/bin/bash

# This script is used in spark GPU cluster config for discovering available GPU.
echo "{\"name\":\"gpu\",\"addresses\":[\"0\",\"1\",\"2\",\"3\"]}"
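The discovery scripts emit the JSON shape Spark expects from a GPU discovery script: a resource name plus the list of device addresses available on that worker. A small standalone sketch (illustration only, not part of the PR) that parses the same payload to make the schema explicit:

```python
import json

# Same payload discover_2_gpu.sh echoes; Spark parses it into resource
# information with one entry per GPU address on the worker.
payload = '{"name":"gpu","addresses":["0","1"]}'
info = json.loads(payload)
print(info["name"], len(info["addresses"]))  # → gpu 2
```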
87 changes: 87 additions & 0 deletions python/ray/tests/spark/test_GPU.py
@@ -0,0 +1,87 @@
import sys
import pytest
import os
import time
import functools
from abc import ABC
from pyspark.sql import SparkSession
from ray.tests.spark.test_basic import RayOnSparkCPUClusterTestBase

import ray
from ray.util.spark.cluster_init import _init_ray_cluster

pytestmark = pytest.mark.skipif(
    not sys.platform.startswith("linux"),
    reason="Ray on spark only supports running on Linux.",
)


class RayOnSparkGPUClusterTestBase(RayOnSparkCPUClusterTestBase, ABC):

    num_total_gpus = None
    num_gpus_per_spark_task = None

    def test_gpu_allocation(self):
        for num_spark_tasks in [self.max_spark_tasks // 2, self.max_spark_tasks]:
            with _init_ray_cluster(num_worker_nodes=num_spark_tasks, safe_mode=False):
                worker_res_list = self.get_ray_worker_resources_list()
                assert len(worker_res_list) == num_spark_tasks
                for worker_res in worker_res_list:
                    assert worker_res["GPU"] == self.num_gpus_per_spark_task

    def test_basic_ray_app_using_gpu(self):
        with _init_ray_cluster(num_worker_nodes=self.max_spark_tasks, safe_mode=False):

            @ray.remote(num_cpus=1, num_gpus=1)
            def f(_):
                # Sleep so the task does not finish too fast; this lets all ray
                # tasks run concurrently across all idle task slots.
                time.sleep(5)
                return [
                    int(gpu_id)
                    for gpu_id in os.environ["CUDA_VISIBLE_DEVICES"].split(",")
                ]

            futures = [f.remote(i) for i in range(self.num_total_gpus)]
            results = ray.get(futures)
            merged_results = functools.reduce(lambda x, y: x + y, results)
            # Check that every ray task was assigned a distinct GPU.
            assert sorted(merged_results) == list(range(self.num_total_gpus))


class TestBasicSparkGPUCluster(RayOnSparkGPUClusterTestBase):
    @classmethod
    def setup_class(cls):
        super().setup_class()
        cls.num_total_cpus = 2
        cls.num_total_gpus = 2
        cls.num_cpus_per_spark_task = 1
        cls.num_gpus_per_spark_task = 1
        cls.max_spark_tasks = 2
        gpu_discovery_script_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "discover_2_gpu.sh"
        )
        os.environ["SPARK_WORKER_CORES"] = "4"
        cls.spark = (
            SparkSession.builder.master("local-cluster[1, 2, 1024]")
            .config("spark.task.cpus", "1")
            .config("spark.task.resource.gpu.amount", "1")
            .config("spark.executor.cores", "2")
            .config("spark.worker.resource.gpu.amount", "2")
            .config("spark.executor.resource.gpu.amount", "2")
            .config("spark.task.maxFailures", "1")
            .config(
                "spark.worker.resource.gpu.discoveryScript", gpu_discovery_script_path
            )
            .getOrCreate()
        )


if __name__ == "__main__":
    if os.environ.get("PARALLEL_CI"):
        sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
    else:
        sys.exit(pytest.main(["-sv", __file__]))
157 changes: 157 additions & 0 deletions python/ray/tests/spark/test_basic.py
@@ -0,0 +1,157 @@
import os
import shutil
import tempfile
import socket
import pytest
import sys

from abc import ABC

import ray

import ray.util.spark.cluster_init
from ray.util.spark import init_ray_cluster, shutdown_ray_cluster
from ray.util.spark.cluster_init import _init_ray_cluster
from ray.util.spark.utils import check_port_open
from pyspark.sql import SparkSession
import time
import logging

pytestmark = pytest.mark.skipif(
    not sys.platform.startswith("linux"),
    reason="Ray on spark only supports running on Linux.",
)

_logger = logging.getLogger(__name__)


class RayOnSparkCPUClusterTestBase(ABC):

    spark = None
    num_total_cpus = None
    num_cpus_per_spark_task = None
    max_spark_tasks = None

    @classmethod
    def setup_class(cls):
        pass

    @classmethod
    def teardown_class(cls):
        time.sleep(10)  # Wait for all background spark jobs to be canceled.
        cls.spark.stop()

    @staticmethod
    def get_ray_worker_resources_list():
        wr_list = []
        for node in ray.nodes():
            # Exclude dead nodes and the head node (which has 0 CPU resources).
            if node["Alive"] and node["Resources"].get("CPU", 0) > 0:
                wr_list.append(node["Resources"])
        return wr_list

    def test_cpu_allocation(self):
        for num_spark_tasks in [self.max_spark_tasks // 2, self.max_spark_tasks]:
            with _init_ray_cluster(num_worker_nodes=num_spark_tasks, safe_mode=False):
                worker_res_list = self.get_ray_worker_resources_list()
                assert len(worker_res_list) == num_spark_tasks
                for worker_res in worker_res_list:
                    assert worker_res["CPU"] == self.num_cpus_per_spark_task

    def test_public_api(self):
        try:
            ray_temp_root_dir = tempfile.mkdtemp()
            collect_log_to_path = tempfile.mkdtemp()
            init_ray_cluster(
                num_worker_nodes=self.max_spark_tasks,
                safe_mode=False,
                collect_log_to_path=collect_log_to_path,
                ray_temp_root_dir=ray_temp_root_dir,
            )

            @ray.remote
            def f(x):
                return x * x

            futures = [f.remote(i) for i in range(32)]
            results = ray.get(futures)
            assert results == [i * i for i in range(32)]

            shutdown_ray_cluster()

            time.sleep(7)
            # Assert the temp dir is removed (only the lock file remains).
            assert len(os.listdir(ray_temp_root_dir)) == 1 and os.listdir(
                ray_temp_root_dir
            )[0].endswith(".lock")

            # Assert logs are copied to the specified path.
            listed_items = os.listdir(collect_log_to_path)
            assert len(listed_items) == 1 and listed_items[0].startswith("ray-")
            log_dest_dir = os.path.join(
                collect_log_to_path, listed_items[0], socket.gethostname()
            )
            assert os.path.exists(log_dest_dir) and len(os.listdir(log_dest_dir)) > 0
        finally:
            if ray.util.spark.cluster_init._active_ray_cluster is not None:
                # If the test raised an error and did not destroy the cluster,
                # destroy it here.
                ray.util.spark.cluster_init._active_ray_cluster.shutdown()
                time.sleep(5)
            shutil.rmtree(ray_temp_root_dir, ignore_errors=True)
            shutil.rmtree(collect_log_to_path, ignore_errors=True)

    def test_ray_cluster_shutdown(self):
        with _init_ray_cluster(
            num_worker_nodes=self.max_spark_tasks, safe_mode=False
        ) as cluster:
            assert len(self.get_ray_worker_resources_list()) == self.max_spark_tasks

            # Test: canceling the background spark job causes all ray worker
            # nodes to exit.
            cluster._cancel_background_spark_job()
            time.sleep(8)

            assert len(self.get_ray_worker_resources_list()) == 0

        time.sleep(2)  # Wait for the ray head node to exit.
        # Assert the ray head node exited by checking that the head port is closed.
        hostname, port = cluster.address.split(":")
        assert not check_port_open(hostname, int(port))

    def test_background_spark_job_exit_trigger_ray_head_exit(self):
        with _init_ray_cluster(
            num_worker_nodes=self.max_spark_tasks, safe_mode=False
        ) as cluster:
            # Mimic the case where the job fails unexpectedly.
            cluster._cancel_background_spark_job()
            cluster.spark_job_is_canceled = False
            time.sleep(5)

        # Assert the ray head node exited by checking that the head port is closed.
        hostname, port = cluster.address.split(":")
        assert not check_port_open(hostname, int(port))


class TestBasicSparkCluster(RayOnSparkCPUClusterTestBase):
    @classmethod
    def setup_class(cls):
        super().setup_class()
        cls.num_total_cpus = 2
        cls.num_total_gpus = 0
        cls.num_cpus_per_spark_task = 1
        cls.num_gpus_per_spark_task = 0
        cls.max_spark_tasks = 2
        os.environ["SPARK_WORKER_CORES"] = "2"
        cls.spark = (
            SparkSession.builder.master("local-cluster[1, 2, 1024]")
            .config("spark.task.cpus", "1")
            .config("spark.task.maxFailures", "1")
            .getOrCreate()
        )


if __name__ == "__main__":
    if os.environ.get("PARALLEL_CI"):
        sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
    else:
        sys.exit(pytest.main(["-sv", __file__]))
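The shutdown tests above decide that the ray head node has exited by probing its port with `check_port_open`. A sketch of what such a helper can look like — this is an assumed implementation for illustration, not the actual `ray.util.spark.utils` code:

```python
import socket

def check_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    # connect_ex returns 0 when a TCP connection succeeds, i.e. when
    # something is listening on (host, port); nonzero otherwise.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        return s.connect_ex((host, port)) == 0
```

Probing the port rather than inspecting the process makes the check work the same whether the head node runs in-process or as a child process.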