From 81c3a2cf04660c7902f960fb36ebf4360839c287 Mon Sep 17 00:00:00 2001 From: khu Date: Wed, 30 Dec 2020 23:07:51 -0800 Subject: [PATCH 01/15] [Placement Group] Support RayPlacementGroupError #10508 --- .../ray/runtime/object/ObjectSerializer.java | 2 + python/ray/_raylet.pyx | 1 + python/ray/exceptions.py | 8 ++ python/ray/ray_constants.py | 3 +- python/ray/serialization.py | 3 + .../gcs_server/test/gcs_actor_manager_test.cc | 6 +- src/ray/protobuf/common.proto | 14 +++ src/ray/raylet/node_manager.cc | 94 +++++++++++++------ src/ray/raylet/node_manager.h | 19 +++- src/ray/raylet/worker_pool.cc | 2 +- 10 files changed, 112 insertions(+), 40 deletions(-) diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java index 76576b969e20..cef2eea7c48a 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectSerializer.java @@ -33,6 +33,8 @@ public class ObjectSerializer { String.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE.getNumber()).getBytes(); private static final byte[] TASK_EXECUTION_EXCEPTION_META = String.valueOf(ErrorType.TASK_EXECUTION_EXCEPTION.getNumber()).getBytes(); + private static final byte[] PLACEMENT_GROUP_ERROR_META = + String.valueOf(ErrorType.PLACEMENT_GROUP_ERROR.getNumber()).getBytes(); public static final byte[] OBJECT_METADATA_TYPE_CROSS_LANGUAGE = "XLANG".getBytes(); public static final byte[] OBJECT_METADATA_TYPE_JAVA = "JAVA".getBytes(); diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 1360c96ce280..d764b3ef76f5 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -102,6 +102,7 @@ from ray.exceptions import ( RayError, RaySystemError, RayTaskError, + RayPlacementGroupError, ObjectStoreFullError, GetTimeoutError, TaskCancelledError diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py index 56e943db6c64..1275dc9cb394 100644 --- a/python/ray/exceptions.py +++ b/python/ray/exceptions.py @@ -138,6 +138,13 @@ def __str__(self): return "\n".join(out) +class RayPlacementGroupError(RayError): + """Indicates that the task failed due to placement group errors.""" + + def __str__(self): + return "The task failed due to placement group error." + + class WorkerCrashedError(RayError): """Indicates that the worker died unexpectedly while executing a task.""" @@ -214,6 +221,7 @@ class PlasmaObjectNotAvailable(RayError): PlasmaObjectNotAvailable, RayError, RayTaskError, + RayPlacementGroupError, WorkerCrashedError, RayActorError, ObjectStoreFullError, diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 30b3b5c7b394..9f1feb716e07 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -129,7 +129,8 @@ def to_memory_units(memory_bytes, round_up): REPORTER_DIED_ERROR = "reporter_died" DASHBOARD_AGENT_DIED_ERROR = "dashboard_agent_died" DASHBOARD_DIED_ERROR = "dashboard_died" -RAYLET_CONNECTION_ERROR = "raylet_connection_error" +RAYLET_CONNECTION_ERROR = "raylet_connection" +PLACEMENT_GROUP_PUSH_ERROR = "placement_group" # Used in gpu detection RESOURCE_CONSTRAINT_PREFIX = "accelerator_type:" diff --git a/python/ray/serialization.py b/python/ray/serialization.py index 9a24f3cccc0a..8ac330c36df7 100644 --- a/python/ray/serialization.py +++ b/python/ray/serialization.py @@ -12,6 +12,7 @@ RayError, PlasmaObjectNotAvailable, RayTaskError, + RayPlacementGroupError, RayActorError, TaskCancelledError, WorkerCrashedError, @@ -278,6 +279,8 @@ def _deserialize_object(self, data, metadata, object_ref): return TaskCancelledError() elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"): return ObjectLostError(ray.ObjectRef(object_ref.binary())) + elif error_type == ErrorType.Value("PLACEMENT_GROUP_ERROR"): + return RayPlacementGroupError() else: assert error_type != ErrorType.Value("OBJECT_IN_PLASMA"), \ "Tried to get object that has been promoted to plasma." diff --git a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc index de3fc8fb6fa4..735c6c596600 100644 --- a/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc @@ -725,7 +725,7 @@ TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) { actor->UpdateAddress(address); const auto actor_id = actor->GetActorID(); EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id)); - gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id, false); + gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id); } TEST_F(GcsActorManagerTest, TestRegisterActor) { @@ -848,10 +848,10 @@ TEST_F(GcsActorManagerTest, TestOwnerAndChildDiedAtTheSameTimeRaceCondition) { const auto child_worker_id = actor->GetWorkerID(); const auto actor_id = actor->GetActorID(); // Make worker & owner fail at the same time, but owner's failure comes first. - gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id, false); + gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id); EXPECT_CALL(*mock_actor_scheduler_, CancelOnWorker(child_node_id, child_worker_id)) .WillOnce(Return(actor_id)); - gcs_actor_manager_->OnWorkerDead(child_node_id, child_worker_id, false); + gcs_actor_manager_->OnWorkerDead(child_node_id, child_worker_id); } } // namespace ray diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index d9337708d873..4786eb549dbb 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -133,6 +133,8 @@ enum ErrorType { TASK_CANCELLED = 5; // Inidicates that creating the GCS service failed to create the actor. ACTOR_CREATION_FAILED = 6; + // Indicates failure due to placement group + PLACEMENT_GROUP_ERROR = 7; } /// The task exception encapsulates all information about task @@ -442,3 +444,15 @@ message MetricPoint { // [Optional] Unit of the metric. string units = 6; } + +// Type of a worker exit. +enum ClientDisconnectType { + // Unintended worker exit. + UNEXPECTED_EXIT = 0; + // Intended worker exit. + FINISHED = 1; + // Worker exit due to resource bundle release. + UNUSED_RESOURCE_RELEASED = 2; + // Worker exit due to placement group removal. + PLACEGROUP_REMOVED = 3; +} diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 6fba62e436f4..af6208001bf5 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -347,11 +347,15 @@ void NodeManager::KillWorker(std::shared_ptr worker) { }); } -void NodeManager::DestroyWorker(std::shared_ptr worker) { +void NodeManager::DisconnectAndKillWorker(std::shared_ptr worker, + rpc::ClientDisconnectType disconnect_type) { + // Used to destroy a worker when its bundle resource is released (unused or + // placementgroup is deleted.) // We should disconnect the client first. Otherwise, we'll remove bundle resources // before actual resources are returned. Subsequent disconnect request that comes // due to worker dead will be ignored. - ProcessDisconnectClientMessage(worker->Connection(), /* intentional exit */ true); + DisconnectClient(worker->Connection(), disconnect_type); + RAY_LOG(ERROR) << "DisconnectAndKillWorker" << disconnect_type; worker->MarkDead(); KillWorker(worker); } @@ -602,9 +606,9 @@ void NodeManager::HandleReleaseUnusedBundles( } // Kill all workers that are currently associated with the unused bundles. - // NOTE: We can't traverse directly with `leased_workers_`, because `DestroyWorker` will - // delete the element of `leased_workers_`. So we need to filter out - // `workers_associated_with_unused_bundles` separately. + // NOTE: We can't traverse directly with `leased_workers_`, because + // `DisconnectAndKillWorker` will delete the element of `leased_workers_`. So we need to + // filter out `workers_associated_with_unused_bundles` separately. std::vector> workers_associated_with_unused_bundles; for (const auto &worker_it : leased_workers_) { auto &worker = worker_it.second; @@ -623,7 +627,7 @@ void NodeManager::HandleReleaseUnusedBundles( << ", task id: " << worker->GetAssignedTaskId() << ", actor id: " << worker->GetActorId() << ", worker id: " << worker->WorkerId(); - DestroyWorker(worker); + DisconnectAndKillWorker(worker, rpc::ClientDisconnectType::UNUSED_RESOURCE_RELEASED); } // Return unused bundle resources. @@ -709,7 +713,7 @@ void NodeManager::WarnResourceDeadlock() { << "To resolve the issue, consider creating fewer actors or increase the " << "resources available to this Ray cluster. You can ignore this message " << "if this Ray cluster is expected to auto-scale."; - auto error_data_ptr = gcs::CreateErrorTableData( + const auto error_data_ptr = gcs::CreateErrorTableData( "resource_deadlock", error_message.str(), current_time_ms(), exemplar.GetTaskSpecification().JobId()); RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); @@ -1255,7 +1259,7 @@ void NodeManager::ProcessRegisterClientRequestMessage( static_cast(protocol::MessageType::RegisterClientReply), fbb.GetSize(), fbb.GetBufferPointer(), [this, client](const ray::Status &status) { if (!status.ok()) { - ProcessDisconnectClientMessage(client); + DisconnectClient(client, rpc::ClientDisconnectType::UNEXPECTED_EXIT); } }); }; @@ -1354,8 +1358,9 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr & } } -void NodeManager::ProcessDisconnectClientMessage( - const std::shared_ptr &client, bool intentional_disconnect) { +void NodeManager::DisconnectClient(const std::shared_ptr &client, + rpc::ClientDisconnectType disconnect_type) { + RAY_LOG(ERROR) << "DisconnectClient*****" << disconnect_type; std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); bool is_worker = false, is_driver = false; if (worker) { @@ -1378,7 +1383,6 @@ void NodeManager::ProcessDisconnectClientMessage( if (is_worker && worker->IsDead()) { // If the worker was killed by us because the driver exited, // treat it as intentionally disconnected. - intentional_disconnect = true; // Don't need to unblock the client if it's a worker and is already dead. // Because in this case, its task is already cleaned up. RAY_LOG(DEBUG) << "Skip unblocking worker because it's already dead."; @@ -1400,32 +1404,50 @@ void NodeManager::ProcessDisconnectClientMessage( // Publish the worker failure. auto worker_failure_data_ptr = gcs::CreateWorkerFailureData(self_node_id_, worker->WorkerId(), worker->IpAddress(), - worker->Port(), time(nullptr), intentional_disconnect); + worker->Port(), time(nullptr), disconnect_type); RAY_CHECK_OK( gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr)); if (is_worker) { const ActorID &actor_id = worker->GetActorId(); const TaskID &task_id = worker->GetAssignedTaskId(); - // If the worker was running a task or actor, clean up the task and push an - // error to the driver, unless the worker is already dead. + if ((!task_id.IsNil() || !actor_id.IsNil()) && !worker->IsDead()) { + // If the worker was running a task or actor, clean up the task and push an + // error to the driver, unless the worker is already dead. + ErrorType error_type = ErrorType::WORKER_DIED; + // TODO(rkn): Define this constant somewhere else. + std::string type_str; + std::ostringstream error_message; + + bool intentional_exit = false; + switch (disconnect_type) { + case rpc::ClientDisconnectType::UNEXPECTED_EXIT: { + type_str = "worker_died"; + error_message << "A worker died or was killed while executing task " << task_id + << "."; + } break; + case rpc::ClientDisconnectType::PLACEGROUP_REMOVED: { + error_type = ErrorType::PLACEMENT_GROUP_ERROR; + type_str = "placement_group"; + error_message << "A worker was killed while executing task " << task_id + << " due to placement group removal" + << "."; + } break; + default: + intentional_exit = true; + } + // If the worker was an actor, it'll be cleaned by GCS. if (actor_id.IsNil()) { Task task; static_cast(local_queues_.RemoveTask(task_id, &task)); } - - if (!intentional_disconnect) { + if (!intentional_exit) { // Push the error to driver. const JobID &job_id = worker->GetAssignedJobId(); - // TODO(rkn): Define this constant somewhere else. - std::string type = "worker_died"; - std::ostringstream error_message; - error_message << "A worker died or was killed while executing task " << task_id - << "."; - auto error_data_ptr = gcs::CreateErrorTableData(type, error_message.str(), - current_time_ms(), job_id); + const auto error_data_ptr = gcs::CreateErrorTableData( + type_str, error_message.str(), current_time_ms(), job_id); RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); } } @@ -1481,6 +1503,15 @@ void NodeManager::ProcessDisconnectClientMessage( // these can be leaked. } +void NodeManager::ProcessDisconnectClientMessage( + const std::shared_ptr &client, bool intentional_disconnect) { + if (intentional_disconnect) { + DisconnectClient(client, rpc::ClientDisconnectType::FINISHED); + return; + } + DisconnectClient(client); +} + void NodeManager::ProcessFetchOrReconstructMessage( const std::shared_ptr &client, const uint8_t *message_data) { auto message = flatbuffers::GetRoot(message_data); @@ -1569,7 +1600,7 @@ void NodeManager::ProcessWaitRequestMessage( } } else { // We failed to write to the client, so disconnect the client. - ProcessDisconnectClientMessage(client); + DisconnectClient(client, rpc::ClientDisconnectType::UNEXPECTED_EXIT); } }); RAY_CHECK_OK(status); @@ -1618,7 +1649,8 @@ void NodeManager::ProcessPushErrorRequestMessage(const uint8_t *message_data) { auto const &error_message = string_from_flatbuf(*message->error_message()); double timestamp = message->timestamp(); JobID job_id = from_flatbuf(*message->job_id()); - auto error_data_ptr = gcs::CreateErrorTableData(type, error_message, timestamp, job_id); + const auto error_data_ptr = + gcs::CreateErrorTableData(type, error_message, timestamp, job_id); RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); } @@ -1789,9 +1821,9 @@ void NodeManager::HandleCancelResourceReserve( << bundle_spec.DebugString(); // Kill all workers that are currently associated with the placement group. - // NOTE: We can't traverse directly with `leased_workers_`, because `DestroyWorker` will - // delete the element of `leased_workers_`. So we need to filter out - // `workers_associated_with_pg` separately. + // NOTE: We can't traverse directly with `leased_workers_`, because + // `DisconnectAndKillWorker` will delete the element of `leased_workers_`. So we need to + // filter out `workers_associated_with_pg` separately. std::vector> workers_associated_with_pg; for (const auto &worker_it : leased_workers_) { auto &worker = worker_it.second; @@ -1807,7 +1839,7 @@ void NodeManager::HandleCancelResourceReserve( << ", task id: " << worker->GetAssignedTaskId() << ", actor id: " << worker->GetActorId() << ", worker id: " << worker->WorkerId(); - DestroyWorker(worker); + DisconnectAndKillWorker(worker, rpc::ClientDisconnectType::PLACEGROUP_REMOVED); } // Return bundle resources. @@ -1838,7 +1870,7 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request, if (worker) { if (request.disconnect_worker()) { - ProcessDisconnectClientMessage(worker->Connection()); + DisconnectClient(worker->Connection(), rpc::ClientDisconnectType::UNEXPECTED_EXIT); } else { // Handle the edge case where the worker was returned before we got the // unblock RPC by unblocking it immediately (unblock is idempotent). @@ -2683,7 +2715,7 @@ void NodeManager::FinishAssignTask(const std::shared_ptr &worke } else { RAY_LOG(WARNING) << "Failed to send task to worker, disconnecting client"; // We failed to send the task to the worker, so disconnect the worker. - ProcessDisconnectClientMessage(worker->Connection()); + DisconnectClient(worker->Connection()); // Queue this task for future assignment. We need to do this since // DispatchTasks() removed it from the ready queue. The task will be // assigned to a worker once one becomes available. diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 63f741af0064..3f947c9c187e 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -384,12 +384,23 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \return Void. void KillWorker(std::shared_ptr worker); - /// Destroy a worker. + /// Disconnect and kill a worker. /// We will disconnect the worker connection first and then kill the worker. /// - /// \param worker The worker to destroy. + /// \param worker The worker to stop. /// \return Void. - void DestroyWorker(std::shared_ptr worker); + void DisconnectAndKillWorker(std::shared_ptr worker, + rpc::ClientDisconnectType disconnect_type = + rpc::ClientDisconnectType::UNEXPECTED_EXIT); + + /// Disconnect a client. + /// + /// \param client The client that sent the message. + /// \param disconnect_type The reason to disconnect the specified client. + /// \return Void. + void DisconnectClient(const std::shared_ptr &client, + rpc::ClientDisconnectType disconnect_type = + rpc::ClientDisconnectType::UNEXPECTED_EXIT); /// When a job finished, loop over all of the queued tasks for that job and /// treat them as failed. @@ -472,7 +483,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// client. /// /// \param client The client that sent the message. - /// \param intentional_disconnect Whether the client was intentionally disconnected. + /// \param message_data A pointer to the message data. /// \return Void. void ProcessDisconnectClientMessage(const std::shared_ptr &client, bool intentional_disconnect = false); diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 93a568748e80..8bfead23206b 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -977,7 +977,7 @@ void WorkerPool::WarnAboutSize() { << "using nested tasks " << "(see https://github.com/ray-project/ray/issues/3644) for " << "some a discussion of workarounds."; - auto error_data_ptr = gcs::CreateErrorTableData( + const auto error_data_ptr = gcs::CreateErrorTableData( "worker_pool_large", warning_message.str(), current_time_ms()); RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr)); } From 92bcf6324c49a9261a503f3c5d3bd5e70dd2f7ed Mon Sep 17 00:00:00 2001 From: khu Date: Thu, 31 Dec 2020 16:48:48 -0800 Subject: [PATCH 02/15] fix ci --- src/ray/raylet/node_manager.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index af6208001bf5..c61afa0fbcb7 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1415,7 +1415,6 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie if ((!task_id.IsNil() || !actor_id.IsNil()) && !worker->IsDead()) { // If the worker was running a task or actor, clean up the task and push an // error to the driver, unless the worker is already dead. - ErrorType error_type = ErrorType::WORKER_DIED; // TODO(rkn): Define this constant somewhere else. std::string type_str; std::ostringstream error_message; @@ -1428,7 +1427,6 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie << "."; } break; case rpc::ClientDisconnectType::PLACEGROUP_REMOVED: { - error_type = ErrorType::PLACEMENT_GROUP_ERROR; type_str = "placement_group"; error_message << "A worker was killed while executing task " << task_id << " due to placement group removal" From 617b245b967e8b9ef02f35422bceefda2b49d116 Mon Sep 17 00:00:00 2001 From: Keqiu Hu Date: Sun, 3 Jan 2021 22:18:55 -0800 Subject: [PATCH 03/15] Update src/ray/protobuf/common.proto Co-authored-by: fangfengbin <869218239a@zju.edu.cn> --- src/ray/protobuf/common.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 4786eb549dbb..74e1b8e15419 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -454,5 +454,5 @@ enum ClientDisconnectType { // Worker exit due to resource bundle release. UNUSED_RESOURCE_RELEASED = 2; // Worker exit due to placement group removal. - PLACEGROUP_REMOVED = 3; + PLACEMENT_GROUP_REMOVED = 3; } From b72024c4b8e4f060f69b2f6ac7aee89c4904004b Mon Sep 17 00:00:00 2001 From: Keqiu Hu Date: Sun, 3 Jan 2021 22:19:01 -0800 Subject: [PATCH 04/15] Update src/ray/protobuf/common.proto Co-authored-by: fangfengbin <869218239a@zju.edu.cn> --- src/ray/protobuf/common.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 74e1b8e15419..42b0443009ae 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -133,7 +133,7 @@ enum ErrorType { TASK_CANCELLED = 5; // Inidicates that creating the GCS service failed to create the actor. ACTOR_CREATION_FAILED = 6; - // Indicates failure due to placement group + // Indicates failure due to placement group. PLACEMENT_GROUP_ERROR = 7; } From 25da0d2e8837e88c3b55345866b477e1c969fda2 Mon Sep 17 00:00:00 2001 From: Keqiu Hu Date: Sun, 3 Jan 2021 22:19:08 -0800 Subject: [PATCH 05/15] Update src/ray/raylet/node_manager.cc Co-authored-by: fangfengbin <869218239a@zju.edu.cn> --- src/ray/raylet/node_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index c61afa0fbcb7..de1ce797b79c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1429,7 +1429,7 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie case rpc::ClientDisconnectType::PLACEGROUP_REMOVED: { type_str = "placement_group"; error_message << "A worker was killed while executing task " << task_id - << " due to placement group removal" + << " due to placement group removal." << "."; } break; default: From 64f38241cd83e630ee534554b8d0180983a61f6d Mon Sep 17 00:00:00 2001 From: khu Date: Sun, 3 Jan 2021 22:22:19 -0800 Subject: [PATCH 06/15] remove debug msg --- src/ray/raylet/node_manager.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index de1ce797b79c..3a2cc0ffbad4 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -355,7 +355,6 @@ void NodeManager::DisconnectAndKillWorker(std::shared_ptr worke // before actual resources are returned. Subsequent disconnect request that comes // due to worker dead will be ignored. DisconnectClient(worker->Connection(), disconnect_type); - RAY_LOG(ERROR) << "DisconnectAndKillWorker" << disconnect_type; worker->MarkDead(); KillWorker(worker); } @@ -1360,7 +1359,6 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr & void NodeManager::DisconnectClient(const std::shared_ptr &client, rpc::ClientDisconnectType disconnect_type) { - RAY_LOG(ERROR) << "DisconnectClient*****" << disconnect_type; std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); bool is_worker = false, is_driver = false; if (worker) { From 18d790d3ab8fdf5f24a2528f7e91aab6cba27425 Mon Sep 17 00:00:00 2001 From: khu Date: Sun, 3 Jan 2021 22:29:11 -0800 Subject: [PATCH 07/15] address comments --- src/ray/raylet/node_manager.cc | 10 +++++----- src/ray/raylet/node_manager.h | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 3a2cc0ffbad4..cd979268278c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1419,17 +1419,17 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie bool intentional_exit = false; switch (disconnect_type) { - case rpc::ClientDisconnectType::UNEXPECTED_EXIT: { + case rpc::ClientDisconnectType::UNEXPECTED_EXIT: type_str = "worker_died"; error_message << "A worker died or was killed while executing task " << task_id << "."; - } break; - case rpc::ClientDisconnectType::PLACEGROUP_REMOVED: { + break; + case rpc::ClientDisconnectType::PLACEGROUP_REMOVED: type_str = "placement_group"; error_message << "A worker was killed while executing task " << task_id << " due to placement group removal." << "."; - } break; + break; default: intentional_exit = true; } @@ -1866,7 +1866,7 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request, if (worker) { if (request.disconnect_worker()) { - DisconnectClient(worker->Connection(), rpc::ClientDisconnectType::UNEXPECTED_EXIT); + DisconnectClient(worker->Connection()); } else { // Handle the edge case where the worker was returned before we got the // unblock RPC by unblocking it immediately (unblock is idempotent). diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 3f947c9c187e..d5b2dddbe4ee 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -483,7 +483,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// client. /// /// \param client The client that sent the message. - /// \param message_data A pointer to the message data. + /// \param intentional_disconnect Whether the client was intentionally disconnected. /// \return Void. void ProcessDisconnectClientMessage(const std::shared_ptr &client, bool intentional_disconnect = false); From ad5fa9209ddc47775632424aae2109fcf28a9927 Mon Sep 17 00:00:00 2001 From: khu Date: Sun, 3 Jan 2021 22:32:51 -0800 Subject: [PATCH 08/15] add a missing change --- src/ray/raylet/node_manager.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index cd979268278c..27894b2d8b34 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1424,7 +1424,7 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie error_message << "A worker died or was killed while executing task " << task_id << "."; break; - case rpc::ClientDisconnectType::PLACEGROUP_REMOVED: + case rpc::ClientDisconnectType::PLACEMENT_GROUP_REMOVED: type_str = "placement_group"; error_message << "A worker was killed while executing task " << task_id << " due to placement group removal." @@ -1835,7 +1835,7 @@ void NodeManager::HandleCancelResourceReserve( << ", task id: " << worker->GetAssignedTaskId() << ", actor id: " << worker->GetActorId() << ", worker id: " << worker->WorkerId(); - DisconnectAndKillWorker(worker, rpc::ClientDisconnectType::PLACEGROUP_REMOVED); + DisconnectAndKillWorker(worker, rpc::ClientDisconnectType::PLACEMENT_GROUP_REMOVED); } // Return bundle resources. From 004682518eb1f4f49f12cedf0d0fd72d008d9282 Mon Sep 17 00:00:00 2001 From: Keqiu Hu Date: Sun, 3 Jan 2021 22:38:26 -0800 Subject: [PATCH 09/15] Update src/ray/raylet/node_manager.cc Co-authored-by: fangfengbin <869218239a@zju.edu.cn> --- src/ray/raylet/node_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 27894b2d8b34..b8eeb253f16b 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1258,7 +1258,7 @@ void NodeManager::ProcessRegisterClientRequestMessage( static_cast(protocol::MessageType::RegisterClientReply), fbb.GetSize(), fbb.GetBufferPointer(), [this, client](const ray::Status &status) { if (!status.ok()) { - DisconnectClient(client, rpc::ClientDisconnectType::UNEXPECTED_EXIT); + DisconnectClient(client); } }); }; From e5035c836bc05d88a41c2e5d1cab67eacc6fd48c Mon Sep 17 00:00:00 2001 From: khu Date: Sun, 3 Jan 2021 22:40:32 -0800 Subject: [PATCH 10/15] remove another default --- src/ray/raylet/node_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index b8eeb253f16b..74a12ea54de6 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1596,7 +1596,7 @@ void NodeManager::ProcessWaitRequestMessage( } } else { // We failed to write to the client, so disconnect the client. - DisconnectClient(client, rpc::ClientDisconnectType::UNEXPECTED_EXIT); + DisconnectClient(client); } }); RAY_CHECK_OK(status); From 3a4f06548cc35138c9fe545f6d4e198da72f61d0 Mon Sep 17 00:00:00 2001 From: Keqiu Hu Date: Sun, 3 Jan 2021 22:48:50 -0800 Subject: [PATCH 11/15] Update src/ray/raylet/node_manager.cc Co-authored-by: fangfengbin <869218239a@zju.edu.cn> --- src/ray/raylet/node_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 74a12ea54de6..871df424d26c 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -350,7 +350,7 @@ void NodeManager::KillWorker(std::shared_ptr worker) { void NodeManager::DisconnectAndKillWorker(std::shared_ptr worker, rpc::ClientDisconnectType disconnect_type) { // Used to destroy a worker when its bundle resource is released (unused or - // placementgroup is deleted.) + // placement group is deleted). // We should disconnect the client first. Otherwise, we'll remove bundle resources // before actual resources are returned. Subsequent disconnect request that comes // due to worker dead will be ignored. From c5a65967b7e3c6fbf6d47fd7365e96c7acc922bc Mon Sep 17 00:00:00 2001 From: khu Date: Sun, 3 Jan 2021 23:01:25 -0800 Subject: [PATCH 12/15] address comments --- src/ray/raylet/node_manager.cc | 5 ++++- src/ray/raylet/node_manager.h | 1 + 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 871df424d26c..f25cbf8a4184 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1430,8 +1430,11 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie << " due to placement group removal." << "."; break; - default: + case rpc::ClientDisconnectType::FINISHED: + case rpc::ClientDisconnectType::UNUSED_RESOURCE_RELEASED: intentional_exit = true; + default: + RAY_LOG(FATAL) << "Unknown client disconnect type " << disconnect_type; } // If the worker was an actor, it'll be cleaned by GCS. diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index d5b2dddbe4ee..1d90aaf58b93 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -388,6 +388,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// We will disconnect the worker connection first and then kill the worker. /// /// \param worker The worker to stop. + /// \param disconnect_type The reason to disconnect the specified client. /// \return Void. void DisconnectAndKillWorker(std::shared_ptr worker, rpc::ClientDisconnectType disconnect_type = From fb30b803b0fe15e24642af40fae37e9d61fee3ce Mon Sep 17 00:00:00 2001 From: khu Date: Sun, 3 Jan 2021 23:03:28 -0800 Subject: [PATCH 13/15] add missing break --- src/ray/raylet/node_manager.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index f25cbf8a4184..4c3a1cf30b16 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1433,6 +1433,7 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie case rpc::ClientDisconnectType::FINISHED: case rpc::ClientDisconnectType::UNUSED_RESOURCE_RELEASED: intentional_exit = true; + break; default: RAY_LOG(FATAL) << "Unknown client disconnect type " << disconnect_type; } From 432c461407d97ee1f584adebd2697615b98f10cd Mon Sep 17 00:00:00 2001 From: khu Date: Sun, 3 Jan 2021 23:06:57 -0800 Subject: [PATCH 14/15] remove . --- src/ray/raylet/node_manager.cc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 4c3a1cf30b16..087e4e57f2ec 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1427,9 +1427,7 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie case rpc::ClientDisconnectType::PLACEMENT_GROUP_REMOVED: type_str = "placement_group"; error_message << "A worker was killed while executing task " << task_id - << " due to placement group removal." - << "."; - break; + << " due to placement group removal." break; case rpc::ClientDisconnectType::FINISHED: case rpc::ClientDisconnectType::UNUSED_RESOURCE_RELEASED: intentional_exit = true; From 4e4a3e75bad59144c35719b9165dd4290eeed4a2 Mon Sep 17 00:00:00 2001 From: khu Date: Sun, 3 Jan 2021 23:16:51 -0800 Subject: [PATCH 15/15] fix typo --- src/ray/raylet/node_manager.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 087e4e57f2ec..6085706bb1a3 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1427,7 +1427,8 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie case rpc::ClientDisconnectType::PLACEMENT_GROUP_REMOVED: type_str = "placement_group"; error_message << "A worker was killed while executing task " << task_id - << " due to placement group removal." break; + << " due to placement group removal."; + break; case rpc::ClientDisconnectType::FINISHED: case rpc::ClientDisconnectType::UNUSED_RESOURCE_RELEASED: intentional_exit = true;