
Revert "Deflakey test advanced 9 (ray-project#34883)"
This reverts commit d60e73e.
ollie-iterators authored May 5, 2023
1 parent 0a15649 commit 77905fd
Showing 11 changed files with 65 additions and 60 deletions.
6 changes: 2 additions & 4 deletions python/ray/serve/tests/test_controller_recovery.py
@@ -238,10 +238,8 @@ def get_actor_info(name: str):
     _, controller1_pid = get_actor_info(SERVE_CONTROLLER_NAME)
     ray.kill(serve.context._global_client._controller, no_restart=False)
     # Wait for the controller to be alive again.
-    wait_for_condition(
-        lambda: get_actor_info(SERVE_CONTROLLER_NAME) is not None
-        and get_actor_info(SERVE_CONTROLLER_NAME)[1] != controller1_pid
-    )
+    wait_for_condition(get_actor_info, name=SERVE_CONTROLLER_NAME)
+    assert controller1_pid != get_actor_info(SERVE_CONTROLLER_NAME)[1]
 
     # Let the actor proceed with initialization.
     ray.get(signal.send.remote())
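For context on the hunk above: Ray's test helper polls a predicate until it returns a truthy value. A minimal sketch of that polling contract (modeled on ray._private.test_utils.wait_for_condition; the exact signature here is an assumption, not a quote of Ray's API) shows why the two forms differ:

import time

def wait_for_condition(condition_predictor, timeout=10, retry_interval_ms=100, **kwargs):
    # Poll the predicate until it returns a truthy value or the timeout expires.
    start = time.time()
    while time.time() - start <= timeout:
        if condition_predictor(**kwargs):
            return
        time.sleep(retry_interval_ms / 1000.0)
    raise RuntimeError("The condition wasn't met before the timeout expired.")

The deleted compound lambda re-evaluates liveness and the PID change together on every poll, so the wait only returns once both hold at the same instant. The restored two-step form waits for liveness first and compares PIDs once afterwards, which can observe a controller that is alive but not yet restarted under a new PID.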
2 changes: 1 addition & 1 deletion python/ray/tests/test_advanced_9.py
@@ -258,7 +258,7 @@ def ready(self):
     run_string_as_driver(script.format(address=call_ray_start_2, val=2))
 
 
-@pytest.mark.skipif(sys.platform == "win32", reason="Not valid on win32.")
+@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.")
 def test_gcs_connection_no_leak(ray_start_cluster):
     cluster = ray_start_cluster
     head_node = cluster.add_node()
13 changes: 3 additions & 10 deletions python/ray/tests/test_failure_3.py
@@ -365,8 +365,6 @@ def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path):
         the list of PIDs that are children of the Ray worker
         processes.
     """
-    ray_start_cluster.add_node()
-    ray_start_cluster.wait_for_nodes()
 
     output_file_path = tmp_path / "leaked_pids.json"
     driver_script = f"""
@@ -376,7 +374,7 @@ def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path):
 import shutil
 import time
 import os
-ray.init("{ray_start_cluster.address}")
 @ray.remote
 class Actor:
     def create_leaked_child_process(self, num_to_leak):
@@ -426,6 +424,7 @@ def task():
 print(os.getpid())
 time.sleep(1)
 """
+
 driver_proc = run_string_as_driver_nonblocking(driver_script)
 
 # Wait for the json file containing the child PIDs
@@ -444,15 +443,9 @@ def task():
     assert all([proc.status() == psutil.STATUS_SLEEPING for proc in processes])
 
     # Validate that children of the worker process die after SIGINT.
-    def check():
-        for proc in processes:
-            if proc.is_running():
-                print(proc)
-        return all([not proc.is_running() for proc in processes])
-
     driver_proc.send_signal(signal.SIGINT)
     wait_for_condition(
-        condition_predictor=check,
+        condition_predictor=lambda: all([not proc.is_running() for proc in processes]),
         timeout=30,
     )
 
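The pattern this test relies on — snapshot a process's children with psutil, deliver a signal, then poll until every child has exited — can be sketched in isolation. This is a hedged, Linux-oriented sketch with made-up commands, not the Ray test itself:

import signal
import subprocess
import time

import psutil

# Start a parent that immediately forks a long-running child, mimicking a
# worker process that spawns (and might leak) child processes.
parent = subprocess.Popen(
    ["python3", "-c",
     "import subprocess, time; subprocess.Popen(['sleep', '60']); time.sleep(60)"])
time.sleep(1)  # Give the parent a moment to fork.

# Snapshot the children before tearing anything down, as the test does.
children = psutil.Process(parent.pid).children(recursive=True)
assert children, "expected at least one child process"

# Interrupt the parent, then clean up the children ourselves; a plain SIGINT
# to the parent does not kill its children on its own.
parent.send_signal(signal.SIGINT)
for child in children:
    child.terminate()

# Poll until every child is gone, mirroring the lambda passed to
# wait_for_condition above.
deadline = time.time() + 30
while time.time() < deadline:
    if all(not child.is_running() for child in children):
        break
    time.sleep(0.1)
else:
    raise RuntimeError("child processes were still running at the deadline")

In the real test the explicit terminate() calls are unnecessary: killing the worker is exactly what is expected to reap the children, and the poll is what proves it.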
6 changes: 0 additions & 6 deletions src/ray/common/client_connection.h
@@ -125,12 +125,6 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {

   std::string DebugString() const;
 
-  void AsyncWaitTerminated(std::function<void()> callback) {
-    // Async wait until the connection is disconnected.
-    socket_.async_wait(local_stream_socket::wait_type::wait_error,
-                       [callback = std::move(callback)](auto) { callback(); });
-  }
-
  protected:
   /// A private constructor for a server connection.
   ServerConnection(local_stream_socket &&socket);
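The deleted helper arms a Boost.Asio wait_error watch on the connection's socket and fires a callback once the peer goes away. A rough asyncio analogue of the same idea — Python instead of Boost.Asio, so every name here is illustrative rather than Ray API:

import asyncio

async def wait_terminated(reader, callback):
    # read() keeps returning data until the peer disconnects, at which point
    # it returns b"" (EOF) or raises; either way the peer is gone.
    try:
        while await reader.read(4096):
            pass
    except ConnectionError:
        pass
    callback()

async def main():
    done = asyncio.get_running_loop().create_future()

    async def on_connect(reader, writer):
        # Fire the callback only after the client side goes away.
        asyncio.ensure_future(
            wait_terminated(reader, lambda: done.set_result("peer terminated")))

    server = await asyncio.start_server(on_connect, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    _, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.close()  # Simulate the worker process exiting.
    await writer.wait_closed()

    print(await done)  # Resolves once the server observes the disconnect.
    server.close()
    await server.wait_closed()

asyncio.run(main())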
8 changes: 3 additions & 5 deletions src/ray/core_worker/core_worker.cc
@@ -785,9 +785,8 @@ void CoreWorker::Exit(
          detail = std::move(detail),
          creation_task_exception_pb_bytes]() {
         rpc::DrainServerCallExecutor();
-        KillChildProcs();
-        // Disconnect here after KillChildProcs to make the Raylet async wait shorter.
         Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
+        KillChildProcs();
         Shutdown();
       },
       "CoreWorker.Shutdown");
@@ -831,11 +830,10 @@ void CoreWorker::ForceExit(const rpc::WorkerExitType exit_type,
                            const std::string &detail) {
   RAY_LOG(WARNING) << "Force exit the process. "
                    << " Details: " << detail;
-  KillChildProcs();
-
-  // Disconnect here before KillChildProcs to make the Raylet async wait shorter.
   Disconnect(exit_type, detail);
+
+  KillChildProcs();
 
   // NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup.
   // `exit()` will destruct static objects in an incorrect order, which will lead to
   // core dumps.
4 changes: 1 addition & 3 deletions src/ray/gcs/gcs_client/accessor.cc
@@ -852,9 +852,7 @@ Status WorkerInfoAccessor::AsyncReportWorkerFailure(
     const std::shared_ptr<rpc::WorkerTableData> &data_ptr,
     const StatusCallback &callback) {
   rpc::Address worker_address = data_ptr->worker_address();
-  RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString()
-                 << " WorkerID=" << WorkerID::FromBinary(worker_address.worker_id())
-                 << " NodeID=" << NodeID::FromBinary(worker_address.raylet_id());
+  RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString();
   rpc::ReportWorkerFailureRequest request;
   request.mutable_worker_failure()->CopyFrom(*data_ptr);
   client_impl_->GetGcsRpcClient().ReportWorkerFailure(
40 changes: 40 additions & 0 deletions src/ray/gcs/gcs_client/test/gcs_client_test.cc
@@ -947,6 +947,46 @@ TEST_P(GcsClientTest, DISABLED_TestGetActorPerf) {
     << actor_count << " actors.";
 }
 
+TEST_P(GcsClientTest, TestEvictExpiredDestroyedActors) {
+  // Restart doesn't work with in-memory storage.
+  if (RayConfig::instance().gcs_storage() == "memory") {
+    return;
+  }
+  // Register actors; they will then be destroyed.
+  JobID job_id = JobID::FromInt(1);
+  AddJob(job_id);
+  absl::flat_hash_set<ActorID> actor_ids;
+  int actor_count = RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
+  for (int index = 0; index < actor_count; ++index) {
+    auto actor_table_data = Mocker::GenActorTableData(job_id);
+    RegisterActor(actor_table_data, false);
+    actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id()));
+  }
+
+  // Restart GCS.
+  RestartGcsServer();
+
+  for (int index = 0; index < actor_count; ++index) {
+    auto actor_table_data = Mocker::GenActorTableData(job_id);
+    RegisterActor(actor_table_data, false);
+    actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id()));
+  }
+
+  // NOTE: GCS will not reply when actor registration fails, so when GCS restarts, the
+  // GCS client will register the actor again and the status of the actor may be
+  // `DEPENDENCIES_UNREADY` or `DEAD`. We should get all dead actors.
+  auto condition = [this]() {
+    return GetAllActors(true).size() ==
+           RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
+  };
+  EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
+
+  auto actors = GetAllActors(true);
+  for (const auto &actor : actors) {
+    EXPECT_TRUE(actor_ids.contains(ActorID::FromBinary(actor.actor_id())));
+  }
+}
+
 TEST_P(GcsClientTest, TestEvictExpiredDeadNodes) {
   // Restart GCS.
   RestartGcsServer();
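The new test pins down the eviction policy behind maximum_gcs_destroyed_actor_cached_count: GCS keeps a bounded cache of destroyed actors and drops the oldest entries once the cap is exceeded, even across a restart. A toy Python model of that bounded cache — an illustration of the policy under test, not GCS's actual C++ implementation:

from collections import OrderedDict

class DestroyedActorCache:
    # Keeps at most `capacity` destroyed actors, evicting the oldest first.

    def __init__(self, capacity):
        self.capacity = capacity
        self._actors = OrderedDict()

    def add(self, actor_id, actor_data):
        # Re-inserting an actor refreshes its position in the eviction order.
        self._actors.pop(actor_id, None)
        self._actors[actor_id] = actor_data
        while len(self._actors) > self.capacity:
            self._actors.popitem(last=False)  # Evict the oldest entry.

    def actor_ids(self):
        return list(self._actors)

cache = DestroyedActorCache(capacity=3)
for i in range(5):
    cache.add(f"actor-{i}", {"state": "DEAD"})
assert cache.actor_ids() == ["actor-2", "actor-3", "actor-4"]

The test registers capacity actors, restarts GCS, registers capacity more, and then asserts that GetAllActors(true) still returns only capacity dead actors, all drawn from the registered IDs.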
19 changes: 8 additions & 11 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -761,8 +761,7 @@ void GcsActorManager::PollOwnerForActorOutOfScope(
   auto it = workers.find(owner_id);
   if (it == workers.end()) {
     RAY_LOG(DEBUG) << "Adding owner " << owner_id << " of actor " << actor_id
-                   << ", job id = " << actor_id.JobId()
-                   << " owner node id = " << owner_node_id;
+                   << ", job id = " << actor_id.JobId();
     std::shared_ptr<rpc::CoreWorkerClientInterface> client =
         worker_client_factory_(actor->GetOwnerAddress());
     it = workers.emplace(owner_id, Owner(std::move(client))).first;
@@ -777,15 +776,14 @@ void GcsActorManager::PollOwnerForActorOutOfScope(
       [this, owner_node_id, owner_id, actor_id](
           Status status, const rpc::WaitForActorOutOfScopeReply &reply) {
         if (!status.ok()) {
-          RAY_LOG(WARNING) << "Failed to wait for actor " << actor_id
-                           << " out of scope, job id = " << actor_id.JobId()
-                           << ", error: " << status.ToString();
-          // TODO(iycheng): Retry it in other PR.
-          return;
+          RAY_LOG(INFO) << "Worker " << owner_id
+                        << " failed, destroying actor child, job id = "
+                        << actor_id.JobId();
+        } else {
+          RAY_LOG(INFO) << "Actor " << actor_id
+                        << " is out of scope, destroying actor, job id = "
+                        << actor_id.JobId();
         }
-        RAY_LOG(INFO) << "Actor " << actor_id
-                      << " is out of scope, destroying actor, job id = "
-                      << actor_id.JobId();
 
         auto node_it = owners_.find(owner_node_id);
         if (node_it != owners_.end() && node_it->second.count(owner_id)) {
@@ -959,7 +957,6 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,

   bool need_reconstruct = disconnect_type != rpc::WorkerExitType::INTENDED_USER_EXIT &&
                           disconnect_type != rpc::WorkerExitType::USER_ERROR;
-
   // Destroy all actors that are owned by this worker.
   const auto it = owners_.find(node_id);
   if (it != owners_.end() && it->second.count(worker_id)) {
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/pubsub_handler.cc
@@ -104,6 +104,7 @@ void InternalPubSubHandler::HandleGcsSubscriberCommandBatch(
   if (sender_id.empty()) {
     sender_id = request.subscriber_id();
   }
+
   auto iter = sender_to_subscribers_.find(sender_id);
   if (iter == sender_to_subscribers_.end()) {
     iter = sender_to_subscribers_.insert({sender_id, {}}).first;
2 changes: 0 additions & 2 deletions src/ray/gcs/pb_util.h
@@ -107,7 +107,6 @@ inline std::shared_ptr<ray::rpc::ActorTableData> CreateActorTableData(

 /// Helper function to produce worker failure data.
 inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
-    const NodeID &node_id,
     const WorkerID &worker_id,
     int64_t timestamp,
     rpc::WorkerExitType disconnect_type,
@@ -118,7 +117,6 @@ inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
   // Only report the worker id + delta (new data upon worker failures).
   // GCS will merge the data with original worker data.
   worker_failure_info_ptr->mutable_worker_address()->set_worker_id(worker_id.Binary());
-  worker_failure_info_ptr->mutable_worker_address()->set_raylet_id(node_id.Binary());
   worker_failure_info_ptr->set_timestamp(timestamp);
   worker_failure_info_ptr->set_exit_type(disconnect_type);
   worker_failure_info_ptr->set_exit_detail(disconnect_detail);
24 changes: 6 additions & 18 deletions src/ray/raylet/node_manager.cc
@@ -1480,13 +1480,15 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &client,
   }
   // Publish the worker failure.
   auto worker_failure_data_ptr =
-      gcs::CreateWorkerFailureData(self_node_id_,
-                                   worker->WorkerId(),
+      gcs::CreateWorkerFailureData(worker->WorkerId(),
                                    time(nullptr),
                                    disconnect_type,
                                    disconnect_detail,
                                    worker->GetProcess().GetId(),
                                    creation_task_exception);
+  RAY_CHECK_OK(
+      gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));
+
   if (is_worker) {
     const ActorID &actor_id = worker->GetActorId();
     const TaskID &task_id = worker->GetAssignedTaskId();
@@ -1561,23 +1563,9 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &client,

   local_task_manager_->ClearWorkerBacklog(worker->WorkerId());
   cluster_task_manager_->CancelTaskForOwner(worker->GetAssignedTaskId());
-#ifdef _WIN32
-  // On Windows, when the worker is killed, the client async wait somehow
-  // does not get notified.
-  RAY_CHECK_OK(
-      gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));
+
+  client->Close();
-#else
-  // ReportWorkerFailure should happen after the worker exits completely.
-  // A better way would be to monitor the pid exit, but that needs Process.h
-  // to support async operations.
-  // Here we monitor the socket to achieve a similar result:
-  // when the worker exits, the local stream socket is disconnected.
-  client->AsyncWaitTerminated([client, worker_failure_data_ptr, this] {
-    RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr,
-                                                                 nullptr));
-  });
-#endif
 
   // TODO(rkn): Tell the object manager that this client has disconnected so
   // that it can clean up the wait requests for this client. Currently I think
   // these can be leaked.
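The net effect of this file's changes is an ordering change: before the revert, the raylet parked AsyncReportWorkerFailure behind the socket-disconnect signal so GCS could never learn of the failure while the worker was still alive; after it, the report is sent immediately in DisconnectClient. A small asyncio sketch of the deferred-report pattern being removed (all names hypothetical):

import asyncio

async def main():
    worker_exited = asyncio.Event()
    log = []

    def report_worker_failure():
        # Stand-in for AsyncReportWorkerFailure: GCS learns the worker died.
        log.append("worker failure reported")

    async def deferred_report():
        # The removed pattern: park the report behind the "socket
        # disconnected" signal so it cannot race the worker's actual exit.
        await worker_exited.wait()
        report_worker_failure()

    task = asyncio.create_task(deferred_report())
    log.append("worker still shutting down")
    await asyncio.sleep(0)  # Yield once; the report has not fired yet.
    assert log == ["worker still shutting down"]

    worker_exited.set()  # The socket drops: the worker is really gone.
    await task
    assert log[-1] == "worker failure reported"

asyncio.run(main())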
