Deflakey test advanced 9 #34883

Merged
merged 22 commits on May 5, 2023
6 changes: 4 additions & 2 deletions python/ray/serve/tests/test_controller_recovery.py
@@ -238,8 +238,10 @@ def get_actor_info(name: str):
_, controller1_pid = get_actor_info(SERVE_CONTROLLER_NAME)
ray.kill(serve.context._global_client._controller, no_restart=False)
# wait for the controller to be alive again
wait_for_condition(get_actor_info, name=SERVE_CONTROLLER_NAME)
assert controller1_pid != get_actor_info(SERVE_CONTROLLER_NAME)[1]
wait_for_condition(
lambda: get_actor_info(SERVE_CONTROLLER_NAME) is not None
and get_actor_info(SERVE_CONTROLLER_NAME)[1] != controller1_pid
)

# Let the actor proceed initialization
ray.get(signal.send.remote())
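
The deflaking change above swaps a one-shot assert for wait_for_condition with a predicate. A rough sketch of what such a helper does (simplified, not Ray's actual implementation; condition_predictor and timeout appear in this PR, while retry_interval_ms is an assumed parameter): it simply polls the predicate until it holds or a timeout expires.

import time


def wait_for_condition(condition_predictor, timeout=10, retry_interval_ms=100, **kwargs):
    # Poll the predicate until it returns True; raise if the timeout elapses.
    # retry_interval_ms is an assumed knob controlling the polling period.
    start = time.time()
    while time.time() - start <= timeout:
        if condition_predictor(**kwargs):
            return True
        time.sleep(retry_interval_ms / 1000.0)
    raise RuntimeError("The condition was not met before the timeout expired.")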
2 changes: 1 addition & 1 deletion python/ray/tests/test_advanced_9.py
@@ -258,7 +258,7 @@ def ready(self):
run_string_as_driver(script.format(address=call_ray_start_2, val=2))


@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.")
@pytest.mark.skipif(sys.platform == "win32", reason="Not valid on win32.")
def test_gcs_connection_no_leak(ray_start_cluster):
cluster = ray_start_cluster
head_node = cluster.add_node()
13 changes: 10 additions & 3 deletions python/ray/tests/test_failure_3.py
@@ -365,6 +365,8 @@ def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path):
the list of PIDs that are children of the Ray worker
processes.
"""
ray_start_cluster.add_node()
ray_start_cluster.wait_for_nodes()

output_file_path = tmp_path / "leaked_pids.json"
driver_script = f"""
@@ -374,7 +376,7 @@ def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path):
import shutil
import time
import os

ray.init("{ray_start_cluster.address}")
@ray.remote
class Actor:
def create_leaked_child_process(self, num_to_leak):
@@ -424,7 +426,6 @@ def task():
print(os.getpid())
time.sleep(1)
"""

driver_proc = run_string_as_driver_nonblocking(driver_script)

# Wait for the json file containing the child PIDS
@@ -443,9 +444,15 @@ def task():
assert all([proc.status() == psutil.STATUS_SLEEPING for proc in processes])

# Validate that children of the worker process die after SIGINT.
def check():
for proc in processes:
if proc.is_running():
print(proc)
return all([not proc.is_running() for proc in processes])

driver_proc.send_signal(signal.SIGINT)
wait_for_condition(
condition_predictor=lambda: all([not proc.is_running() for proc in processes]),
condition_predictor=check,
timeout=30,
)
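
The check above prints which children are still alive before asserting they are all gone; the `processes` list itself is built from the leaked PIDs in the collapsed part of the test. A minimal sketch of that psutil pattern (illustrative only; child_pids and the helper name are hypothetical, not part of the test):

import psutil


def children_terminated(child_pids):
    # Resolve PIDs to psutil handles, tolerating processes that already exited.
    procs = []
    for pid in child_pids:
        try:
            procs.append(psutil.Process(pid))
        except psutil.NoSuchProcess:
            continue  # Already gone.
    # True only when every leaked child process has exited.
    return all(not proc.is_running() for proc in procs)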

6 changes: 6 additions & 0 deletions src/ray/common/client_connection.h
@@ -125,6 +125,12 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {

std::string DebugString() const;

void AsyncWaitTerminated(std::function<void()> callback) {
// Async wait until the connection is disconnected.
socket_.async_wait(local_stream_socket::wait_type::wait_error,
[callback = std::move(callback)](auto) { callback(); });
}

Member
nit: add comment? Is this waiting for the socket to be cleaned up (TIME_WAIT)?

fishbone (Contributor Author) May 1, 2023
This is to wait until the connection is terminated.
Ideally, the fix should monitor the pid, but that needs more changes, so instead we monitor the TCP connection.
I'll add a comment for this.

protected:
/// A private constructor for a server connection.
ServerConnection(local_stream_socket &&socket);
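
To make the idea in the review thread above concrete — detect that the peer went away by watching the connection rather than its pid — here is a rough Python analogue (purely illustrative; not the C++ implementation in this diff):

import socket


def wait_terminated(conn: socket.socket, callback) -> None:
    # Block until the peer closes its end of the stream: recv() returning b""
    # means EOF, and a reset/error also means the peer process is gone.
    # Any payload still arriving on the connection is simply discarded.
    try:
        while conn.recv(4096) != b"":
            pass
    except OSError:
        pass
    callback()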
8 changes: 5 additions & 3 deletions src/ray/core_worker/core_worker.cc
@@ -785,8 +785,9 @@ void CoreWorker::Exit(
detail = std::move(detail),
creation_task_exception_pb_bytes]() {
rpc::DrainServerCallExecutor();
Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
KillChildProcs();
// Disconnect here after KillChildProcs to make the Raylet async wait shorter.
Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
Collaborator
hmm, why does this order matter?

Contributor Author
It doesn't matter since raylet has code handling failure. This is only to make the async_wait shorter. Not very important.

Member
Can we add a comment to this effect?

Shutdown();
},
"CoreWorker.Shutdown");
@@ -830,10 +831,11 @@ void CoreWorker::ForceExit(const rpc::WorkerExitType exit_type,
const std::string &detail) {
RAY_LOG(WARNING) << "Force exit the process. "
<< " Details: " << detail;
Disconnect(exit_type, detail);

KillChildProcs();

// Disconnect here after KillChildProcs to make the Raylet async wait shorter.
Disconnect(exit_type, detail);

// NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup.
// `exit()` will destruct static objects in an incorrect order, which will lead to
// core dumps.
4 changes: 3 additions & 1 deletion src/ray/gcs/gcs_client/accessor.cc
@@ -852,7 +852,9 @@ Status WorkerInfoAccessor::AsyncReportWorkerFailure(
const std::shared_ptr<rpc::WorkerTableData> &data_ptr,
const StatusCallback &callback) {
rpc::Address worker_address = data_ptr->worker_address();
RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString();
RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString()
<< " WorkerID=" << WorkerID::FromBinary(worker_address.worker_id())
<< " NodeID=" << NodeID::FromBinary(worker_address.raylet_id());
rpc::ReportWorkerFailureRequest request;
request.mutable_worker_failure()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().ReportWorkerFailure(
40 changes: 0 additions & 40 deletions src/ray/gcs/gcs_client/test/gcs_client_test.cc
@@ -947,46 +947,6 @@ TEST_P(GcsClientTest, DISABLED_TestGetActorPerf) {
<< actor_count << " actors.";
}

TEST_P(GcsClientTest, TestEvictExpiredDestroyedActors) {
// Restart doesn't work with in memory storage
if (RayConfig::instance().gcs_storage() == "memory") {
return;
}
// Register actors and the actors will be destroyed.
JobID job_id = JobID::FromInt(1);
AddJob(job_id);
absl::flat_hash_set<ActorID> actor_ids;
int actor_count = RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
for (int index = 0; index < actor_count; ++index) {
auto actor_table_data = Mocker::GenActorTableData(job_id);
RegisterActor(actor_table_data, false);
actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id()));
}

// Restart GCS.
RestartGcsServer();

for (int index = 0; index < actor_count; ++index) {
auto actor_table_data = Mocker::GenActorTableData(job_id);
RegisterActor(actor_table_data, false);
actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id()));
}

// NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs
// client will register the actor again and the status of the actor may be
// `DEPENDENCIES_UNREADY` or `DEAD`. We should get all dead actors.
auto condition = [this]() {
return GetAllActors(true).size() ==
RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));

auto actors = GetAllActors(true);
for (const auto &actor : actors) {
EXPECT_TRUE(actor_ids.contains(ActorID::FromBinary(actor.actor_id())));
}
}

TEST_P(GcsClientTest, TestEvictExpiredDeadNodes) {
// Restart GCS.
RestartGcsServer();
19 changes: 11 additions & 8 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -761,7 +761,8 @@ void GcsActorManager::PollOwnerForActorOutOfScope(
auto it = workers.find(owner_id);
if (it == workers.end()) {
RAY_LOG(DEBUG) << "Adding owner " << owner_id << " of actor " << actor_id
<< ", job id = " << actor_id.JobId();
<< ", job id = " << actor_id.JobId()
<< " owner node id = " << owner_node_id;
std::shared_ptr<rpc::CoreWorkerClientInterface> client =
worker_client_factory_(actor->GetOwnerAddress());
it = workers.emplace(owner_id, Owner(std::move(client))).first;
@@ -776,14 +777,15 @@ void GcsActorManager::PollOwnerForActorOutOfScope(
[this, owner_node_id, owner_id, actor_id](
Status status, const rpc::WaitForActorOutOfScopeReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Worker " << owner_id
<< " failed, destroying actor child, job id = "
<< actor_id.JobId();
} else {
RAY_LOG(INFO) << "Actor " << actor_id
<< " is out of scope, destroying actor, job id = "
<< actor_id.JobId();
RAY_LOG(WARNING) << "Failed to wait for actor " << actor_id
<< " out of scope, job id = " << actor_id.JobId()
<< ", error: " << status.ToString();
// TODO(iycheng): Retry it in other PR.
return;
}
RAY_LOG(INFO) << "Actor " << actor_id
<< " is out of scope, destroying actor, job id = "
<< actor_id.JobId();

auto node_it = owners_.find(owner_node_id);
if (node_it != owners_.end() && node_it->second.count(owner_id)) {
@@ -957,6 +959,7 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,

bool need_reconstruct = disconnect_type != rpc::WorkerExitType::INTENDED_USER_EXIT &&
disconnect_type != rpc::WorkerExitType::USER_ERROR;

// Destroy all actors that are owned by this worker.
const auto it = owners_.find(node_id);
if (it != owners_.end() && it->second.count(worker_id)) {
1 change: 0 additions & 1 deletion src/ray/gcs/gcs_server/pubsub_handler.cc
@@ -104,7 +104,6 @@ void InternalPubSubHandler::HandleGcsSubscriberCommandBatch(
if (sender_id.empty()) {
sender_id = request.subscriber_id();
}

auto iter = sender_to_subscribers_.find(sender_id);
if (iter == sender_to_subscribers_.end()) {
iter = sender_to_subscribers_.insert({sender_id, {}}).first;
2 changes: 2 additions & 0 deletions src/ray/gcs/pb_util.h
@@ -107,6 +107,7 @@ inline std::shared_ptr<ray::rpc::ActorTableData> CreateActorTableData(

/// Helper function to produce worker failure data.
inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
const NodeID &node_id,
const WorkerID &worker_id,
int64_t timestamp,
rpc::WorkerExitType disconnect_type,
@@ -117,6 +118,7 @@ inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
// Only report the worker id + delta (new data upon worker failures).
// GCS will merge the data with original worker data.
worker_failure_info_ptr->mutable_worker_address()->set_worker_id(worker_id.Binary());
worker_failure_info_ptr->mutable_worker_address()->set_raylet_id(node_id.Binary());
worker_failure_info_ptr->set_timestamp(timestamp);
worker_failure_info_ptr->set_exit_type(disconnect_type);
worker_failure_info_ptr->set_exit_detail(disconnect_detail);
24 changes: 18 additions & 6 deletions src/ray/raylet/node_manager.cc
@@ -1468,15 +1468,13 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
}
// Publish the worker failure.
auto worker_failure_data_ptr =
gcs::CreateWorkerFailureData(worker->WorkerId(),
gcs::CreateWorkerFailureData(self_node_id_,
worker->WorkerId(),
time(nullptr),
disconnect_type,
disconnect_detail,
worker->GetProcess().GetId(),
creation_task_exception);
RAY_CHECK_OK(
gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));

if (is_worker) {
const ActorID &actor_id = worker->GetActorId();
const TaskID &task_id = worker->GetAssignedTaskId();
@@ -1551,9 +1549,23 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie

local_task_manager_->ClearWorkerBacklog(worker->WorkerId());
cluster_task_manager_->CancelTaskForOwner(worker->GetAssignedTaskId());

#ifdef _WIN32
// On Windows, when the worker is killed, the client async wait somehow does not
// get notified.
RAY_CHECK_OK(
gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));
client->Close();

#else
// ReportWorkerFailure should happen after the worker exits completely.
// A better way would be to monitor the pid exit, but that needs Process.h
// to support async operations.
// Here we monitor the socket to achieve a similar result: when the worker
// exits, the connection (local stream socket) is disconnected.
client->AsyncWaitTerminated([client, worker_failure_data_ptr, this] {
RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr,
nullptr));
});
#endif
// TODO(rkn): Tell the object manager that this client has disconnected so
// that it can clean up the wait requests for this client. Currently I think
// these can be leaked.