Skip to content

Commit

Permalink
Remove GPU resource config.
Browse files Browse the repository at this point in the history
GPU auto scaling is broken on the Ray side. For more details, please see [this issue](ray-project/ray#20476).
  • Loading branch information
pang-wu committed Jul 29, 2022
1 parent 6125fc8 commit 81ac79a
Show file tree
Hide file tree
Showing 7 changed files with 7 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public static ActorHandle<RayCoarseGrainedExecutorBackend> createExecutorActor(
String executorId,
String appMasterURL,
double cores,
double gpus,
int memoryInMB,
Map<String, Double> resources,
PlacementGroup placementGroup,
Expand All @@ -51,9 +50,7 @@ public static ActorHandle<RayCoarseGrainedExecutorBackend> createExecutorActor(
creator.setJvmOptions(javaOpts);
creator.setResource("CPU", cores);
creator.setResource("memory", toMemoryUnits(memoryInMB));
if(gpus > 0d) {
creator.setResource("GPU", gpus);
}

for (Map.Entry<String, Double> entry: resources.entrySet()) {
creator.setResource(entry.getKey(), entry.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,5 @@ public class SparkOnRayConfigs {
*/
public static final String RAY_ACTOR_CPU_RESOURCE = RAY_ACTOR_RESOURCE_PREFIX + ".cpu";

/**
* GPU cores per Ray Actor which host the Spark executor, the resource is used
* for scheduling. Default value is 0.
*/
public static final String RAY_ACTOR_GPU_RESOURCE = RAY_ACTOR_RESOURCE_PREFIX + ".gpu";

public static final int DEFAULT_SPARK_CORES_PER_EXECUTOR = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,13 @@ private[spark] case class ApplicationDescription(
rayActorCPU: Double,
command: Command,
user: String = System.getProperty("user.name", "<unknown>"),
resourceReqsPerExecutor: Map[String, Double] = Map.empty,
rayActorGPU: Double = 0d) {
resourceReqsPerExecutor: Map[String, Double] = Map.empty) {

def withNewCommand(newCommand: Command): ApplicationDescription = {
ApplicationDescription(name = name,
numExecutors = numExecutors, coresPerExecutor = coresPerExecutor,
memoryPerExecutorMB = memoryPerExecutorMB, command = newCommand, user = user,
resourceReqsPerExecutor = resourceReqsPerExecutor,
rayActorCPU = rayActorCPU, rayActorGPU = rayActorGPU)
rayActorCPU = rayActorCPU)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,18 +232,17 @@ class RayAppMaster(host: String,
.coresPerExecutor
.getOrElse(SparkOnRayConfigs.DEFAULT_SPARK_CORES_PER_EXECUTOR)
val rayActorCPU = this.appInfo.desc.rayActorCPU
val rayActorGPU = this.appInfo.desc.rayActorGPU

val memory = appInfo.desc.memoryPerExecutorMB
val executorId = s"${appInfo.getNextExecutorId()}"

logInfo(s"Requesting Spark executor with Ray logical resource " +
s"{ CPU: ${rayActorCPU}, GPU: ${rayActorGPU} }..")
s"{ CPU: ${rayActorCPU} }..")
// TODO: Support generic fractional logical resources using prefix spark.ray.actor.resource.*

val handler = RayExecutorUtils.createExecutorActor(
executorId, getAppMasterEndpointUrl(),
rayActorCPU, rayActorGPU,
rayActorCPU,
memory,
// This won't work, Spark expect integer in custom resources,
// please see python test test_spark_on_fractional_custom_resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,12 @@ class RayCoarseGrainedSchedulerBackend(
.getOrElse(SparkOnRayConfigs.DEFAULT_SPARK_CORES_PER_EXECUTOR)
val rayActorCPU = conf.get(SparkOnRayConfigs.RAY_ACTOR_CPU_RESOURCE,
sparkCoresPerExecutor.toString).toDouble
val rayActorGPU = conf.get(SparkOnRayConfigs.RAY_ACTOR_GPU_RESOURCE,
0.toString).toDouble

val appDesc = ApplicationDescription(name = sc.appName, numExecutors = numExecutors,
coresPerExecutor = coresPerExecutor, memoryPerExecutorMB = sc.executorMemory,
command = command,
resourceReqsPerExecutor = resourcesInMap,
rayActorCPU = rayActorCPU, rayActorGPU = rayActorGPU)
rayActorCPU = rayActorCPU)

val rpcEnv = sc.env.rpcEnv
appMasterRef.set(rpcEnv.setupEndpoint(
Expand Down
30 changes: 1 addition & 29 deletions python/raydp/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,32 +80,6 @@ def stop_all():
request.addfinalizer(stop_all)


@pytest.fixture(scope="function")
def spark_on_ray_gpu(request):
ray.shutdown()
# https://docs.ray.io/en/latest/ray-core/examples/testing-tips.html#tip-4-create-a-mini-cluster-with-ray-cluster-utils-cluster
cluster = Cluster(
initialize_head=True,
head_node_args={
"num_cpus": 2,
"num_gpus": 2
})

ray.init(address=cluster.address)

spark = raydp.init_spark(app_name="test_gpu",
num_executors=1, executor_cores=1, executor_memory="500 M",
configs={"spark.ray.actor.resource.gpu": "1",
"spark.ray.actor.resource.cpu": "1"})

def stop_all():
raydp.stop_spark()
ray.shutdown()

request.addfinalizer(stop_all)
return spark


@pytest.fixture(scope="function")
def spark_on_ray_fractional_cpu(request):
ray.shutdown()
Expand All @@ -119,9 +93,7 @@ def spark_on_ray_fractional_cpu(request):

spark = raydp.init_spark(app_name="test_cpu_fraction",
num_executors=1, executor_cores=3, executor_memory="500 M",
configs={"spark.ray.actor.resource.cpu": "0.1",
"spark.ray.actor.resource.gpu": "0", # TODO: Fix this, Spark config have stale value
})
configs={"spark.ray.actor.resource.cpu": "0.1"})

def stop_all():
raydp.stop_spark()
Expand Down
6 changes: 0 additions & 6 deletions python/raydp/tests/test_spark_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ def test_spark(spark_on_ray_small):
assert result == 10


def test_spark_on_gpu_machine(spark_on_ray_gpu):
spark = spark_on_ray_gpu
result = spark.range(0, 10).count()
assert result == 10


def test_spark_on_fractional_cpu(spark_on_ray_fractional_cpu):
spark = spark_on_ray_fractional_cpu
result = spark.range(0, 10).count()
Expand Down

0 comments on commit 81ac79a

Please sign in to comment.