Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Cancel lease requests before returning a PG bundle #45919

Merged
merged 9 commits into from
Jun 16, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions python/ray/tests/test_gcs_fault_tolerance.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
import asyncio
import os
import threading
from time import sleep
Expand All @@ -22,6 +23,8 @@
)
from ray.job_submission import JobSubmissionClient, JobStatus
from ray._raylet import GcsClient
from ray._private.runtime_env.plugin import RuntimeEnvPlugin
from ray.util.state import list_placement_groups

import psutil

Expand Down Expand Up @@ -1196,6 +1199,87 @@ def spawn(self, name, namespace):
raise ValueError(f"Unknown case: {case}")


MyPlugin = "MyPlugin"
MY_PLUGIN_CLASS_PATH = "ray.tests.test_gcs_fault_tolerance.HangPlugin"


class HangPlugin(RuntimeEnvPlugin):
name = MyPlugin

async def create(
self,
uri,
runtime_env,
ctx,
logger, # noqa: F821
) -> float:
while True:
await asyncio.sleep(1)

@staticmethod
def validate(runtime_env_dict: dict) -> str:
return 1


@pytest.mark.parametrize(
"ray_start_regular_with_external_redis",
[
generate_system_config_map(
gcs_rpc_server_reconnect_timeout_s=60,
testing_asio_delay_us="NodeManagerService.grpc_server.CancelResourceReserve=500000000:500000000", # noqa: E501
),
],
indirect=True,
)
@pytest.mark.parametrize(
"set_runtime_env_plugins",
[
'[{"class":"' + MY_PLUGIN_CLASS_PATH + '"}]',
],
indirect=True,
)
def test_placement_group_removal_after_gcs_restarts(
set_runtime_env_plugins, ray_start_regular_with_external_redis
):
@ray.remote
def task():
pass

pg = ray.util.placement_group(bundles=[{"CPU": 1}])
_ = task.options(
max_retries=0,
num_cpus=1,
scheduling_strategy=PlacementGroupSchedulingStrategy(
placement_group=pg,
),
runtime_env={
MyPlugin: {"name": "f2"},
"config": {"setup_timeout_seconds": -1},
},
).remote()

# The task should be popping worker
# TODO(jjyao) Use a more determinstic way to
# decide whether the task is popping worker
sleep(5)

ray.util.remove_placement_group(pg)
# The PG is marked as REMOVED in redis but not removed yet from raylet
# due to the injected delay of CancelResourceReserve rpc
wait_for_condition(lambda: list_placement_groups()[0].state == "REMOVED")

ray._private.worker._global_node.kill_gcs_server()
# After GCS restarts, it will try to remove the PG resources
# again via ReleaseUnusedBundles rpc
ray._private.worker._global_node.start_gcs_server()

def verify_pg_resources_cleaned():
r_keys = ray.available_resources().keys()
return all("group" not in k for k in r_keys)

wait_for_condition(verify_pg_resources_cleaned, timeout=30)


if __name__ == "__main__":

import pytest
Expand Down
7 changes: 3 additions & 4 deletions python/ray/tests/test_placement_group_5.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,9 @@ async def create(
) -> float:
await asyncio.sleep(PLUGIN_TIMEOUT)


@staticmethod
def validate(runtime_env_dict: dict) -> str:
return 1
@staticmethod
def validate(runtime_env_dict: dict) -> str:
return 1


