From 222b728d0a047759b6a445c61fceba91d17051be Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 27 Apr 2023 14:55:33 -0700 Subject: [PATCH 01/20] fix --- python/ray/tests/test_advanced_9.py | 8 +++++--- src/ray/core_worker/core_worker.cc | 1 + src/ray/gcs/gcs_server/pubsub_handler.cc | 3 ++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/python/ray/tests/test_advanced_9.py b/python/ray/tests/test_advanced_9.py index b61e5aac9216..c5ed1181e4ac 100644 --- a/python/ray/tests/test_advanced_9.py +++ b/python/ray/tests/test_advanced_9.py @@ -258,7 +258,7 @@ def ready(self): run_string_as_driver(script.format(address=call_ray_start_2, val=2)) -@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.") +@pytest.mark.skipif(sys.platform == "win32", reason="Only works on linux.") def test_gcs_connection_no_leak(ray_start_cluster): cluster = ray_start_cluster head_node = cluster.add_node() @@ -274,6 +274,7 @@ def get_gcs_num_of_connections(): p = psutil.Process(gcs_server_pid) print(">>", p.num_fds()) return p.num_fds() + # Wait for everything to be ready. import time @@ -291,14 +292,15 @@ def ready(self): num_of_actors = 10 actors = [A.remote() for _ in range(num_of_actors)] print(ray.get([t.ready.remote() for t in actors])) - + get_gcs_num_of_connections() + # Kill the actors del actors # Make sure the # of fds opened by the GCS dropped. # This assumes worker processes are not created after the actor worker # processes die. - wait_for_condition(lambda: get_gcs_num_of_connections() <= fds_without_workers) + wait_for_condition(lambda: get_gcs_num_of_connections() <= fds_without_workers, timeout=30) num_fds_after_workers_die = get_gcs_num_of_connections() n = cluster.add_node(wait=True) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 329fda454b12..7597ea97b5cb 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -829,6 +829,7 @@ void CoreWorker::ForceExit(const rpc::WorkerExitType exit_type, const std::string &detail) { RAY_LOG(WARNING) << "Force exit the process. 
" << " Details: " << detail; + std::this_thread::sleep_for(std::chrono::seconds(10)); Disconnect(exit_type, detail); KillChildProcs(); diff --git a/src/ray/gcs/gcs_server/pubsub_handler.cc b/src/ray/gcs/gcs_server/pubsub_handler.cc index cf34b4f1e8a6..8943ef7ed28c 100644 --- a/src/ray/gcs/gcs_server/pubsub_handler.cc +++ b/src/ray/gcs/gcs_server/pubsub_handler.cc @@ -104,7 +104,7 @@ void InternalPubSubHandler::HandleGcsSubscriberCommandBatch( if (sender_id.empty()) { sender_id = request.subscriber_id(); } - + RAY_LOG(INFO) << "Adding sender to subscribers: " << UniqueID::FromBinary(sender_id) << "-" << subscriber_id; auto iter = sender_to_subscribers_.find(sender_id); if (iter == sender_to_subscribers_.end()) { iter = sender_to_subscribers_.insert({sender_id, {}}).first; @@ -139,6 +139,7 @@ void InternalPubSubHandler::RemoveSubscriberFrom(const std::string &sender_id) { } for (auto &subscriber_id : iter->second) { gcs_publisher_->GetPublisher()->UnregisterSubscriber(subscriber_id); + RAY_LOG(INFO) << "Remove subscribe id: " << UniqueID::FromBinary(sender_id) << "-" << subscriber_id; } sender_to_subscribers_.erase(iter); } From 331bda5c4b1c7b6ed461132e743f5fecdeac696a Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 27 Apr 2023 14:56:11 -0700 Subject: [PATCH 02/20] format --- python/ray/tests/test_advanced_9.py | 7 ++++--- src/ray/gcs/gcs_server/pubsub_handler.cc | 6 ++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/python/ray/tests/test_advanced_9.py b/python/ray/tests/test_advanced_9.py index c5ed1181e4ac..c3fa05108844 100644 --- a/python/ray/tests/test_advanced_9.py +++ b/python/ray/tests/test_advanced_9.py @@ -274,7 +274,6 @@ def get_gcs_num_of_connections(): p = psutil.Process(gcs_server_pid) print(">>", p.num_fds()) return p.num_fds() - # Wait for everything to be ready. import time @@ -293,14 +292,16 @@ def ready(self): actors = [A.remote() for _ in range(num_of_actors)] print(ray.get([t.ready.remote() for t in actors])) get_gcs_num_of_connections() - + # Kill the actors del actors # Make sure the # of fds opened by the GCS dropped. # This assumes worker processes are not created after the actor worker # processes die. 
- wait_for_condition(lambda: get_gcs_num_of_connections() <= fds_without_workers, timeout=30) + wait_for_condition( + lambda: get_gcs_num_of_connections() <= fds_without_workers, timeout=30 + ) num_fds_after_workers_die = get_gcs_num_of_connections() n = cluster.add_node(wait=True) diff --git a/src/ray/gcs/gcs_server/pubsub_handler.cc b/src/ray/gcs/gcs_server/pubsub_handler.cc index 8943ef7ed28c..d08eeb272249 100644 --- a/src/ray/gcs/gcs_server/pubsub_handler.cc +++ b/src/ray/gcs/gcs_server/pubsub_handler.cc @@ -104,7 +104,8 @@ void InternalPubSubHandler::HandleGcsSubscriberCommandBatch( if (sender_id.empty()) { sender_id = request.subscriber_id(); } - RAY_LOG(INFO) << "Adding sender to subscribers: " << UniqueID::FromBinary(sender_id) << "-" << subscriber_id; + RAY_LOG(INFO) << "Adding sender to subscribers: " << UniqueID::FromBinary(sender_id) + << "-" << subscriber_id; auto iter = sender_to_subscribers_.find(sender_id); if (iter == sender_to_subscribers_.end()) { iter = sender_to_subscribers_.insert({sender_id, {}}).first; @@ -139,7 +140,8 @@ void InternalPubSubHandler::RemoveSubscriberFrom(const std::string &sender_id) { } for (auto &subscriber_id : iter->second) { gcs_publisher_->GetPublisher()->UnregisterSubscriber(subscriber_id); - RAY_LOG(INFO) << "Remove subscribe id: " << UniqueID::FromBinary(sender_id) << "-" << subscriber_id; + RAY_LOG(INFO) << "Remove subscribe id: " << UniqueID::FromBinary(sender_id) << "-" + << subscriber_id; } sender_to_subscribers_.erase(iter); } From 9b1282e7c5f78d3bded9cada2f6dc8f61f5afc21 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 27 Apr 2023 15:01:36 -0700 Subject: [PATCH 03/20] fix --- src/ray/core_worker/core_worker.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 7597ea97b5cb..329fda454b12 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -829,7 +829,6 @@ void CoreWorker::ForceExit(const rpc::WorkerExitType exit_type, const std::string &detail) { RAY_LOG(WARNING) << "Force exit the process. " << " Details: " << detail; - std::this_thread::sleep_for(std::chrono::seconds(10)); Disconnect(exit_type, detail); KillChildProcs(); From bf271388dd39f97a461e4805716d8f29b32fc6f4 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Sat, 29 Apr 2023 01:52:15 +0000 Subject: [PATCH 04/20] deflakey Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/core_worker/core_worker.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 329fda454b12..d5fafb3f781c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -784,8 +784,8 @@ void CoreWorker::Exit( detail = std::move(detail), creation_task_exception_pb_bytes]() { rpc::DrainServerCallExecutor(); - Disconnect(exit_type, detail, creation_task_exception_pb_bytes); KillChildProcs(); + Disconnect(exit_type, detail, creation_task_exception_pb_bytes); Shutdown(); }, "CoreWorker.Shutdown"); @@ -829,10 +829,10 @@ void CoreWorker::ForceExit(const rpc::WorkerExitType exit_type, const std::string &detail) { RAY_LOG(WARNING) << "Force exit the process. " << " Details: " << detail; - Disconnect(exit_type, detail); - KillChildProcs(); + Disconnect(exit_type, detail); + // NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup. 
// `exit()` will destruct static objects in an incorrect order, which will lead to // core dumps. From afce57e39671b863419846660f1d2c69dd20c6eb Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Sat, 29 Apr 2023 01:53:32 +0000 Subject: [PATCH 05/20] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/tests/test_advanced_9.py | 5 +---- src/ray/gcs/gcs_server/pubsub_handler.cc | 4 ---- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/python/ray/tests/test_advanced_9.py b/python/ray/tests/test_advanced_9.py index c3fa05108844..5a9a0fdea784 100644 --- a/python/ray/tests/test_advanced_9.py +++ b/python/ray/tests/test_advanced_9.py @@ -291,7 +291,6 @@ def ready(self): num_of_actors = 10 actors = [A.remote() for _ in range(num_of_actors)] print(ray.get([t.ready.remote() for t in actors])) - get_gcs_num_of_connections() # Kill the actors del actors @@ -299,9 +298,7 @@ def ready(self): # Make sure the # of fds opened by the GCS dropped. # This assumes worker processes are not created after the actor worker # processes die. - wait_for_condition( - lambda: get_gcs_num_of_connections() <= fds_without_workers, timeout=30 - ) + wait_for_condition(lambda: get_gcs_num_of_connections() <= fds_without_workers) num_fds_after_workers_die = get_gcs_num_of_connections() n = cluster.add_node(wait=True) diff --git a/src/ray/gcs/gcs_server/pubsub_handler.cc b/src/ray/gcs/gcs_server/pubsub_handler.cc index d08eeb272249..cf1417b35220 100644 --- a/src/ray/gcs/gcs_server/pubsub_handler.cc +++ b/src/ray/gcs/gcs_server/pubsub_handler.cc @@ -104,8 +104,6 @@ void InternalPubSubHandler::HandleGcsSubscriberCommandBatch( if (sender_id.empty()) { sender_id = request.subscriber_id(); } - RAY_LOG(INFO) << "Adding sender to subscribers: " << UniqueID::FromBinary(sender_id) - << "-" << subscriber_id; auto iter = sender_to_subscribers_.find(sender_id); if (iter == sender_to_subscribers_.end()) { iter = sender_to_subscribers_.insert({sender_id, {}}).first; @@ -140,8 +138,6 @@ void InternalPubSubHandler::RemoveSubscriberFrom(const std::string &sender_id) { } for (auto &subscriber_id : iter->second) { gcs_publisher_->GetPublisher()->UnregisterSubscriber(subscriber_id); - RAY_LOG(INFO) << "Remove subscribe id: " << UniqueID::FromBinary(sender_id) << "-" - << subscriber_id; } sender_to_subscribers_.erase(iter); } From 03eb25227eaae2be771d9905f08bf6d2fd026e92 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Sat, 29 Apr 2023 02:52:28 +0000 Subject: [PATCH 06/20] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/common/client_connection.h | 7 +++++++ src/ray/raylet/node_manager.cc | 10 ++++------ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/ray/common/client_connection.h b/src/ray/common/client_connection.h index 89d30fbbcdbc..e112e7bb6c69 100644 --- a/src/ray/common/client_connection.h +++ b/src/ray/common/client_connection.h @@ -125,6 +125,13 @@ class ServerConnection : public std::enable_shared_from_this { std::string DebugString() const; + void AsyncWaitTerminated(std::function callback) { + socket_.async_wait(local_stream_socket::wait_type::wait_error, + [callback=std::move(callback)] (auto){ + callback(); + }); + } + protected: /// A private constructor for a server connection. 
ServerConnection(local_stream_socket &&socket); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 4b0593ea8515..32ad14239f80 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1494,9 +1494,6 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie disconnect_detail, worker->GetProcess().GetId(), creation_task_exception); - RAY_CHECK_OK( - gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr)); - if (is_worker) { const ActorID &actor_id = worker->GetActorId(); const TaskID &task_id = worker->GetAssignedTaskId(); @@ -1571,9 +1568,10 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie local_task_manager_->ClearWorkerBacklog(worker->WorkerId()); cluster_task_manager_->CancelTaskForOwner(worker->GetAssignedTaskId()); - - client->Close(); - + client->AsyncWaitTerminated([client, worker_failure_data_ptr, this]{ + RAY_CHECK_OK( + gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr)); + }); // TODO(rkn): Tell the object manager that this client has disconnected so // that it can clean up the wait requests for this client. Currently I think // these can be leaked. From 08c06225fc56d337433e88fc38da697f7ddc6729 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Sat, 29 Apr 2023 02:58:51 +0000 Subject: [PATCH 07/20] format Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/common/client_connection.h | 4 +--- src/ray/raylet/node_manager.cc | 6 +++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/ray/common/client_connection.h b/src/ray/common/client_connection.h index e112e7bb6c69..0eb9163ad8d8 100644 --- a/src/ray/common/client_connection.h +++ b/src/ray/common/client_connection.h @@ -127,9 +127,7 @@ class ServerConnection : public std::enable_shared_from_this { void AsyncWaitTerminated(std::function callback) { socket_.async_wait(local_stream_socket::wait_type::wait_error, - [callback=std::move(callback)] (auto){ - callback(); - }); + [callback = std::move(callback)](auto) { callback(); }); } protected: diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 32ad14239f80..c0af95cfbef3 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1568,9 +1568,9 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie local_task_manager_->ClearWorkerBacklog(worker->WorkerId()); cluster_task_manager_->CancelTaskForOwner(worker->GetAssignedTaskId()); - client->AsyncWaitTerminated([client, worker_failure_data_ptr, this]{ - RAY_CHECK_OK( - gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr)); + client->AsyncWaitTerminated([client, worker_failure_data_ptr, this] { + RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, + nullptr)); }); // TODO(rkn): Tell the object manager that this client has disconnected so // that it can clean up the wait requests for this client. 
Currently I think From ee7755d6f12dcbbed84a64b4f9c69cd2f1e6089e Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Sat, 29 Apr 2023 03:44:18 +0000 Subject: [PATCH 08/20] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/tests/test_advanced_9.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/tests/test_advanced_9.py b/python/ray/tests/test_advanced_9.py index 5a9a0fdea784..accddc1b3164 100644 --- a/python/ray/tests/test_advanced_9.py +++ b/python/ray/tests/test_advanced_9.py @@ -258,7 +258,7 @@ def ready(self): run_string_as_driver(script.format(address=call_ray_start_2, val=2)) -@pytest.mark.skipif(sys.platform == "win32", reason="Only works on linux.") +@pytest.mark.skipif(sys.platform == "win32", reason="Not valid on win32.") def test_gcs_connection_no_leak(ray_start_cluster): cluster = ray_start_cluster head_node = cluster.add_node() From a2245c92f3689f3063f1a538f55c44eee534506b Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Mon, 1 May 2023 21:21:26 +0000 Subject: [PATCH 09/20] fix comments Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/common/client_connection.h | 1 + src/ray/core_worker/core_worker.cc | 2 ++ src/ray/raylet/node_manager.cc | 5 +++++ 3 files changed, 8 insertions(+) diff --git a/src/ray/common/client_connection.h b/src/ray/common/client_connection.h index 0eb9163ad8d8..9a86ffb808e8 100644 --- a/src/ray/common/client_connection.h +++ b/src/ray/common/client_connection.h @@ -126,6 +126,7 @@ class ServerConnection : public std::enable_shared_from_this { std::string DebugString() const; void AsyncWaitTerminated(std::function callback) { + // Async wait until the connection is disconnected. socket_.async_wait(local_stream_socket::wait_type::wait_error, [callback = std::move(callback)](auto) { callback(); }); } diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index d5fafb3f781c..15675a5e84de 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -785,6 +785,7 @@ void CoreWorker::Exit( creation_task_exception_pb_bytes]() { rpc::DrainServerCallExecutor(); KillChildProcs(); + // Disconnect here before KillChildProcs to make the Raylet async wait shorter. Disconnect(exit_type, detail, creation_task_exception_pb_bytes); Shutdown(); }, @@ -831,6 +832,7 @@ void CoreWorker::ForceExit(const rpc::WorkerExitType exit_type, << " Details: " << detail; KillChildProcs(); + // Disconnect here before KillChildProcs to make the Raylet async wait shorter. Disconnect(exit_type, detail); // NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index c0af95cfbef3..6aa2ed5dc3e9 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1568,6 +1568,11 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie local_task_manager_->ClearWorkerBacklog(worker->WorkerId()); cluster_task_manager_->CancelTaskForOwner(worker->GetAssignedTaskId()); + // ReportWorkerFailure should happen after the worker exit completely. + // A better way is to monitor the pid exit. But that needs Process.h + // support async operation. + // Here we monitor the socket to achieve similar result. + // When the worker exited, the pid will be disconnected (local stream socket). 
client->AsyncWaitTerminated([client, worker_failure_data_ptr, this] { RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr)); From 2e16e2c5579024d77fd9bd7fdc871c127c892fdf Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 2 May 2023 18:23:22 +0000 Subject: [PATCH 10/20] fix comments Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/core_worker/core_worker.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index ddd716c94d58..34f1ba1fc6cb 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -786,7 +786,7 @@ void CoreWorker::Exit( creation_task_exception_pb_bytes]() { rpc::DrainServerCallExecutor(); KillChildProcs(); - // Disconnect here before KillChildProcs to make the Raylet async wait shorter. + // Disconnect here after KillChildProcs to make the Raylet async wait shorter. Disconnect(exit_type, detail, creation_task_exception_pb_bytes); Shutdown(); }, From 487e5a461f70ef21757e16e3acb2f2f9d5afe148 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 2 May 2023 16:38:23 -0700 Subject: [PATCH 11/20] fix --- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index ee328510ea82..09330437edee 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -776,14 +776,14 @@ void GcsActorManager::PollOwnerForActorOutOfScope( [this, owner_node_id, owner_id, actor_id]( Status status, const rpc::WaitForActorOutOfScopeReply &reply) { if (!status.ok()) { - RAY_LOG(INFO) << "Worker " << owner_id - << " failed, destroying actor child, job id = " - << actor_id.JobId(); - } else { - RAY_LOG(INFO) << "Actor " << actor_id - << " is out of scope, destroying actor, job id = " - << actor_id.JobId(); + RAY_LOG(WARMING) << "Failed to wait for actor " << actor_id + << " out of scope, job id = " << actor_id.JobId() + << ", error: " << status.ToString(); + return; } + RAY_LOG(INFO) << "Actor " << actor_id + << " is out of scope, destroying actor, job id = " + << actor_id.JobId(); auto node_it = owners_.find(owner_node_id); if (node_it != owners_.end() && node_it->second.count(owner_id)) { From f7a7a8aef270f8909393eccec43dec2f60ad823f Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Tue, 2 May 2023 16:39:07 -0700 Subject: [PATCH 12/20] fix --- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 09330437edee..ff33ce863ada 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -779,6 +779,7 @@ void GcsActorManager::PollOwnerForActorOutOfScope( RAY_LOG(WARMING) << "Failed to wait for actor " << actor_id << " out of scope, job id = " << actor_id.JobId() << ", error: " << status.ToString(); + // TODO(iycheng): Retry it in other PR. 
return; } RAY_LOG(INFO) << "Actor " << actor_id From eb7738e52ae6f5455e05e07bacb679160e41b988 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 3 May 2023 00:47:42 +0000 Subject: [PATCH 13/20] fix typo Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index ff33ce863ada..0536f859bea8 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -776,7 +776,7 @@ void GcsActorManager::PollOwnerForActorOutOfScope( [this, owner_node_id, owner_id, actor_id]( Status status, const rpc::WaitForActorOutOfScopeReply &reply) { if (!status.ok()) { - RAY_LOG(WARMING) << "Failed to wait for actor " << actor_id + RAY_LOG(WARNING) << "Failed to wait for actor " << actor_id << " out of scope, job id = " << actor_id.JobId() << ", error: " << status.ToString(); // TODO(iycheng): Retry it in other PR. From 627688b16bebc575237c6c70187eb41ae7751d47 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 3 May 2023 06:40:31 +0000 Subject: [PATCH 14/20] delete invalid test Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- .../gcs/gcs_client/test/gcs_client_test.cc | 40 ------------------- 1 file changed, 40 deletions(-) diff --git a/src/ray/gcs/gcs_client/test/gcs_client_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_test.cc index d3baeeb964d0..10325d448f5e 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_test.cc @@ -947,46 +947,6 @@ TEST_P(GcsClientTest, DISABLED_TestGetActorPerf) { << actor_count << " actors."; } -TEST_P(GcsClientTest, TestEvictExpiredDestroyedActors) { - // Restart doesn't work with in memory storage - if (RayConfig::instance().gcs_storage() == "memory") { - return; - } - // Register actors and the actors will be destroyed. - JobID job_id = JobID::FromInt(1); - AddJob(job_id); - absl::flat_hash_set actor_ids; - int actor_count = RayConfig::instance().maximum_gcs_destroyed_actor_cached_count(); - for (int index = 0; index < actor_count; ++index) { - auto actor_table_data = Mocker::GenActorTableData(job_id); - RegisterActor(actor_table_data, false); - actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id())); - } - - // Restart GCS. - RestartGcsServer(); - - for (int index = 0; index < actor_count; ++index) { - auto actor_table_data = Mocker::GenActorTableData(job_id); - RegisterActor(actor_table_data, false); - actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id())); - } - - // NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs - // client will register the actor again and the status of the actor may be - // `DEPENDENCIES_UNREADY` or `DEAD`. We should get all dead actors. - auto condition = [this]() { - return GetAllActors(true).size() == - RayConfig::instance().maximum_gcs_destroyed_actor_cached_count(); - }; - EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); - - auto actors = GetAllActors(true); - for (const auto &actor : actors) { - EXPECT_TRUE(actor_ids.contains(ActorID::FromBinary(actor.actor_id()))); - } -} - TEST_P(GcsClientTest, TestEvictExpiredDeadNodes) { // Restart GCS. 
RestartGcsServer(); From e431de621d73f059e5f8991781271cce41df0382 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 3 May 2023 19:05:39 +0000 Subject: [PATCH 15/20] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/tests/test_actor_failures.py | 27 +++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index 7e82a4975156..cc560fe30bfc 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -790,6 +790,33 @@ def foo(): ray.get(ref) +def test_actor_out_of_scope_failure(ray_start_regular): + @ray.remote(max_restarts=-1) + class Actor: + def ping(self): + return "hello" + + @ray.remote + class Parent: + def generate_actors(self): + self.child = Actor.remote() + self.detached_actor = Actor.options( + name="actor", lifetime="detached" + ).remote() + return self.child, self.detached_actor, os.getpid() + + parent = Parent.remote() + actor, detached_actor, pid = ray.get(parent.generate_actors.remote()) + + os.kill(pid, signal.SIGKILL) + + with pytest.raises( + ray.exceptions.RayActorError, + match=".*The actor is dead because all references to the actor were removed.*", + ): + print("actor.ping:", ray.get(actor.ping.remote())) + + if __name__ == "__main__": import pytest From 669d5e9371fd5946e3948b2b2f562a620e19aec0 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Wed, 3 May 2023 20:24:06 +0000 Subject: [PATCH 16/20] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/serve/tests/test_controller_recovery.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/ray/serve/tests/test_controller_recovery.py b/python/ray/serve/tests/test_controller_recovery.py index 77d262c26ee2..e3471c9b3ba5 100644 --- a/python/ray/serve/tests/test_controller_recovery.py +++ b/python/ray/serve/tests/test_controller_recovery.py @@ -238,8 +238,10 @@ def get_actor_info(name: str): _, controller1_pid = get_actor_info(SERVE_CONTROLLER_NAME) ray.kill(serve.context._global_client._controller, no_restart=False) # wait for controller is alive again - wait_for_condition(get_actor_info, name=SERVE_CONTROLLER_NAME) - assert controller1_pid != get_actor_info(SERVE_CONTROLLER_NAME)[1] + wait_for_condition( + lambda: get_actor_info(SERVE_CONTROLLER_NAME) is not None + and get_actor_info(SERVE_CONTROLLER_NAME)[1] != controller1_pid + ) # Let the actor proceed initialization ray.get(signal.send.remote()) From c7b0cefb71a5fba2be06c185f3058de1d4d7471b Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 4 May 2023 02:49:17 +0000 Subject: [PATCH 17/20] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/tests/test_actor_failures.py | 27 ------------------------- 1 file changed, 27 deletions(-) diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index cc560fe30bfc..7e82a4975156 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -790,33 +790,6 @@ def foo(): ray.get(ref) -def test_actor_out_of_scope_failure(ray_start_regular): - @ray.remote(max_restarts=-1) - class Actor: - def ping(self): - return "hello" - - @ray.remote - class Parent: - def generate_actors(self): - self.child = Actor.remote() - self.detached_actor = Actor.options( - name="actor", 
lifetime="detached" - ).remote() - return self.child, self.detached_actor, os.getpid() - - parent = Parent.remote() - actor, detached_actor, pid = ray.get(parent.generate_actors.remote()) - - os.kill(pid, signal.SIGKILL) - - with pytest.raises( - ray.exceptions.RayActorError, - match=".*The actor is dead because all references to the actor were removed.*", - ): - print("actor.ping:", ray.get(actor.ping.remote())) - - if __name__ == "__main__": import pytest From 2c6ab6445235c7991d39faa9085967daee7e1a53 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 4 May 2023 08:25:08 +0000 Subject: [PATCH 18/20] fix Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- python/ray/tests/test_failure_3.py | 13 ++++++++++--- src/ray/gcs/gcs_client/accessor.cc | 4 +++- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 4 +++- src/ray/gcs/pb_util.h | 2 ++ src/ray/raylet/node_manager.cc | 19 ++++++++++--------- 5 files changed, 28 insertions(+), 14 deletions(-) diff --git a/python/ray/tests/test_failure_3.py b/python/ray/tests/test_failure_3.py index 926b7c76ec01..b666a9157e72 100644 --- a/python/ray/tests/test_failure_3.py +++ b/python/ray/tests/test_failure_3.py @@ -365,6 +365,8 @@ def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path): the list of PIDs that are children of the Ray worker processes. """ + ray_start_cluster.add_node() + ray_start_cluster.wait_for_nodes() output_file_path = tmp_path / "leaked_pids.json" driver_script = f""" @@ -374,7 +376,7 @@ def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path): import shutil import time import os - +ray.init("{ray_start_cluster.address}") @ray.remote class Actor: def create_leaked_child_process(self, num_to_leak): @@ -424,7 +426,6 @@ def task(): print(os.getpid()) time.sleep(1) """ - driver_proc = run_string_as_driver_nonblocking(driver_script) # Wait for the json file containing the child PIDS @@ -443,9 +444,15 @@ def task(): assert all([proc.status() == psutil.STATUS_SLEEPING for proc in processes]) # Valdiate children of worker process die after SIGINT. 
+ def check(): + for proc in processes: + if proc.is_running(): + print(proc) + return all([not proc.is_running() for proc in processes]) + driver_proc.send_signal(signal.SIGINT) wait_for_condition( - condition_predictor=lambda: all([not proc.is_running() for proc in processes]), + condition_predictor=check, timeout=30, ) diff --git a/src/ray/gcs/gcs_client/accessor.cc b/src/ray/gcs/gcs_client/accessor.cc index ccb225a62931..358b3940e6dc 100644 --- a/src/ray/gcs/gcs_client/accessor.cc +++ b/src/ray/gcs/gcs_client/accessor.cc @@ -852,7 +852,9 @@ Status WorkerInfoAccessor::AsyncReportWorkerFailure( const std::shared_ptr &data_ptr, const StatusCallback &callback) { rpc::Address worker_address = data_ptr->worker_address(); - RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString(); + RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString() + << " WorkerID=" << WorkerID::FromBinary(worker_address.worker_id()) + << " NodeID=" << NodeID::FromBinary(worker_address.raylet_id()); rpc::ReportWorkerFailureRequest request; request.mutable_worker_failure()->CopyFrom(*data_ptr); client_impl_->GetGcsRpcClient().ReportWorkerFailure( diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 0536f859bea8..e6a347ddebe1 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -761,7 +761,8 @@ void GcsActorManager::PollOwnerForActorOutOfScope( auto it = workers.find(owner_id); if (it == workers.end()) { RAY_LOG(DEBUG) << "Adding owner " << owner_id << " of actor " << actor_id - << ", job id = " << actor_id.JobId(); + << ", job id = " << actor_id.JobId() + << " owner node id = " << owner_node_id; std::shared_ptr client = worker_client_factory_(actor->GetOwnerAddress()); it = workers.emplace(owner_id, Owner(std::move(client))).first; @@ -958,6 +959,7 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id, bool need_reconstruct = disconnect_type != rpc::WorkerExitType::INTENDED_USER_EXIT && disconnect_type != rpc::WorkerExitType::USER_ERROR; + // Destroy all actors that are owned by this worker. const auto it = owners_.find(node_id); if (it != owners_.end() && it->second.count(worker_id)) { diff --git a/src/ray/gcs/pb_util.h b/src/ray/gcs/pb_util.h index 7f99aa35924d..7aa91e6538da 100644 --- a/src/ray/gcs/pb_util.h +++ b/src/ray/gcs/pb_util.h @@ -107,6 +107,7 @@ inline std::shared_ptr CreateActorTableData( /// Helper function to produce worker failure data. inline std::shared_ptr CreateWorkerFailureData( + const NodeID &node_id, const WorkerID &worker_id, int64_t timestamp, rpc::WorkerExitType disconnect_type, @@ -117,6 +118,7 @@ inline std::shared_ptr CreateWorkerFailureData( // Only report the worker id + delta (new data upon worker failures). // GCS will merge the data with original worker data. 
worker_failure_info_ptr->mutable_worker_address()->set_worker_id(worker_id.Binary()); + worker_failure_info_ptr->mutable_worker_address()->set_raylet_id(node_id.Binary()); worker_failure_info_ptr->set_timestamp(timestamp); worker_failure_info_ptr->set_exit_type(disconnect_type); worker_failure_info_ptr->set_exit_detail(disconnect_detail); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 32d51decc5ed..fc3fe1074054 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1474,6 +1474,16 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie disconnect_detail, worker->GetProcess().GetId(), creation_task_exception); + // ReportWorkerFailure should happen after the worker exit completely. + // A better way is to monitor the pid exit. But that needs Process.h + // support async operation. + // Here we monitor the socket to achieve similar result. + // When the worker exited, the pid will be disconnected (local stream socket). + client->AsyncWaitTerminated([client, worker_failure_data_ptr, this] { + RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, + nullptr)); + }); + if (is_worker) { const ActorID &actor_id = worker->GetActorId(); const TaskID &task_id = worker->GetAssignedTaskId(); @@ -1548,15 +1558,6 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie local_task_manager_->ClearWorkerBacklog(worker->WorkerId()); cluster_task_manager_->CancelTaskForOwner(worker->GetAssignedTaskId()); - // ReportWorkerFailure should happen after the worker exit completely. - // A better way is to monitor the pid exit. But that needs Process.h - // support async operation. - // Here we monitor the socket to achieve similar result. - // When the worker exited, the pid will be disconnected (local stream socket). - client->AsyncWaitTerminated([client, worker_failure_data_ptr, this] { - RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, - nullptr)); - }); // TODO(rkn): Tell the object manager that this client has disconnected so // that it can clean up the wait requests for this client. Currently I think // these can be leaked. From 5afe5946a1b09f49a349d107c8dcc7be23b0c9eb Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 4 May 2023 16:04:46 +0000 Subject: [PATCH 19/20] format Signed-off-by: Yi Cheng <74173148+iycheng@users.noreply.github.com> --- src/ray/raylet/node_manager.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index fc3fe1074054..c0aacf08e66b 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1468,7 +1468,8 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie } // Publish the worker failure. 
auto worker_failure_data_ptr = - gcs::CreateWorkerFailureData(worker->WorkerId(), + gcs::CreateWorkerFailureData(self_node_id_, + worker->WorkerId(), time(nullptr), disconnect_type, disconnect_detail, From 31b06cb36c98f8049d66f068ad200d350e9e1203 Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 4 May 2023 18:35:07 -0700 Subject: [PATCH 20/20] up --- src/ray/raylet/node_manager.cc | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index c0aacf08e66b..abd0aac120f4 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1475,16 +1475,6 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie disconnect_detail, worker->GetProcess().GetId(), creation_task_exception); - // ReportWorkerFailure should happen after the worker exit completely. - // A better way is to monitor the pid exit. But that needs Process.h - // support async operation. - // Here we monitor the socket to achieve similar result. - // When the worker exited, the pid will be disconnected (local stream socket). - client->AsyncWaitTerminated([client, worker_failure_data_ptr, this] { - RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, - nullptr)); - }); - if (is_worker) { const ActorID &actor_id = worker->GetActorId(); const TaskID &task_id = worker->GetAssignedTaskId(); @@ -1559,6 +1549,23 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie local_task_manager_->ClearWorkerBacklog(worker->WorkerId()); cluster_task_manager_->CancelTaskForOwner(worker->GetAssignedTaskId()); +#ifdef _WIN32 + // On Windows, when the worker is killed, client async wait won't get notified + // somehow. + RAY_CHECK_OK( + gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr)); + client->Close(); +#else + // ReportWorkerFailure should happen after the worker exit completely. + // A better way is to monitor the pid exit. But that needs Process.h + // support async operation. + // Here we monitor the socket to achieve similar result. + // When the worker exited, the pid will be disconnected (local stream socket). + client->AsyncWaitTerminated([client, worker_failure_data_ptr, this] { + RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, + nullptr)); + }); +#endif // TODO(rkn): Tell the object manager that this client has disconnected so // that it can clean up the wait requests for this client. Currently I think // these can be leaked.
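
The recurring idea in this series is that the raylet should call AsyncReportWorkerFailure only after the worker's connection has really terminated, which ServerConnection::AsyncWaitTerminated implements as a boost::asio async_wait on local_stream_socket::wait_type::wait_error. Below is a minimal, self-contained sketch of that pattern on a unix-domain socket pair (POSIX-only, needs a Boost.Asio with local socket support); the free function AsyncWaitTerminated and the socket-pair setup here are illustrative stand-ins, not Ray's actual ServerConnection or raylet plumbing.

#include <boost/asio.hpp>
#include <functional>
#include <iostream>
#include <utility>

namespace asio = boost::asio;
using stream_protocol = asio::local::stream_protocol;

// Analogue of ServerConnection::AsyncWaitTerminated in the patch: run
// `callback` once the peer end of `socket` hangs up (asio surfaces the hangup
// through the wait_error condition).
void AsyncWaitTerminated(stream_protocol::socket &socket,
                         std::function<void()> callback) {
  socket.async_wait(stream_protocol::socket::wait_type::wait_error,
                    [callback = std::move(callback)](boost::system::error_code) {
                      callback();
                    });
}

int main() {
  asio::io_context io;

  // A connected unix-domain socket pair stands in for the raylet<->worker
  // connection.
  stream_protocol::socket raylet_side(io);
  stream_protocol::socket worker_side(io);
  asio::local::connect_pair(raylet_side, worker_side);

  // This is the point where the raylet would send AsyncReportWorkerFailure:
  // only after the worker's end of the connection has actually gone away.
  AsyncWaitTerminated(raylet_side, [] {
    std::cout << "worker connection terminated; safe to report the failure now"
              << std::endl;
  });

  // Simulate the worker process exiting by closing its end of the socket.
  worker_side.close();

  io.run();
  return 0;
}

On Linux the peer hangup is what wakes the wait_error handler, which is also why the final patch has to special-case Windows, where that notification reportedly never arrives.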
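
Patch 20 then splits the reporting path by platform, because the socket wait is not notified on Windows once the worker is killed. The toy sketch below only captures that control-flow decision; Connection, ReportWorkerFailureWhenGone, and SimulatePeerExit are stand-ins invented for illustration, not Ray types.

#include <functional>
#include <iostream>
#include <utility>

// Stand-in for the raylet's client connection; not Ray's actual type.
struct Connection {
  // Run `cb` once the peer end of the connection has terminated.
  void AsyncWaitTerminated(std::function<void()> cb) { on_terminated_ = std::move(cb); }
  void Close() { std::cout << "connection closed" << std::endl; }
  // Test hook for this sketch: pretend the worker process just exited.
  void SimulatePeerExit() {
    if (on_terminated_) on_terminated_();
  }

 private:
  std::function<void()> on_terminated_;
};

// Mirrors the platform split in the final node_manager.cc hunk.
void ReportWorkerFailureWhenGone(Connection &conn, std::function<void()> report) {
#ifdef _WIN32
  // On Windows the async socket wait is not notified when the worker dies,
  // so report immediately and close the connection explicitly.
  report();
  conn.Close();
#else
  // Elsewhere, defer the report until the worker's socket actually hangs up,
  // so the GCS never learns about the failure while the process is still alive.
  conn.AsyncWaitTerminated(std::move(report));
#endif
}

int main() {
  Connection conn;
  ReportWorkerFailureWhenGone(conn,
                              [] { std::cout << "worker failure reported" << std::endl; });
#ifndef _WIN32
  conn.SimulatePeerExit();  // drive the deferred callback in this sketch
#endif
  return 0;
}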
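
The regression test in test_advanced_9.py checks that the GCS server's descriptor count drops back down after the actor workers exit, by polling psutil.Process(pid).num_fds() inside wait_for_condition. For completeness, here is a rough C++ equivalent of that check, counting entries under /proc/<pid>/fd and polling with a timeout; the path, the threshold of 64, and the polling interval are assumptions for illustration, and the sketch is Linux-only (C++17).

#include <chrono>
#include <cstdlib>
#include <filesystem>
#include <iostream>
#include <thread>
#include <unistd.h>

namespace fs = std::filesystem;

// Number of file descriptors currently open in process `pid`, counted from
// /proc (Linux-only, mirrors psutil.Process(pid).num_fds()).
std::size_t NumOpenFds(int pid) {
  std::size_t count = 0;
  for (const auto &entry :
       fs::directory_iterator("/proc/" + std::to_string(pid) + "/fd")) {
    (void)entry;
    ++count;
  }
  return count;
}

// Poll until the process has at most `max_fds` descriptors open or the timeout
// expires, like the wait_for_condition(...) call in the test.
bool WaitForFdsToDrop(int pid, std::size_t max_fds, std::chrono::seconds timeout) {
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  while (std::chrono::steady_clock::now() < deadline) {
    if (NumOpenFds(pid) <= max_fds) {
      return true;
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
  }
  return false;
}

int main(int argc, char **argv) {
  const int pid = argc > 1 ? std::atoi(argv[1]) : static_cast<int>(getpid());
  std::cout << "open fds: " << NumOpenFds(pid) << std::endl;
  std::cout << std::boolalpha << "dropped to <= 64: "
            << WaitForFdsToDrop(pid, 64, std::chrono::seconds(10)) << std::endl;
  return 0;
}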