Revert "Deflakey test advanced 9" #35075

Closed
6 changes: 2 additions & 4 deletions python/ray/serve/tests/test_controller_recovery.py
@@ -238,10 +238,8 @@ def get_actor_info(name: str):
_, controller1_pid = get_actor_info(SERVE_CONTROLLER_NAME)
ray.kill(serve.context._global_client._controller, no_restart=False)
# Wait for the controller to become alive again.
wait_for_condition(
lambda: get_actor_info(SERVE_CONTROLLER_NAME) is not None
and get_actor_info(SERVE_CONTROLLER_NAME)[1] != controller1_pid
)
wait_for_condition(get_actor_info, name=SERVE_CONTROLLER_NAME)
assert controller1_pid != get_actor_info(SERVE_CONTROLLER_NAME)[1]

# Let the actor proceed with initialization.
ray.get(signal.send.remote())
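For context, the deleted four-line wait folded the liveness check and the PID comparison into a single predicate, so a stale PID just meant "keep polling"; the restored two-step form re-reads the PID after the wait and can therefore observe the old controller. A rough sketch of how such a polling helper behaves, assuming a simplified stand-in for Ray's `wait_for_condition` test utility (the loop body below is an illustration, not the real implementation):

```python
import time

def wait_for_condition(condition_predictor, timeout=10, retry_interval_ms=100, **kwargs):
    # Simplified stand-in for ray._private.test_utils.wait_for_condition:
    # keep evaluating the predicate until it returns True or time runs out.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition_predictor(**kwargs):
            return
        time.sleep(retry_interval_ms / 1000)
    raise TimeoutError("The condition wasn't met before the timeout expired.")

# Atomic form (the deleted lines): liveness and the PID change are re-checked
# together on every poll, so a stale PID just means "not ready yet".
#   wait_for_condition(
#       lambda: get_actor_info(SERVE_CONTROLLER_NAME) is not None
#       and get_actor_info(SERVE_CONTROLLER_NAME)[1] != controller1_pid
#   )
```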
2 changes: 1 addition & 1 deletion python/ray/tests/test_advanced_9.py
@@ -258,7 +258,7 @@ def ready(self):
run_string_as_driver(script.format(address=call_ray_start_2, val=2))


@pytest.mark.skipif(sys.platform == "win32", reason="Not valid on win32.")
@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.")
def test_gcs_connection_no_leak(ray_start_cluster):
cluster = ray_start_cluster
head_node = cluster.add_node()
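The marker change is broader than it looks: the old line skipped only Windows, while the restored line runs the test only on Linux, so macOS is now skipped as well. A self-contained illustration of the two gates (the test names here are hypothetical):

```python
import sys

import pytest

# Skips only on Windows; the test still runs on macOS and Linux.
@pytest.mark.skipif(sys.platform == "win32", reason="Not valid on win32.")
def test_runs_on_posix():
    assert True

# Runs only on Linux; both Windows and macOS are skipped.
@pytest.mark.skipif(sys.platform != "linux", reason="Only works on linux.")
def test_runs_on_linux_only():
    assert True
```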
13 changes: 3 additions & 10 deletions python/ray/tests/test_failure_3.py
@@ -365,8 +365,6 @@ def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path):
the list of PIDs that are children of the Ray worker
processes.
"""
ray_start_cluster.add_node()
ray_start_cluster.wait_for_nodes()

output_file_path = tmp_path / "leaked_pids.json"
driver_script = f"""
Expand All @@ -376,7 +374,7 @@ def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path):
import shutil
import time
import os
ray.init("{ray_start_cluster.address}")

@ray.remote
class Actor:
def create_leaked_child_process(self, num_to_leak):
@@ -426,6 +424,7 @@ def task():
print(os.getpid())
time.sleep(1)
"""

driver_proc = run_string_as_driver_nonblocking(driver_script)

# Wait for the JSON file containing the child PIDs
@@ -444,15 +443,9 @@ def task():
assert all([proc.status() == psutil.STATUS_SLEEPING for proc in processes])

# Validate that children of the worker process die after SIGINT.
def check():
for proc in processes:
if proc.is_running():
print(proc)
return all([not proc.is_running() for proc in processes])

driver_proc.send_signal(signal.SIGINT)
wait_for_condition(
condition_predictor=check,
condition_predictor=lambda: all([not proc.is_running() for proc in processes]),
timeout=30,
)

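The restored inline lambda drops the deleted `check` helper, which printed each still-running child before returning, so a timeout now fails without naming the surviving processes. A small standalone sketch of the same SIGINT-then-verify shape using `psutil` (the child command and timeouts are illustrative values, and POSIX signal semantics are assumed):

```python
import signal
import subprocess
import sys

import psutil

# Spawn a child that just sleeps, then deliver SIGINT and verify it dies.
# This mirrors the shape of the test's final assertion.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
proc = psutil.Process(child.pid)
assert proc.is_running()

child.send_signal(signal.SIGINT)  # default handler raises KeyboardInterrupt
child.wait(timeout=30)            # reap the child so is_running() flips

assert not proc.is_running()
```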
6 changes: 0 additions & 6 deletions src/ray/common/client_connection.h
@@ -125,12 +125,6 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {

std::string DebugString() const;

void AsyncWaitTerminated(std::function<void()> callback) {
// Async wait until the connection is disconnected.
socket_.async_wait(local_stream_socket::wait_type::wait_error,
[callback = std::move(callback)](auto) { callback(); });
}

protected:
/// A private constructor for a server connection.
ServerConnection(local_stream_socket &&socket);
8 changes: 3 additions & 5 deletions src/ray/core_worker/core_worker.cc
@@ -785,9 +785,8 @@ void CoreWorker::Exit(
detail = std::move(detail),
creation_task_exception_pb_bytes]() {
rpc::DrainServerCallExecutor();
KillChildProcs();
// Disconnect here after KillChildProcs to make the Raylet async wait shorter.
Disconnect(exit_type, detail, creation_task_exception_pb_bytes);
KillChildProcs();
Shutdown();
},
"CoreWorker.Shutdown");
@@ -831,11 +830,10 @@ void CoreWorker::ForceExit(const rpc::WorkerExitType exit_type,
const std::string &detail) {
RAY_LOG(WARNING) << "Force exit the process. "
<< " Details: " << detail;
KillChildProcs();

// Disconnect here before KillChildProcs to make the Raylet async wait shorter.
Disconnect(exit_type, detail);

KillChildProcs();

// NOTE(hchen): Use `QuickExit()` to force-exit this process without doing cleanup.
// `exit()` will destruct static objects in an incorrect order, which will lead to
// core dumps.
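Both hunks only move the `KillChildProcs()` call relative to `Disconnect()`; the reaping itself is unchanged. For readers unfamiliar with it, a rough Python analogue of what reaping child processes involves, assuming `psutil` (the helper name mirrors the C++ one but is otherwise an illustration):

```python
import os

import psutil

def kill_child_procs():
    # Rough analogue of CoreWorker::KillChildProcs: enumerate every
    # descendant of this process and kill it, then wait so none are
    # left behind once the worker disconnects and shuts down.
    me = psutil.Process(os.getpid())
    children = me.children(recursive=True)
    for child in children:
        child.kill()  # SIGKILL on POSIX, TerminateProcess on Windows
    psutil.wait_procs(children, timeout=5)
```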
4 changes: 1 addition & 3 deletions src/ray/gcs/gcs_client/accessor.cc
@@ -852,9 +852,7 @@ Status WorkerInfoAccessor::AsyncReportWorkerFailure(
const std::shared_ptr<rpc::WorkerTableData> &data_ptr,
const StatusCallback &callback) {
rpc::Address worker_address = data_ptr->worker_address();
RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString()
<< " WorkerID=" << WorkerID::FromBinary(worker_address.worker_id())
<< " NodeID=" << NodeID::FromBinary(worker_address.raylet_id());
RAY_LOG(DEBUG) << "Reporting worker failure, " << worker_address.DebugString();
rpc::ReportWorkerFailureRequest request;
request.mutable_worker_failure()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().ReportWorkerFailure(
40 changes: 40 additions & 0 deletions src/ray/gcs/gcs_client/test/gcs_client_test.cc
@@ -947,6 +947,46 @@ TEST_P(GcsClientTest, DISABLED_TestGetActorPerf) {
<< actor_count << " actors.";
}

TEST_P(GcsClientTest, TestEvictExpiredDestroyedActors) {
// Restart doesn't work with in-memory storage.
if (RayConfig::instance().gcs_storage() == "memory") {
return;
}
// Register actors and the actors will be destroyed.
JobID job_id = JobID::FromInt(1);
AddJob(job_id);
absl::flat_hash_set<ActorID> actor_ids;
int actor_count = RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
for (int index = 0; index < actor_count; ++index) {
auto actor_table_data = Mocker::GenActorTableData(job_id);
RegisterActor(actor_table_data, false);
actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id()));
}

// Restart GCS.
RestartGcsServer();

for (int index = 0; index < actor_count; ++index) {
auto actor_table_data = Mocker::GenActorTableData(job_id);
RegisterActor(actor_table_data, false);
actor_ids.insert(ActorID::FromBinary(actor_table_data->actor_id()));
}

// NOTE: GCS will not reply when actor registration fails, so when GCS restarts, gcs
// client will register the actor again and the status of the actor may be
// `DEPENDENCIES_UNREADY` or `DEAD`. We should get all dead actors.
auto condition = [this]() {
return GetAllActors(true).size() ==
RayConfig::instance().maximum_gcs_destroyed_actor_cached_count();
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));

auto actors = GetAllActors(true);
for (const auto &actor : actors) {
EXPECT_TRUE(actor_ids.contains(ActorID::FromBinary(actor.actor_id())));
}
}

TEST_P(GcsClientTest, TestEvictExpiredDeadNodes) {
// Restart GCS.
RestartGcsServer();
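The new test registers `maximum_gcs_destroyed_actor_cached_count` destroyed actors, restarts the GCS, registers the same number again, and then expects `GetAllActors(true)` to return exactly the cap, i.e. the oldest dead actors were evicted. A toy model of that bounded cache in Python (the class name and cap value are illustrative; the real eviction logic lives in the C++ GcsActorManager):

```python
from collections import OrderedDict

class DestroyedActorCache:
    # Toy model of the bounded dead-actor cache the test exercises: once
    # the cap is reached, the oldest destroyed actor is evicted first.
    def __init__(self, max_cached):
        self.max_cached = max_cached
        self.actors = OrderedDict()

    def add(self, actor_id, data=None):
        self.actors[actor_id] = data
        while len(self.actors) > self.max_cached:
            self.actors.popitem(last=False)  # drop the oldest entry

cache = DestroyedActorCache(max_cached=3)
for i in range(6):  # register twice the cap, as the test does across a restart
    cache.add(f"actor-{i}")
assert list(cache.actors) == ["actor-3", "actor-4", "actor-5"]
```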
19 changes: 8 additions & 11 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -761,8 +761,7 @@ void GcsActorManager::PollOwnerForActorOutOfScope(
auto it = workers.find(owner_id);
if (it == workers.end()) {
RAY_LOG(DEBUG) << "Adding owner " << owner_id << " of actor " << actor_id
<< ", job id = " << actor_id.JobId()
<< " owner node id = " << owner_node_id;
<< ", job id = " << actor_id.JobId();
std::shared_ptr<rpc::CoreWorkerClientInterface> client =
worker_client_factory_(actor->GetOwnerAddress());
it = workers.emplace(owner_id, Owner(std::move(client))).first;
@@ -777,15 +776,14 @@
[this, owner_node_id, owner_id, actor_id](
Status status, const rpc::WaitForActorOutOfScopeReply &reply) {
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to wait for actor " << actor_id
<< " out of scope, job id = " << actor_id.JobId()
<< ", error: " << status.ToString();
// TODO(iycheng): Retry it in other PR.
return;
RAY_LOG(INFO) << "Worker " << owner_id
<< " failed, destroying actor child, job id = "
<< actor_id.JobId();
} else {
RAY_LOG(INFO) << "Actor " << actor_id
<< " is out of scope, destroying actor, job id = "
<< actor_id.JobId();
}
RAY_LOG(INFO) << "Actor " << actor_id
<< " is out of scope, destroying actor, job id = "
<< actor_id.JobId();

auto node_it = owners_.find(owner_node_id);
if (node_it != owners_.end() && node_it->second.count(owner_id)) {
@@ -959,7 +957,6 @@ void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,

bool need_reconstruct = disconnect_type != rpc::WorkerExitType::INTENDED_USER_EXIT &&
disconnect_type != rpc::WorkerExitType::USER_ERROR;

// Destroy all actors that are owned by this worker.
const auto it = owners_.find(node_id);
if (it != owners_.end() && it->second.count(worker_id)) {
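The reverted callback treats an RPC error as the owner having died and falls through to destroy the actor, where the deflakey version logged a warning and returned (leaving retry as a TODO). A schematic of the restored control flow (the function and parameter names are made up for illustration):

```python
def on_wait_for_actor_out_of_scope_reply(status_ok, actor_id, owner_id, destroy_actor):
    # Restored control flow: destroy the actor whether the owner replied
    # (the actor went out of scope) or the RPC failed (the owner itself died).
    if not status_ok:
        print(f"Worker {owner_id} failed, destroying actor child {actor_id}")
    else:
        print(f"Actor {actor_id} is out of scope, destroying actor")
    destroy_actor(actor_id)

on_wait_for_actor_out_of_scope_reply(
    status_ok=False, actor_id="a1", owner_id="w1",
    destroy_actor=lambda actor_id: print(f"destroyed {actor_id}"),
)
```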
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/pubsub_handler.cc
@@ -104,6 +104,7 @@ void InternalPubSubHandler::HandleGcsSubscriberCommandBatch(
if (sender_id.empty()) {
sender_id = request.subscriber_id();
}

auto iter = sender_to_subscribers_.find(sender_id);
if (iter == sender_to_subscribers_.end()) {
iter = sender_to_subscribers_.insert({sender_id, {}}).first;
2 changes: 0 additions & 2 deletions src/ray/gcs/pb_util.h
@@ -107,7 +107,6 @@ inline std::shared_ptr<ray::rpc::ActorTableData> CreateActorTableData(

/// Helper function to produce worker failure data.
inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
const NodeID &node_id,
const WorkerID &worker_id,
int64_t timestamp,
rpc::WorkerExitType disconnect_type,
@@ -118,7 +117,6 @@ inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
// Only report the worker id + delta (new data upon worker failures).
// GCS will merge the data with original worker data.
worker_failure_info_ptr->mutable_worker_address()->set_worker_id(worker_id.Binary());
worker_failure_info_ptr->mutable_worker_address()->set_raylet_id(node_id.Binary());
worker_failure_info_ptr->set_timestamp(timestamp);
worker_failure_info_ptr->set_exit_type(disconnect_type);
worker_failure_info_ptr->set_exit_detail(disconnect_detail);
24 changes: 6 additions & 18 deletions src/ray/raylet/node_manager.cc
@@ -1480,13 +1480,15 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
}
// Publish the worker failure.
auto worker_failure_data_ptr =
gcs::CreateWorkerFailureData(self_node_id_,
worker->WorkerId(),
gcs::CreateWorkerFailureData(worker->WorkerId(),
time(nullptr),
disconnect_type,
disconnect_detail,
worker->GetProcess().GetId(),
creation_task_exception);
RAY_CHECK_OK(
gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));

if (is_worker) {
const ActorID &actor_id = worker->GetActorId();
const TaskID &task_id = worker->GetAssignedTaskId();
@@ -1561,23 +1563,9 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie

local_task_manager_->ClearWorkerBacklog(worker->WorkerId());
cluster_task_manager_->CancelTaskForOwner(worker->GetAssignedTaskId());
#ifdef _WIN32
// On Windows, when the worker is killed, client async wait won't get notified
// somehow.
RAY_CHECK_OK(
gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));

client->Close();
#else
// ReportWorkerFailure should happen after the worker exit completely.
// A better way is to monitor the pid exit. But that needs Process.h
// support async operation.
// Here we monitor the socket to achieve similar result.
// When the worker exited, the pid will be disconnected (local stream socket).
client->AsyncWaitTerminated([client, worker_failure_data_ptr, this] {
RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr,
nullptr));
});
#endif

// TODO(rkn): Tell the object manager that this client has disconnected so
// that it can clean up the wait requests for this client. Currently I think
// these can be leaked.
Expand Down