
[Core] Fix GPU first scheduling that is not working with placement group #19141

Merged: 11 commits (Oct 11, 2021)
1 change: 1 addition & 0 deletions python/ray/state.py
@@ -715,6 +715,7 @@ def _live_node_ids(self):

def _available_resources_per_node(self):
"""Returns a dictionary mapping node id to avaiable resources."""
self._check_connected()
Contributor Author: This was a bug; I can remove it from this PR if you'd prefer it not be included here.

available_resources_by_id = {}

all_available_resources = \
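For context on the comment above, here is a rough, hypothetical illustration of what the missing check allowed: calling this state API before ray.init() would fail with an obscure error deep inside the accessor instead of a clear "not connected" one. The private accessor path (ray.state.state) and the exact exception type vary by Ray version, so treat this as a sketch rather than a supported API.

# Hypothetical repro sketch; private APIs and error types may differ by version.
import ray

try:
    # Called before ray.init(): with the added _check_connected() this raises
    # a clear "Ray has not been started" style error instead of failing
    # inside the global state accessor.
    ray.state.state._available_resources_per_node()
except Exception as exc:
    print(f"expected failure: {exc!r}")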
59 changes: 57 additions & 2 deletions python/ray/tests/test_scheduling.py
@@ -550,8 +550,8 @@ def __init__(self):
def get_location(self):
return ray.worker.global_worker.node.unique_id

-@ray.remote
-def task_cpu(num_cpus=0.5):
+@ray.remote(num_cpus=0.5)
Contributor Author: Another bug in the tests.

+def task_cpu():
time.sleep(10)
return ray.worker.global_worker.node.unique_id

@@ -618,6 +618,61 @@ def g():
time.sleep(1)


@pytest.mark.skipif(sys.platform == "win32", reason="Fails on windows")
def test_gpu_scheduling_liveness(ray_start_cluster):
Contributor Author: I verified that this test fails without the change.

"""Check if the GPU scheduling is in progress when
it is used with the placement group
Issue: https://github.com/ray-project/ray/issues/19130
"""
cluster = ray_start_cluster
# Start a node without a gpu.
cluster.add_node(num_cpus=6)
ray.init(address=cluster.address)

NUM_CPU_BUNDLES = 10

@ray.remote(num_cpus=1)
class Worker(object):
def __init__(self, i):
self.i = i

def work(self):
time.sleep(0.1)
print("work ", self.i)

@ray.remote(num_cpus=1, num_gpus=1)
class Trainer(object):
def __init__(self, i):
self.i = i

def train(self):
time.sleep(0.2)
print("train ", self.i)

bundles = [{"CPU": 1, "GPU": 1}]
bundles += [{"CPU": 1} for _ in range(NUM_CPU_BUNDLES)]

pg = ray.util.placement_group(bundles, strategy="PACK")
o = pg.ready()
# Artificial delay to simulate the real world workload.
time.sleep(3)
print("Scaling up.")
cluster.add_node(num_cpus=6, num_gpus=1)
ray.get(o)

workers = [
Worker.options(placement_group=pg).remote(i)
for i in range(NUM_CPU_BUNDLES)
]
trainer = Trainer.options(placement_group=pg).remote(0)

# If the gpu scheduling doesn't properly work, the below
# code will hang.
ray.get(
[workers[i].work.remote() for i in range(NUM_CPU_BUNDLES)], timeout=30)
ray.get(trainer.train.remote(), timeout=30)


if __name__ == "__main__":
    import pytest
    sys.exit(pytest.main(["-v", __file__]))
2 changes: 1 addition & 1 deletion src/ray/common/ray_config_def.h
@@ -485,7 +485,7 @@ RAY_CONFIG(bool, actor_register_async, true)
RAY_CONFIG(std::string, event_level, "warning")

/// Whether to avoid scheduling cpu requests on gpu nodes
-RAY_CONFIG(bool, scheduler_avoid_gpu_nodes, false)
+RAY_CONFIG(bool, scheduler_avoid_gpu_nodes, true)

/// Whether to skip running local GC in runtime env.
RAY_CONFIG(bool, runtime_env_skip_local_gc, false)
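The default flip above makes CPU-only work prefer non-GPU nodes out of the box. As a hedged sketch (not part of this PR), a head node could opt back into the old behavior through Ray's internal _system_config override; this knob is undocumented and may change between versions.

# Sketch: opting out of the new default on the head node.
# `_system_config` is an internal, unstable override; key names may change.
import ray

ray.init(
    _system_config={
        # Revert to pre-PR behavior: CPU-only tasks are no longer steered
        # toward non-GPU nodes first.
        "scheduler_avoid_gpu_nodes": False,
    },
)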
20 changes: 11 additions & 9 deletions src/ray/raylet/scheduling/scheduling_policy.cc
@@ -57,7 +57,7 @@ int64_t HybridPolicyWithFilter(const ResourceRequest &resource_request,
if (node_filter == NodeFilter::kGPU) {
return has_gpu;
}
-RAY_CHECK(node_filter == NodeFilter::kCPUOnly);
+RAY_CHECK(node_filter == NodeFilter::kNonGpu);
return !has_gpu;
};

@@ -149,16 +149,18 @@ int64_t HybridPolicy(const ResourceRequest &resource_request, const int64_t local_node_id,
spread_threshold, force_spillback, require_available);
}

-  // Try schedule on CPU-only nodes.
-  const auto node_id =
-      HybridPolicyWithFilter(resource_request, local_node_id, nodes, spread_threshold,
-                             force_spillback, require_available, NodeFilter::kCPUOnly);
-  if (node_id != -1) {
-    return node_id;
+  // Try schedule on non-GPU nodes.
+  auto best_node_id = HybridPolicyWithFilter(
+      resource_request, local_node_id, nodes, spread_threshold, force_spillback,
+      /*require_available*/ true, NodeFilter::kNonGpu);
+  if (best_node_id != -1) {
+    return best_node_id;
  }
-  // Could not schedule on CPU-only nodes, schedule on GPU nodes as a last resort.
+
+  // If we cannot find any available node from non-gpu nodes, fallback to the original
+  // scheduling
  return HybridPolicyWithFilter(resource_request, local_node_id, nodes, spread_threshold,
-                                force_spillback, require_available, NodeFilter::kGPU);
+                                force_spillback, require_available);
}

} // namespace raylet_scheduling_policy
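To summarize the control flow of the change above in one place, here is a minimal, self-contained Python sketch of the new two-pass policy. The node model, helper names, and tie-breaking are illustrative only; the real HybridPolicy also honors spread_threshold, force_spillback, and traversal order, which are omitted here.

# Illustrative sketch only: mirrors the two-pass idea, not Ray's internal API.
def _has_gpu(node):
    return node.get("GPU_total", 0) > 0

def _fits(request, node, require_available):
    pool = "available" if require_available else "total"
    return all(node.get(f"{name}_{pool}", 0) >= amount
               for name, amount in request.items())

def _pick_node(request, nodes, node_filter, require_available):
    for node_id, node in sorted(nodes.items()):
        if node_filter == "non_gpu" and _has_gpu(node):
            continue
        if _fits(request, node, require_available):
            return node_id
    return -1  # infeasible / nothing found

def hybrid_policy(request, nodes, require_available=False, avoid_gpu_nodes=True):
    # GPU requests (or the feature being disabled) use the unfiltered policy.
    if not avoid_gpu_nodes or request.get("GPU", 0) > 0:
        return _pick_node(request, nodes, "any", require_available)
    # First pass: CPU-only work only looks at non-GPU nodes, and only at nodes
    # that can run it right now (require_available=True).
    node_id = _pick_node(request, nodes, "non_gpu", require_available=True)
    if node_id != -1:
        return node_id
    # Fallback: every node, GPU nodes included, with the caller's original
    # require_available, so CPU work can spill to a GPU node instead of
    # hanging when the non-GPU nodes are exhausted.
    return _pick_node(request, nodes, "any", require_available)

if __name__ == "__main__":
    nodes = {
        0: {"CPU_total": 2, "CPU_available": 0, "GPU_total": 1, "GPU_available": 1},
        1: {"CPU_total": 2, "CPU_available": 2},
    }
    print(hybrid_policy({"CPU": 1}, nodes))            # 1: non-GPU node wins
    print(hybrid_policy({"CPU": 1, "GPU": 1}, nodes))  # 0: GPU request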
13 changes: 10 additions & 3 deletions src/ray/raylet/scheduling/scheduling_policy.h
@@ -62,8 +62,15 @@ int64_t HybridPolicy(
bool force_spillback, bool require_available,
bool scheduler_avoid_gpu_nodes = RayConfig::instance().scheduler_avoid_gpu_nodes());

-//
-enum class NodeFilter { kAny, kGPU, kCPUOnly };
+enum class NodeFilter {
+  /// Default scheduling.
+  kAny,
+  /// Schedule on GPU only nodes.
+  kGPU,
+  /// Schedule on nodes that don't have GPU. Since GPUs are more scarce resources, we need
+  /// special handling for this.
+  kNonGpu
+};

/// \param resource_request: The resource request we're attempting to schedule.
/// \param local_node_id: The id of the local node, which is needed for traversal order.
@@ -72,7 +79,7 @@ enum class NodeFilter { kAny, kGPU, kCPUOnly };
/// truncated to 0.
/// \param node_filter: defines the subset of nodes were are allowed to schedule on.
/// can be one of kAny (can schedule on all nodes), kGPU (can only schedule on kGPU
-/// nodes), kCPUOnly (can only schedule on non-GPU nodes.
+/// nodes), kNonGpu (can only schedule on non-GPU nodes.
///
/// \return -1 if the task is unfeasible, otherwise the node id (key in `nodes`) to
/// schedule on.
36 changes: 36 additions & 0 deletions src/ray/raylet/scheduling/scheduling_policy_test.cc
@@ -338,6 +338,42 @@ TEST_F(SchedulingPolicyTest, ForceSpillbackOnlyFeasibleLocallyTest) {
ASSERT_EQ(to_schedule, -1);
}

TEST_F(SchedulingPolicyTest, NonGpuNodePreferredSchedulingTest) {
// Prefer to schedule on CPU (non-GPU) nodes first.
// GPU nodes should only be used as a last resort.
StringIdMap map;
int64_t local_node = 0;
int64_t remote_node_1 = 1;
int64_t remote_node_2 = 2;

// Local node:    {CPU: 2, GPU: 1}
// Remote node 1: {CPU: 2}
// Remote node 2: {CPU: 3}
absl::flat_hash_map<int64_t, Node> nodes;
nodes.emplace(local_node, CreateNodeResources(2, 2, 0, 0, 1, 1));
nodes.emplace(remote_node_1, CreateNodeResources(2, 2, 0, 0, 0, 0));
nodes.emplace(remote_node_2, CreateNodeResources(3, 3, 0, 0, 0, 0));

ResourceRequest req = ResourceMapToResourceRequest(map, {{"CPU", 1}}, false);
int to_schedule = raylet_scheduling_policy::HybridPolicy(
req, local_node, nodes, 0.51, false, true, /*gpu_avoid_scheduling*/ true);
ASSERT_EQ(to_schedule, remote_node_1);

req = ResourceMapToResourceRequest(map, {{"CPU", 3}}, false);
to_schedule = raylet_scheduling_policy::HybridPolicy(
req, local_node, nodes, 0.51, false, true, /*gpu_avoid_scheduling*/ true);
ASSERT_EQ(to_schedule, remote_node_2);

req = ResourceMapToResourceRequest(map, {{"CPU", 1}, {"GPU", 1}}, false);
to_schedule = raylet_scheduling_policy::HybridPolicy(
req, local_node, nodes, 0.51, false, true, /*gpu_avoid_scheduling*/ true);
ASSERT_EQ(to_schedule, local_node);

req = ResourceMapToResourceRequest(map, {{"CPU", 2}}, false);
to_schedule = raylet_scheduling_policy::HybridPolicy(
req, local_node, nodes, 0.51, false, true, /*gpu_avoid_scheduling*/ true);
ASSERT_EQ(to_schedule, remote_node_1);
}

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();