@pytest.mark.parametrize(
Expand Down
1 change: 1 addition & 0 deletions src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() {
thread.join();

RayConfig::instance().initialize(promise.get_future().get());
ray::asio::testing::init();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sure we can set testing_asio_delay_us env var through _system_configs

}

void CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop() {
Expand Down
39 changes: 15 additions & 24 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
auto node_id = NodeID::FromBinary(node.value()->node_id());

if (max_retry == current_retry_cnt) {
RAY_LOG(INFO) << "Failed to cancel resource reserved for bundle because the max "
"retry count is reached. "
<< bundle_spec->DebugString() << " at node " << node_id;
RAY_LOG(ERROR) << "Failed to cancel resource reserved for bundle because the max "
"retry count is reached. "
<< bundle_spec->DebugString() << " at node " << node_id;
return;
}

Expand All @@ -261,11 +261,10 @@ void GcsPlacementGroupScheduler::CancelResourceReserve(
RAY_LOG(INFO) << "Finished cancelling the resource reserved for bundle: "
<< bundle_spec->DebugString() << " at node " << node_id;
} else {
// We couldn't delete the pg resources either becuase it is in use
// or network issue. Retry.
RAY_LOG(INFO) << "Failed to cancel the resource reserved for bundle: "
<< bundle_spec->DebugString() << " at node " << node_id
<< ". Status: " << status;
// We couldn't delete the pg resources because of network issue. Retry.
RAY_LOG(WARNING) << "Failed to cancel the resource reserved for bundle: "
<< bundle_spec->DebugString() << " at node " << node_id
<< ". Status: " << status;
execute_after(
io_context_,
[this, bundle_spec, node, max_retry, current_retry_cnt] {
Expand Down Expand Up @@ -568,14 +567,10 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupPreparedBundleResources(
for (const auto &iter : *(leasing_bundle_locations)) {
auto &bundle_spec = iter.second.second;
auto &node_id = iter.second.first;
CancelResourceReserve(
bundle_spec,
gcs_node_manager_.GetAliveNode(node_id),
// Retry 10 * worker registeration timeout to avoid race condition.
// See https://github.com/ray-project/ray/pull/42942
// for more details.
/*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10,
/*num_retry*/ 0);
CancelResourceReserve(bundle_spec,
gcs_node_manager_.GetAliveNode(node_id),
/*max_retry*/ 5,
/*num_retry*/ 0);
}
}
}
Expand All @@ -594,14 +589,10 @@ void GcsPlacementGroupScheduler::DestroyPlacementGroupCommittedBundleResources(
for (const auto &iter : *(committed_bundle_locations)) {
auto &bundle_spec = iter.second.second;
auto &node_id = iter.second.first;
CancelResourceReserve(
bundle_spec,
gcs_node_manager_.GetAliveNode(node_id),
// Retry 10 * worker registeration timeout to avoid race condition.
// See https://github.com/ray-project/ray/pull/42942
// for more details.
/*max_retry*/ RayConfig::instance().worker_register_timeout_seconds() * 10,
/*num_retry*/ 0);
CancelResourceReserve(bundle_spec,
gcs_node_manager_.GetAliveNode(node_id),
/*max_retry*/ 5,
/*num_retry*/ 0);
}
committed_bundle_location_index_.Erase(placement_group_id);
cluster_resource_scheduler_.GetClusterResourceManager()
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ int main(int argc, char *argv[]) {
gflags::ShutDownCommandLineFlags();

RayConfig::instance().initialize(config_list);
ray::asio::testing::init();

// IO Service for main loop.
instrumented_io_context main_service;
Expand Down
118 changes: 56 additions & 62 deletions src/ray/raylet/local_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -546,24 +546,6 @@ bool LocalTaskManager::PoppedWorkerHandler(
not_detached_with_owner_failed = true;
}

const auto &required_resource =
task.GetTaskSpecification().GetRequiredResources().GetResourceMap();
for (auto &entry : required_resource) {
if (!cluster_resource_scheduler_->GetLocalResourceManager().ResourcesExist(
scheduling::ResourceID(entry.first))) {
jjyao marked this conversation as resolved.
Show resolved Hide resolved
RAY_CHECK(task.GetTaskSpecification().PlacementGroupBundleId().first !=
PlacementGroupID::Nil());
RAY_LOG(DEBUG) << "The placement group: "
<< task.GetTaskSpecification().PlacementGroupBundleId().first
<< " was removed when poping workers for task: " << task_id
<< ", will cancel the task.";
CancelTask(
task_id,
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_PLACEMENT_GROUP_REMOVED);
canceled = true;
}
}

auto erase_from_dispatch_queue_fn = [this](const std::shared_ptr<internal::Work> &work,
const SchedulingClass &scheduling_class) {
auto shapes_it = tasks_to_dispatch_.find(scheduling_class);
Expand Down Expand Up @@ -855,7 +837,7 @@ void LocalTaskManager::ReleaseTaskArgs(const TaskID &task_id) {
}

namespace {
void ReplyCancelled(std::shared_ptr<internal::Work> &work,
void ReplyCancelled(const std::shared_ptr<internal::Work> &work,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
auto reply = work->reply;
Expand All @@ -867,55 +849,67 @@ void ReplyCancelled(std::shared_ptr<internal::Work> &work,
}
} // namespace

bool LocalTaskManager::CancelTask(
const TaskID &task_id,
bool LocalTaskManager::CancelTasks(
jjyao marked this conversation as resolved.
Show resolved Hide resolved
std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
for (auto shapes_it = tasks_to_dispatch_.begin(); shapes_it != tasks_to_dispatch_.end();
shapes_it++) {
auto &work_queue = shapes_it->second;
for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) {
const auto &task = (*work_it)->task;
if (task.GetTaskSpecification().TaskId() == task_id) {
RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue.";
ReplyCancelled(*work_it, failure_type, scheduling_failure_message);
if ((*work_it)->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
// We've already acquired resources so we need to release them.
cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources(
(*work_it)->allocated_instances);
// Release pinned task args.
ReleaseTaskArgs(task_id);
}
if (!task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
bool tasks_cancelled = false;

ray::erase_if<SchedulingClass, std::shared_ptr<internal::Work>>(
tasks_to_dispatch_, [&](const std::shared_ptr<internal::Work> &work) {
if (predicate(work)) {
const TaskID task_id = work->task.GetTaskSpecification().TaskId();
RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue.";
ReplyCancelled(work, failure_type, scheduling_failure_message);
if (work->GetState() == internal::WorkStatus::WAITING_FOR_WORKER) {
// We've already acquired resources so we need to release them.
cluster_resource_scheduler_->GetLocalResourceManager().ReleaseWorkerResources(
work->allocated_instances);
// Release pinned task args.
ReleaseTaskArgs(task_id);
}
if (!work->task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
work->task.GetTaskSpecification().TaskId());
}
RemoveFromRunningTasksIfExists(work->task);
work->SetStateCancelled();
tasks_cancelled = true;
return true;
} else {
return false;
}
RemoveFromRunningTasksIfExists(task);
(*work_it)->SetStateCancelled();
work_queue.erase(work_it);
if (work_queue.empty()) {
tasks_to_dispatch_.erase(shapes_it);
});

ray::erase_if<std::shared_ptr<internal::Work>>(
waiting_task_queue_, [&](const std::shared_ptr<internal::Work> &work) {
if (predicate(work)) {
ReplyCancelled(work, failure_type, scheduling_failure_message);
if (!work->task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
work->task.GetTaskSpecification().TaskId());
}
waiting_tasks_index_.erase(work->task.GetTaskSpecification().TaskId());
tasks_cancelled = true;
return true;
} else {
return false;
}
return true;
}
}
}

auto iter = waiting_tasks_index_.find(task_id);
if (iter != waiting_tasks_index_.end()) {
const auto &task = (*iter->second)->task;
ReplyCancelled(*iter->second, failure_type, scheduling_failure_message);
if (!task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
}
waiting_task_queue_.erase(iter->second);
waiting_tasks_index_.erase(iter);
});

return true;
}
return tasks_cancelled;
}

return false;
bool LocalTaskManager::CancelTask(
const TaskID &task_id,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) {
return CancelTasks(
[task_id](const std::shared_ptr<internal::Work> &work) {
return work->task.GetTaskSpecification().TaskId() == task_id;
},
failure_type,
scheduling_failure_message);
}

bool LocalTaskManager::AnyPendingTasksForResourceAcquisition(
Expand Down
30 changes: 20 additions & 10 deletions src/ray/raylet/local_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,15 @@ class LocalTaskManager : public ILocalTaskManager {
/// \param task: Output parameter.
void TaskFinished(std::shared_ptr<WorkerInterface> worker, RayTask *task);

/// Attempt to cancel an already queued task.
/// Attempt to cancel all queued tasks that match the predicate.
///
/// \param task_id: The id of the task to remove.
/// \param failure_type: The failure type.
///
/// \return True if task was successfully removed. This function will return
/// false if the task is already running.
bool CancelTask(const TaskID &task_id,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
const std::string &scheduling_failure_message = "") override;
/// \param predicate: A function that returns true if a task needs to be cancelled.
/// \param failure_type: The reason for cancellation.
/// \param scheduling_failure_message: The reason message for cancellation.
/// \return True if any task was successfully cancelled.
bool CancelTasks(std::function<bool(const std::shared_ptr<internal::Work> &)> predicate,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type,
const std::string &scheduling_failure_message) override;

/// Return if any tasks are pending resource acquisition.
///
Expand Down Expand Up @@ -203,6 +201,18 @@ class LocalTaskManager : public ILocalTaskManager {
const rpc::Address &owner_address,
const std::string &runtime_env_setup_error_message);

/// Attempt to cancel an already queued task.
///
/// \param task_id: The id of the task to remove.
/// \param failure_type: The failure type.
///
/// \return True if task was successfully removed. This function will return
/// false if the task is already running.
bool CancelTask(const TaskID &task_id,
rpc::RequestWorkerLeaseReply::SchedulingFailureType failure_type =
rpc::RequestWorkerLeaseReply::SCHEDULING_CANCELLED_INTENDED,
const std::string &scheduling_failure_message = "");

/// Attempts to dispatch all tasks which are ready to run. A task
/// will be dispatched if it is on `tasks_to_dispatch_` and there are still
/// available resources on the node.
Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ int main(int argc, char *argv[]) {
RAY_CHECK_OK(status);
RAY_CHECK(stored_raylet_config.has_value());
RayConfig::instance().initialize(stored_raylet_config.get());
ray::asio::testing::init();

// Core worker tries to kill child processes when it exits. But they can't do
// it perfectly: if the core worker is killed by SIGKILL, the child processes
Expand Down
Loading
Loading