Deflakey test advanced 9 (ray-project#34883)
Previously, a bug in which pubsub caused an FD leak was fixed in [PR](ray-project#33311). That fix turned out to have a race condition of its own, which later code changes started to trigger.

The test is flaky because there is a race between the Raylet reporting the worker failure and the core worker exiting on its own.

When the disconnect request is sent to the Raylet, the Raylet starts to report the worker failure, but the worker process may still be running.

The GCS uses the worker-failure report to close the connection. If the worker is still alive at that point, it might send another request to the GCS, which leads to the FD leak.

This PR makes two improvements:
- Move the heavy workload (killing child processes) to before the disconnect request is sent.
- The Raylet reports the worker failure only once the worker's socket has been closed (a sketch of this mechanism follows below).
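
As an illustration of the second improvement, here is a minimal, self-contained sketch (not Ray's actual code; it assumes Boost.Asio with POSIX local stream sockets, and every name in it is made up for the example): the raylet-side code arms an async wait on the worker's connection and reports the worker failure only after the socket has been torn down, so a still-running worker can no longer race the report with new GCS requests.

```cpp
// Sketch only: defer the "worker failed" report until the peer socket closes.
#include <boost/asio.hpp>

#include <iostream>

int main() {
  namespace asio = boost::asio;
  using asio::local::stream_protocol;

  asio::io_context io;

  // Stand-in for the raylet <-> worker connection.
  stream_protocol::socket raylet_side(io);
  stream_protocol::socket worker_side(io);
  asio::local::connect_pair(raylet_side, worker_side);

  // The raylet has received the disconnect request, but it defers the
  // failure report until the socket errors out, i.e. until the worker
  // can no longer open new requests against the GCS.
  raylet_side.async_wait(stream_protocol::socket::wait_error,
                         [](const boost::system::error_code & /*ec*/) {
                           std::cout << "worker socket closed; "
                                        "now reporting worker failure to GCS\n";
                         });

  // The worker finishes its remaining work and exits, closing its end.
  worker_side.close();

  io.run();  // Returns once the wait handler has fired.
  return 0;
}
```

Whether a plain peer close wakes up a `wait_error` wait is platform-dependent, which is why the node_manager.cc change below keeps an immediate report path under `#ifdef _WIN32`.
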
fishbone authored May 5, 2023
1 parent f6d2c8a commit d60e73e
Showing 11 changed files with 60 additions and 65 deletions.
6 changes: 4 additions & 2 deletions python/ray/serve/tests/test_controller_recovery.py
@@ -238,8 +238,10 @@ def get_actor_info(name: str):
_, controller1_pid = get_actor_info(SERVE_CONTROLLER_NAME)
ray.kill(serve.context._global_client._controller, no_restart=False)
# wait for the controller to be alive again
wait_for_condition(get_actor_info, name=SERVE_CONTROLLER_NAME)
assert controller1_pid != get_actor_info(SERVE_CONTROLLER_NAME)[1]
wait_for_condition(
lambda: get_actor_info(SERVE_CONTROLLER_NAME) is not None
and get_actor_info(SERVE_CONTROLLER_NAME)[1] != controller1_pid
)

# Let the actor proceed initialization
ray.get(signal.send.remote())
2 changes: 1 addition & 1 deletion python/ray/tests/test_advanced_9.py
@@ -258,7 +258,7 @@ def ready(self):
run_string_as_driver(script.format(address=call_ray_start_2, val=2))


@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.")
@pytest.mark.skipif(sys.platform == "win32", reason="Not valid on win32.")
def test_gcs_connection_no_leak(ray_start_cluster):
cluster = ray_start_cluster
head_node = cluster.add_node()
13 changes: 10 additions & 3 deletions python/ray/tests/test_failure_3.py
@@ -365,6 +365,8 @@ def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path):
the list of PIDs that are children of the Ray worker
processes.
"""
ray_start_cluster.add_node()
ray_start_cluster.wait_for_nodes()

output_file_path = tmp_path / "leaked_pids.json"
driver_script = f"""
@@ -374,7 +376,7 @@ def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path):
import shutil
import time
import os
ray.init("{ray_start_cluster.address}")
@ray.remote
class Actor:
def create_leaked_child_process(self, num_to_leak):
@@ -424,7 +426,6 @@ def task():
print(os.getpid())
time.sleep(1)
"""

driver_proc = run_string_as_driver_nonblocking(driver_script)

# Wait for the json file containing the child PIDS
@@ -443,9 +444,15 @@ def task():
assert all([proc.status() == psutil.STATUS_SLEEPING for proc in processes])

# Validate that children of the worker process die after SIGINT.
def check():
for proc in processes:
if proc.is_running():
print(proc)
return all([not proc.is_running() for proc in processes])

driver_proc.send_signal(signal.SIGINT)
wait_for_condition(
condition_predictor=lambda: all([not proc.is_running() for proc in processes]),
condition_predictor=check,
timeout=30,
)

Expand Down
6 changes: 6 additions & 0 deletions src/ray/common/client_connection.h
@@ -125,6 +125,12 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {

std::string DebugString() const;

void AsyncWaitTerminated(std::function<void()> callback) {
// Async wait until the connection is disconnected.
socket_.async_wait(local_stream_socket::wait_type::wait_error,
[callback = std::move(callback)](auto) { callback(); });
}

protected:
/// A private constructor for a server connection.
ServerConnection(local_stream_socket &&socket);
8 changes: 5 additions & 3 deletions src/ray/core_worker/core_worker.cc
@@ -785,8 +785,9 @@ void CoreWorker::Exit(
detail = std::move(detail),
creation_task_exception_pb_bytes]() {
rpc::DrainServerCallExecutor();
Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
KillChildProcs();
// Disconnect here after KillChildProcs to make the Raylet async wait shorter.
Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
Shutdown();
},
"CoreWorker.Shutdown");
@@ -830,10 +831,11 @@ void CoreWorker::ForceExit(const rpc::WorkerExitType exit_type,
const std::string &detail) {
RAY_LOG(WARNING) << "Force exit the process. "
<< " Details: " << detail;
Disconnect(exit_type, detail);

KillChildProcs();

// Disconnect here before KillChildProcs to make the Raylet async wait shorter.
Disconnect(exit_type, detail);

// NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup.
// `exit()` will destruct static objects in an incorrect order, which will lead to
// core dumps.
4 changes: 3 additions & 1 deletion src/ray/gcs/gcs_client/accessor.cc
@@ -852,7 +852,9 @@ Status WorkerInfoAccessor::AsyncReportWorkerFailure(
const std::shared_ptr<rpc::WorkerTableData> &data_ptr,
const StatusCallback &callback) {
rpc::Address worker_address = data_ptr->worker_address();
RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString();
RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString()
<< " WorkerID=" << WorkerID::FromBinary(worker_address.worker_id())
<< " NodeID=" << NodeID::FromBinary(worker_address.raylet_id());
rpc::ReportWorkerFailureRequest request;
request.mutable_worker_failure()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().ReportWorkerFailure(
40 changes: 0 additions & 40 deletions src/ray/gcs/gcs_client/test/gcs_client_test.cc
@@ -947,46 +947,6 @@ TEST_P(GcsClientTest, DISABLED_TestGetActorPerf) {
<< actor_count << " actors.";
}

TEST_P(GcsClientTest, TestEvictExpiredDestroyedActors) {
// Restart doesn't work with in memory storage
if (RayConfig::instance().gcs_storage() == "memory") {
return;
}
// Register actors and the actors will be destroyed.
JobID job_id = JobID::FromInt(1);
AddJob(job_id);
absl::flat_hash_set<ActorID> actor_ids;
int actor_count = RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
for (int index = 0; index < actor_count; ++index) {
auto actor_table_data = Mocker::GenActorTableData(job_id);
RegisterActor(actor_table_data, false);
actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id()));
}

// Restart GCS.
RestartGcsServer();

for (int index = 0; index < actor_count; ++index) {
auto actor_table_data = Mocker::GenActorTableData(job_id);
RegisterActor(actor_table_data, false);
actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id()));
}

// NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs
// client will register the actor again and the status of the actor may be
// `DEPENDENCIES_UNREADY` or `DEAD`. We should get all dead actors.
auto condition = [this]() {
return GetAllActors(true).size() ==
RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));

auto actors = GetAllActors(true);
for (const auto &actor : actors) {
EXPECT_TRUE(actor_ids.contains(ActorID::FromBinary(actor.actor_id())));
}
}

TEST_P(GcsClientTest, TestEvictExpiredDeadNodes) {
// Restart GCS.
RestartGcsServer();
19 changes: 11 additions & 8 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -761,7 +761,8 @@ void GcsActorManager::PollOwnerForActorOutOfScope(
auto it = workers.find(owner_id);
if (it == workers.end()) {
RAY_LOG(DEBUG) << "Adding owner " << owner_id << " of actor " << actor_id
<< ", job id = " << actor_id.JobId();
<< ", job id = " << actor_id.JobId()
<< " owner node id = " << owner_node_id;
std::shared_ptr<rpc::CoreWorkerClientInterface> client =
worker_client_factory_(actor->GetOwnerAddress());
it = workers.emplace(owner_id, Owner(std::move(client))).first;
Expand All @@ -776,14 +777,15 @@ void GcsActorManager::PollOwnerForActorOutOfScope(
[this, owner_node_id, owner_id, actor_id](
Status status, const rpc::WaitForActorOutOfScopeReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Worker " << owner_id
<< " failed, destroying actor child, job id = "
<< actor_id.JobId();
} else {
RAY_LOG(INFO) << "Actor " << actor_id
<< " is out of scope, destroying actor, job id = "
<< actor_id.JobId();
RAY_LOG(WARNING) << "Failed to wait for actor " << actor_id
<< " out of scope, job id = " << actor_id.JobId()
<< ", error: " << status.ToString();
// TODO(iycheng): Retry it in another PR.
return;
}
RAY_LOG(INFO) << "Actor " << actor_id
<< " is out of scope, destroying actor, job id = "
<< actor_id.JobId();

auto node_it = owners_.find(owner_node_id);
if (node_it != owners_.end() && node_it->second.count(owner_id)) {
@@ -957,6 +959,7 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,

bool need_reconstruct = disconnect_type != rpc::WorkerExitType::INTENDED_USER_EXIT &&
disconnect_type != rpc::WorkerExitType::USER_ERROR;

// Destroy all actors that are owned by this worker.
const auto it = owners_.find(node_id);
if (it != owners_.end() && it->second.count(worker_id)) {
1 change: 0 additions & 1 deletion src/ray/gcs/gcs_server/pubsub_handler.cc
@@ -104,7 +104,6 @@ void InternalPubSubHandler::HandleGcsSubscriberCommandBatch(
if (sender_id.empty()) {
sender_id = request.subscriber_id();
}

auto iter = sender_to_subscribers_.find(sender_id);
if (iter == sender_to_subscribers_.end()) {
iter = sender_to_subscribers_.insert({sender_id, {}}).first;
2 changes: 2 additions & 0 deletions src/ray/gcs/pb_util.h
@@ -107,6 +107,7 @@ inline std::shared_ptr<ray::rpc::ActorTableData> CreateActorTableData(

/// Helper function to produce worker failure data.
inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
const NodeID &node_id,
const WorkerID &worker_id,
int64_t timestamp,
rpc::WorkerExitType disconnect_type,
@@ -117,6 +118,7 @@ inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
// Only report the worker id + delta (new data upon worker failures).
// GCS will merge the data with original worker data.
worker_failure_info_ptr->mutable_worker_address()->set_worker_id(worker_id.Binary());
worker_failure_info_ptr->mutable_worker_address()->set_raylet_id(node_id.Binary());
worker_failure_info_ptr->set_timestamp(timestamp);
worker_failure_info_ptr->set_exit_type(disconnect_type);
worker_failure_info_ptr->set_exit_detail(disconnect_detail);
24 changes: 18 additions & 6 deletions src/ray/raylet/node_manager.cc
@@ -1480,15 +1480,13 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
}
// Publish the worker failure.
auto worker_failure_data_ptr =
gcs::CreateWorkerFailureData(worker->WorkerId(),
gcs::CreateWorkerFailureData(self_node_id_,
worker->WorkerId(),
time(nullptr),
disconnect_type,
disconnect_detail,
worker->GetProcess().GetId(),
creation_task_exception);
RAY_CHECK_OK(
gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));

if (is_worker) {
const ActorID &actor_id = worker->GetActorId();
const TaskID &task_id = worker->GetAssignedTaskId();
@@ -1563,9 +1561,23 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie

local_task_manager_->ClearWorkerBacklog(worker->WorkerId());
cluster_task_manager_->CancelTaskForOwner(worker->GetAssignedTaskId());

#ifdef _WIN32
// On Windows, when the worker is killed, the client async wait somehow does
// not get notified.
RAY_CHECK_OK(
gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));
client->Close();

#else
// ReportWorkerFailure should happen after the worker has exited completely.
// A better way would be to monitor the pid for exit, but that would require
// Process.h to support async operations.
// Here we monitor the socket to achieve a similar result: when the worker
// exits, its end of the (local stream) socket is disconnected.
client->AsyncWaitTerminated([client, worker_failure_data_ptr, this] {
RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr,
nullptr));
});
#endif
// TODO(rkn): Tell the object manager that this client has disconnected so
// that it can clean up the wait requests for this client. Currently I think
// these can be leaked.