
Deflakey test advanced 9 #34883

Merged (22 commits) on May 5, 2023
2 changes: 1 addition & 1 deletion python/ray/tests/test_advanced_9.py
@@ -258,7 +258,7 @@ def ready(self):
run_string_as_driver(script.format(address=call_ray_start_2, val=2))


- @pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.")
+ @pytest.mark.skipif(sys.platform == "win32", reason="Not valid on win32.")
def test_gcs_connection_no_leak(ray_start_cluster):
cluster = ray_start_cluster
head_node = cluster.add_node()
5 changes: 5 additions & 0 deletions src/ray/common/client_connection.h
@@ -125,6 +125,11 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {

std::string DebugString() const;

+ void AsyncWaitTerminated(std::function<void()> callback) {
+   socket_.async_wait(local_stream_socket::wait_type::wait_error,
Member:

nit: add comment? Is this waiting for the socket to be cleaned up (TIME_WAIT)?

Contributor Author (@fishbone, May 1, 2023):

This is to wait until the connection is terminated.

Ideally, the fix would monitor the PID, but that needs more changes, so instead we monitor the TCP connection. I'll add a comment for this.

+                      [callback = std::move(callback)](auto) { callback(); });
+ }

protected:
/// A private constructor for a server connection.
ServerConnection(local_stream_socket &&socket);
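For readers less familiar with Boost.Asio: `async_wait(wait_error, ...)` is a reactive wait that performs no I/O; its handler runs once the OS reports an error condition on the descriptor (or the wait itself fails). Per the author's reply above, the PR uses this as a cheap "the connection has terminated" signal rather than waiting on TIME_WAIT cleanup or monitoring the worker's PID. A minimal, self-contained sketch of the same pattern (assumes Linux with UNIX-domain sockets and Boost.Asio; the names here are mine, not from the PR):

```cpp
#include <boost/asio.hpp>
#include <functional>
#include <utility>

// Ray's local_stream_socket on Linux is a UNIX-domain stream socket.
using stream_socket = boost::asio::local::stream_protocol::socket;

// Invoke `callback` once the OS flags an error condition on the socket,
// which this PR treats as "the peer end of the connection has gone away".
void AsyncWaitTerminatedSketch(stream_socket &socket, std::function<void()> callback) {
  socket.async_wait(stream_socket::wait_type::wait_error,
                    [callback = std::move(callback)](const boost::system::error_code &) {
                      // The error code is ignored, mirroring the `(auto)`
                      // parameter in the PR's lambda: any completion of the
                      // wait is treated as termination of the connection.
                      callback();
                    });
}
```

A caller can pass whatever cleanup it wants to defer until the peer is gone, which is how `NodeManager::DisconnectClient` uses `AsyncWaitTerminated` later in this diff.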
6 changes: 3 additions & 3 deletions src/ray/core_worker/core_worker.cc
@@ -784,8 +784,8 @@ void CoreWorker::Exit(
detail = std::move(detail),
creation_task_exception_pb_bytes]() {
rpc::DrainServerCallExecutor();
- Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
KillChildProcs();
+ Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
Collaborator:

hmm, why does this order matter?

Contributor Author:

It doesn't matter for correctness, since the raylet has code handling worker failure. This ordering is only to make the async_wait shorter. Not very important.

Member:

Can we add a comment to this effect?
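A possible version of the comment being asked for here, restating the author's explanation in place (my wording, not necessarily what was merged upstream):

```cpp
rpc::DrainServerCallExecutor();
KillChildProcs();
// NOTE: Disconnect() is intentionally called after KillChildProcs(). The
// ordering is not required for correctness (the raylet handles worker
// failures either way); it only shortens how long the raylet's
// AsyncWaitTerminated() has to wait before this worker's failure can be
// reported to the GCS.
Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
Shutdown();
```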

Shutdown();
},
"CoreWorker.Shutdown");
@@ -829,10 +829,10 @@ void CoreWorker::ForceExit(const rpc::WorkerExitType exit_type,
const std::string &detail) {
RAY_LOG(WARNING) << "Force exit the process. "
<< " Details: " << detail;
- Disconnect(exit_type, detail);

KillChildProcs();

+ Disconnect(exit_type, detail);

// NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup.
// `exit()` will destruct static objects in an incorrect order, which will lead to
// core dumps.
1 change: 0 additions & 1 deletion src/ray/gcs/gcs_server/pubsub_handler.cc
@@ -104,7 +104,6 @@ void InternalPubSubHandler::HandleGcsSubscriberCommandBatch(
if (sender_id.empty()) {
sender_id = request.subscriber_id();
}

auto iter = sender_to_subscribers_.find(sender_id);
if (iter == sender_to_subscribers_.end()) {
iter = sender_to_subscribers_.insert({sender_id, {}}).first;
10 changes: 4 additions & 6 deletions src/ray/raylet/node_manager.cc
@@ -1494,9 +1494,6 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
disconnect_detail,
worker->GetProcess().GetId(),
creation_task_exception);
- RAY_CHECK_OK(
-     gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));

if (is_worker) {
const ActorID &actor_id = worker->GetActorId();
const TaskID &task_id = worker->GetAssignedTaskId();
@@ -1571,9 +1568,10 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie

local_task_manager_->ClearWorkerBacklog(worker->WorkerId());
cluster_task_manager_->CancelTaskForOwner(worker->GetAssignedTaskId());

client->Close();

+ client->AsyncWaitTerminated([client, worker_failure_data_ptr, this] {
+   RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr,
+                                                                nullptr));
+ });
// TODO(rkn): Tell the object manager that this client has disconnected so
// that it can clean up the wait requests for this client. Currently I think
// these can be leaked.
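Reading the two node_manager.cc hunks together, the net effect of this PR on `DisconnectClient` is that the worker-failure report to the GCS moves from "send immediately at disconnect time" to "send only after the worker's connection has actually terminated". Reassembled from the deleted and added lines above (sketch only; unchanged cleanup logic elided):

```cpp
// Before this PR: report the failure as soon as DisconnectClient runs.
RAY_CHECK_OK(
    gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));
// ... actor/task cleanup ...
client->Close();

// After this PR: close first, then report only once AsyncWaitTerminated
// observes the socket going away.
// ... actor/task cleanup ...
client->Close();
client->AsyncWaitTerminated([client, worker_failure_data_ptr, this] {
  RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr,
                                                               nullptr));
});
```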