Support fractional resource scheduling (#258)
* Support fractional resource scheduling

* Fix java and scala code styling.

* Fix tests.

* Use marker to skip tests

* Refactor

* Use mock clusters.

Use a mock cluster, based on the doc here: https://docs.ray.io/en/latest/ray-core/examples/testing-tips.html#tip-4-create-a-mini-cluster-with-ray-cluster-utils-cluster

* Try to fix the tests by running the custom resource test separately.

* Remove GPU resource config.

GPU autoscaling hits a bug on the Ray side. For more details, please see [this issue](ray-project/ray#20476).
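
As a usage sketch of the new config (mirroring the test fixtures added in this commit; the app name and the `0.1` value are illustrative):

```python
import ray
import raydp

ray.init()

# The Ray actor hosting the executor requests only 0.1 logical CPU for
# scheduling, while Spark still runs up to `executor_cores` concurrent
# tasks inside that executor.
spark = raydp.init_spark(app_name="fractional_cpu_demo",
                         num_executors=1,
                         executor_cores=3,
                         executor_memory="500 M",
                         configs={"spark.ray.actor.resource.cpu": "0.1"})

assert spark.range(0, 10).count() == 10

raydp.stop_spark()
ray.shutdown()
```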
pang-wu authored Aug 1, 2022
1 parent a3c290b commit 240242d
Showing 9 changed files with 128 additions and 15 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/raydp.yml
@@ -100,7 +100,8 @@ jobs:
      - name: Test with pytest
        run: |
          ray start --head --num-cpus 4
-          pytest python/raydp/tests/
+          pytest python/raydp/tests/ -m"not error_on_custom_resource"
+          pytest python/raydp/tests/ -m"error_on_custom_resource"
          ray stop --force
      - name: Test Examples
        run: |
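A note on the marker-based split above: pytest warns about unknown marks unless they are registered. One way to register `error_on_custom_resource` (hypothetical; this diff does not include it) is a `pytest_configure` hook in `conftest.py`:

```python
# Hypothetical conftest.py hook, not part of this commit: registers the
# custom mark so `pytest -m ...` filtering runs without warnings.
def pytest_configure(config):
    config.addinivalue_line(
        "markers",
        "error_on_custom_resource: tests expected to fail on fractional custom resources")
```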
RayExecutorUtils.java
@@ -39,7 +39,7 @@ private static double toMemoryUnits(int memoryInMB) {
  public static ActorHandle<RayCoarseGrainedExecutorBackend> createExecutorActor(
      String executorId,
      String appMasterURL,
-      int cores,
+      double cores,
      int memoryInMB,
      Map<String, Double> resources,
      PlacementGroup placementGroup,
@@ -48,8 +48,9 @@ public static ActorHandle<RayCoarseGrainedExecutorBackend> createExecutorActor(
    ActorCreator<RayCoarseGrainedExecutorBackend> creator = Ray.actor(
        RayCoarseGrainedExecutorBackend::new, executorId, appMasterURL);
    creator.setJvmOptions(javaOpts);
-    creator.setResource("CPU", (double)cores);
+    creator.setResource("CPU", cores);
    creator.setResource("memory", toMemoryUnits(memoryInMB));
+
    for (Map.Entry<String, Double> entry: resources.entrySet()) {
      creator.setResource(entry.getKey(), entry.getValue());
    }
14 changes: 14 additions & 0 deletions core/src/main/java/org/apache/spark/raydp/SparkOnRayConfigs.java
@@ -0,0 +1,14 @@
package org.apache.spark.raydp;

public class SparkOnRayConfigs {
  public static final String RAY_ACTOR_RESOURCE_PREFIX = "spark.ray.actor.resource";
  /**
   * CPU cores per Ray actor that hosts the Spark executor; this resource
   * is used for scheduling. Default value is 1.
   * This differs from spark.executor.cores, which defines the task
   * parallelism inside a stage.
   */
  public static final String RAY_ACTOR_CPU_RESOURCE = RAY_ACTOR_RESOURCE_PREFIX + ".cpu";

  public static final int DEFAULT_SPARK_CORES_PER_EXECUTOR = 1;
}
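
To see why a fractional per-actor CPU request matters, a quick illustration (numbers hypothetical): Ray schedules executor actors by their logical CPU request, so smaller requests pack more executors onto the same node:

```python
# Illustrative packing math, assuming Ray places executor actors purely by
# their logical CPU request. 0.5 is chosen so the division is exact.
node_cpus = 2
cpu_per_actor = 0.5                       # e.g. spark.ray.actor.resource.cpu=0.5
max_executors = int(node_cpus / cpu_per_actor)
assert max_executors == 4                 # vs. 2 executors with whole-CPU requests
```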
ApplicationDescription.scala
@@ -36,12 +36,16 @@ private[spark] case class ApplicationDescription(
    numExecutors: Int,
    coresPerExecutor: Option[Int],
    memoryPerExecutorMB: Int,
+    rayActorCPU: Double,
    command: Command,
    user: String = System.getProperty("user.name", "<unknown>"),
    resourceReqsPerExecutor: Map[String, Double] = Map.empty) {

  def withNewCommand(newCommand: Command): ApplicationDescription = {
-    ApplicationDescription(name, numExecutors, coresPerExecutor, memoryPerExecutorMB,
-      newCommand, user, resourceReqsPerExecutor)
+    ApplicationDescription(name = name,
+      numExecutors = numExecutors, coresPerExecutor = coresPerExecutor,
+      memoryPerExecutorMB = memoryPerExecutorMB, command = newCommand, user = user,
+      resourceReqsPerExecutor = resourceReqsPerExecutor,
+      rayActorCPU = rayActorCPU)
  }
}
RayAppMaster.scala
@@ -19,22 +19,21 @@ package org.apache.spark.deploy.raydp

import java.io.File
import java.text.SimpleDateFormat
-import java.util.{Date, Locale, Optional}
+import java.util.{Date, Locale}
import javax.xml.bind.DatatypeConverter

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

-import io.ray.api.{ActorHandle, PlacementGroups, Ray}
+import io.ray.api.{ActorHandle, PlacementGroups}
import io.ray.api.id.PlacementGroupId
import io.ray.api.placementgroup.PlacementGroup
import io.ray.runtime.config.RayConfig

import org.apache.spark.{RayDPException, SecurityManager, SparkConf}
import org.apache.spark.internal.Logging
-import org.apache.spark.raydp.RayExecutorUtils
+import org.apache.spark.raydp.{RayExecutorUtils, SparkOnRayConfigs}
import org.apache.spark.rpc._
-import org.apache.spark.util.ShutdownHookManager
import org.apache.spark.util.Utils

class RayAppMaster(host: String,
@@ -229,17 +228,29 @@ class RayAppMaster(host: String,
  }

  private def requestNewExecutor(): Unit = {
-    val cores = appInfo.desc.coresPerExecutor.getOrElse(1)
+    val sparkCoresPerExecutor = appInfo.desc
+      .coresPerExecutor
+      .getOrElse(SparkOnRayConfigs.DEFAULT_SPARK_CORES_PER_EXECUTOR)
+    val rayActorCPU = this.appInfo.desc.rayActorCPU
+
    val memory = appInfo.desc.memoryPerExecutorMB
    val executorId = s"${appInfo.getNextExecutorId()}"

+    logInfo(s"Requesting Spark executor with Ray logical resource " +
+      s"{ CPU: ${rayActorCPU} }..")
+    // TODO: Support generic fractional logical resources using prefix spark.ray.actor.resource.*
+
    val handler = RayExecutorUtils.createExecutorActor(
-      executorId, getAppMasterEndpointUrl(), cores,
+      executorId, getAppMasterEndpointUrl(),
+      rayActorCPU,
      memory,
+      // This won't work: Spark expects integer amounts for custom resources;
+      // see the Python test test_spark_on_fractional_custom_resource.
      appInfo.desc.resourceReqsPerExecutor.map(pair => (pair._1, Double.box(pair._2))).asJava,
      placementGroup,
      getNextBundleIndex,
      seqAsJavaList(appInfo.desc.command.javaOpts))
-    appInfo.addPendingRegisterExecutor(executorId, handler, cores, memory)
+    appInfo.addPendingRegisterExecutor(executorId, handler, sparkCoresPerExecutor, memory)
  }

  private def appendActorClasspath(javaOpts: Seq[String]): Seq[String] = {
RayCoarseGrainedSchedulerBackend.scala
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.raydp._
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
+import org.apache.spark.raydp.SparkOnRayConfigs
import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils}
import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler.TaskSchedulerImpl
@@ -145,8 +146,17 @@ class RayCoarseGrainedSchedulerBackend(
      conf, config.SPARK_EXECUTOR_PREFIX)
    val resourcesInMap = transferResourceRequirements(executorResourceReqs)
    val numExecutors = conf.get(config.EXECUTOR_INSTANCES).get
-    val appDesc = ApplicationDescription(sc.appName, numExecutors, coresPerExecutor,
-      sc.executorMemory, command, resourceReqsPerExecutor = resourcesInMap)
+    val sparkCoresPerExecutor = coresPerExecutor
+      .getOrElse(SparkOnRayConfigs.DEFAULT_SPARK_CORES_PER_EXECUTOR)
+    val rayActorCPU = conf.get(SparkOnRayConfigs.RAY_ACTOR_CPU_RESOURCE,
+      sparkCoresPerExecutor.toString).toDouble
+
+    val appDesc = ApplicationDescription(name = sc.appName, numExecutors = numExecutors,
+      coresPerExecutor = coresPerExecutor, memoryPerExecutorMB = sc.executorMemory,
+      command = command,
+      resourceReqsPerExecutor = resourcesInMap,
+      rayActorCPU = rayActorCPU)

    val rpcEnv = sc.env.rpcEnv
    appMasterRef.set(rpcEnv.setupEndpoint(
      "AppMasterClient",
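The resolution order implemented above, condensed as a sketch (a Python approximation of the Scala logic, not the actual RayDP code):

```python
# Assumption: mirrors the Scala change above. spark.ray.actor.resource.cpu,
# when set, overrides the fallback to spark.executor.cores (default 1).
def resolve_ray_actor_cpu(conf: dict) -> float:
    spark_cores = int(conf.get("spark.executor.cores", 1))
    return float(conf.get("spark.ray.actor.resource.cpu", spark_cores))

assert resolve_ray_actor_cpu({}) == 1.0
assert resolve_ray_actor_cpu({"spark.executor.cores": 4}) == 4.0
assert resolve_ray_actor_cpu({"spark.ray.actor.resource.cpu": "0.1"}) == 0.1
```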
4 changes: 4 additions & 0 deletions python/raydp/context.py
@@ -139,6 +139,8 @@ def stop(self, del_obj_holder=True):
        ray.util.remove_placement_group(self._placement_group)
        self._placement_group = None
        self._placement_group_strategy = None
+        if self._configs is not None:
+            self._configs = None

    def __enter__(self):
        self.get_or_create_session()
@@ -199,6 +201,8 @@ def init_spark(app_name: str,
                                 configs)
        return _global_spark_context.get_or_create_session()
    except:
+        if _global_spark_context is not None:
+            _global_spark_context.stop()
        _global_spark_context = None
        raise
    else:
41 changes: 41 additions & 0 deletions python/raydp/tests/conftest.py
@@ -23,6 +23,7 @@
import ray
import raydp
import subprocess
+from ray.cluster_utils import Cluster


def quiet_logger():
@@ -62,6 +63,46 @@ def stop_all():
    return spark


@pytest.fixture(scope="function")
def spark_on_ray_fraction_custom_resource(request):
ray.shutdown()
cluster = Cluster(
initialize_head=True,
head_node_args={
"num_cpus": 2
})
ray.init(address=cluster.address)

def stop_all():
raydp.stop_spark()
ray.shutdown()

request.addfinalizer(stop_all)


@pytest.fixture(scope="function")
def spark_on_ray_fractional_cpu(request):
ray.shutdown()
cluster = Cluster(
initialize_head=True,
head_node_args={
"num_cpus": 2
})

ray.init(address=cluster.address)

spark = raydp.init_spark(app_name="test_cpu_fraction",
num_executors=1, executor_cores=3, executor_memory="500 M",
configs={"spark.ray.actor.resource.cpu": "0.1"})

def stop_all():
raydp.stop_spark()
ray.shutdown()

request.addfinalizer(stop_all)
return spark


@pytest.fixture(scope='session')
def custom_spark_dir(tmp_path_factory) -> str:
    working_dir = tmp_path_factory.mktemp("spark").as_posix()
29 changes: 28 additions & 1 deletion python/raydp/tests/test_spark_cluster.py
@@ -29,6 +29,7 @@
import raydp
import raydp.utils as utils
from raydp.spark.ray_cluster_master import RayDPSparkMaster, RAYDP_SPARK_MASTER_NAME
from ray.cluster_utils import Cluster


def test_spark(spark_on_ray_small):
@@ -37,6 +38,26 @@ def test_spark(spark_on_ray_small):
    assert result == 10


def test_spark_on_fractional_cpu(spark_on_ray_fractional_cpu):
    spark = spark_on_ray_fractional_cpu
    result = spark.range(0, 10).count()
    assert result == 10


@pytest.mark.error_on_custom_resource
def test_spark_on_fractional_custom_resource(spark_on_ray_fraction_custom_resource):
    # Spark expects integer amounts for custom resources, so requesting a
    # fractional amount should fail.
    with pytest.raises(Exception):
        spark = raydp.init_spark(app_name="test_custom_resource_fraction",
                                 num_executors=1, executor_cores=3,
                                 executor_memory="500 M",
                                 configs={"spark.executor.resource.CUSTOM.amount": "0.1"})
        spark.range(0, 10).count()

def test_spark_remote(ray_cluster):
    @ray.remote
    class SparkRemote:
@@ -142,7 +163,13 @@ def test_placement_group(ray_cluster):

def test_custom_installed_spark(custom_spark_dir):
    os.environ["SPARK_HOME"] = custom_spark_dir
    ray.shutdown()
    cluster = Cluster(
        initialize_head=True,
        head_node_args={
            "num_cpus": 2
        })

    ray.init(address=cluster.address)
    spark = raydp.init_spark("custom_install_test", 1, 1, "500 M")
    spark_master_actor = ray.get_actor(name=RAYDP_SPARK_MASTER_NAME)
    spark_home = ray.get(spark_master_actor.get_spark_home.remote())
