[Core] Allow spillback while hitting scheduling class cap #28804

Merged · 7 commits · Sep 30, 2022
Changes from 5 commits
src/ray/raylet/local_task_manager.cc: 15 additions & 6 deletions

@@ -153,6 +153,15 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() {
int64_t target_time = get_time_ms_() + wait_time;
sched_cls_info.next_update_time =
std::min(target_time, sched_cls_info.next_update_time);

// While we're over capacity and cannot run the task,
// try to spill to a node that can run it.
bool did_spill = TrySpillback(work, is_infeasible);
if (did_spill) {
work_it = dispatch_queue.erase(work_it);
continue;
}

break;
}
}
@@ -228,11 +237,6 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() {
internal::UnscheduledWorkCause::WAITING_FOR_RESOURCES_AVAILABLE);
break;
}
num_unschedulable_task_spilled_++;
Contributor:

Why do we move this code?

Collaborator Author:

This is moved into TrySpillback() because it needs to happen whenever a spill occurs. Moving it inside TrySpillback() guarantees it always runs on spill and avoids code duplication, since TrySpillback() is now called from two places.
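
A minimal standalone sketch of the consolidation described above, using hypothetical class and member names rather than the actual Ray sources: the spill bookkeeping lives inside TrySpillback(), so both call sites get the counter update and dependency cleanup without duplicating the code.

#include <cstdint>
#include <iostream>

struct Work {
  bool has_dependencies = false;
};

class LocalTaskManagerSketch {
 public:
  // Called both when the scheduling class cap is hit and when local resources
  // are exhausted. Returns true if the task was spilled to another node.
  bool TrySpillback(Work &work, bool remote_node_available) {
    if (!remote_node_available) {
      return false;  // No suitable remote node; the caller keeps the task queued.
    }
    // Bookkeeping that previously lived at a single call site now runs on
    // every spill, regardless of which path triggered it.
    num_unschedulable_task_spilled_++;
    if (work.has_dependencies) {
      work.has_dependencies = false;  // Stand-in for RemoveTaskDependencies().
    }
    return true;
  }

  int64_t num_unschedulable_task_spilled_ = 0;
};

int main() {
  LocalTaskManagerSketch mgr;
  Work w{/*has_dependencies=*/true};
  if (mgr.TrySpillback(w, /*remote_node_available=*/true)) {
    std::cout << "spilled; count = " << mgr.num_unschedulable_task_spilled_ << std::endl;
  }
  return 0;
}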

if (!spec.GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
}
work_it = dispatch_queue.erase(work_it);
} else {
// Force us to recalculate the next update time the next time a task
@@ -370,7 +374,7 @@ bool LocalTaskManager::TrySpillback(const std::shared_ptr<internal::Work> &work,
bool &is_infeasible) {
auto scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode(
work->task.GetTaskSpecification(),
work->PrioritizeLocalNode(),
/*prioritize_local_node*/ true,
Contributor:

Why was this change needed?

Collaborator Author:

When a node is selected by the cluster task manager, the task is added to that node's local task manager queue. At that point we should only spill back if the local node has no resources (in other words, we should prefer staying local if possible).

/*exclude_local_node*/ false,
/*requires_object_store_memory*/ false,
&is_infeasible);
@@ -382,6 +386,11 @@ bool LocalTaskManager::TrySpillback(const std::shared_ptr<internal::Work> &work,

NodeID node_id = NodeID::FromBinary(scheduling_node_id.Binary());
Spillback(node_id, work);
num_unschedulable_task_spilled_++;
if (!work->task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
work->task.GetTaskSpecification().TaskId());
}
return true;
}
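
To make the prioritize_local_node behavior discussed above concrete, here is a small self-contained sketch (hypothetical helper names, not the real ClusterResourceScheduler API): when the local node still has room for the task, the best schedulable node is the local one and no spill happens; a remote node is only chosen, and the task only spilled, once local resources are exhausted.

#include <iostream>
#include <string>

// Stand-in for ClusterResourceScheduler::GetBestSchedulableNode(); the real
// call takes the task spec and several flags.
std::string GetBestSchedulableNodeSketch(bool local_has_resources,
                                         bool prioritize_local_node) {
  if (prioritize_local_node && local_has_resources) {
    return "local";  // Prefer staying on the local node when it can host the task.
  }
  return "remote";
}

// Mirrors the spill decision: only spill when the chosen node is not local.
bool ShouldSpillback(bool local_has_resources) {
  const std::string best =
      GetBestSchedulableNodeSketch(local_has_resources,
                                   /*prioritize_local_node=*/true);
  return best != "local";
}

int main() {
  std::cout << std::boolalpha;
  std::cout << ShouldSpillback(/*local_has_resources=*/true) << std::endl;   // false: stay local
  std::cout << ShouldSpillback(/*local_has_resources=*/false) << std::endl;  // true: spill back
  return 0;
}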

src/ray/raylet/scheduling/cluster_task_manager_test.cc: 35 additions & 0 deletions

@@ -2102,6 +2102,41 @@ TEST_F(ClusterTaskManagerTestWithoutCPUsAtHead, ZeroCPUNode) {
AssertNoLeaks();
}

/// Test that we are able to spillback tasks
/// while hitting the scheduling class cap.
TEST_F(ClusterTaskManagerTest, SchedulingClassCapSpillback) {
std::shared_ptr<MockWorker> worker =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));

std::vector<RayTask> tasks;
std::vector<std::unique_ptr<rpc::RequestWorkerLeaseReply>> replies;
int num_callbacks = 0;
auto callback = [&](Status, std::function<void()>, std::function<void()>) {
num_callbacks++;
};
// The first task will be dispatched right away,
// and the second task will hit the scheduling class cap.
for (int i = 0; i < 2; ++i) {
RayTask task = CreateTask({{ray::kCPU_ResourceLabel, 8}});
tasks.push_back(task);
replies.push_back(std::make_unique<rpc::RequestWorkerLeaseReply>());
task_manager_.QueueAndScheduleTask(task, false, false, replies[i].get(), callback);
pool_.TriggerCallbacks();
}

ASSERT_EQ(replies[0]->worker_address().port(), 1234);
ASSERT_EQ(num_callbacks, 1);
ASSERT_EQ(NumTasksToDispatchWithStatus(internal::WorkStatus::WAITING), 1);

// A new node is added so we should be able to spillback to it.
auto remote_node_id = NodeID::FromRandom();
AddNode(remote_node_id, 8);
task_manager_.ScheduleAndDispatchTasks();
ASSERT_EQ(num_callbacks, 2);
ASSERT_EQ(replies[1]->retry_at_raylet_address().raylet_id(), remote_node_id.Binary());
}

/// Test that we exponentially increase the amount of time it takes to increase
/// the dispatch cap for a scheduling class.
TEST_F(ClusterTaskManagerTest, SchedulingClassCapIncrease) {