Skip to content

Commit

Permalink
[Core] Expose NodeDeathInfo in ActorDiedError
Browse files Browse the repository at this point in the history
Signed-off-by: Rui Qiao <[email protected]>
  • Loading branch information
ruisearch42 committed May 30, 2024
1 parent dd7cbee commit 81af2bb
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 71 deletions.
9 changes: 8 additions & 1 deletion python/ray/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
ActorDiedErrorContext,
Address,
Language,
NodeDeathInfo,
RayException,
)
from ray.util.annotations import DeveloperAPI, PublicAPI
Expand Down Expand Up @@ -340,7 +341,7 @@ class ActorDiedError(RayActorError):

BASE_ERROR_MSG = "The actor died unexpectedly before finishing this task."

def __init__(self, cause: Union[RayTaskError, ActorDiedErrorContext] = None):
def __init__(self, cause: Union[RayTaskError, ActorDiedErrorContext, NodeDeathInfo] = None):
"""
Construct a RayActorError by building the arguments.
"""
Expand All @@ -361,6 +362,12 @@ def __init__(self, cause: Union[RayTaskError, ActorDiedErrorContext] = None):
" raised in its creation task, "
f"{cause.__str__()}"
)
elif isinstance(cause, NodeDeathInfo):
error_msg = (
"The actor died because the node was terminated."
)
# FIXME: we don't have actor_id info
# actor_id = ActorID(cause.actor_id).hex()
else:
# Indicating system-level actor failures.
assert isinstance(cause, ActorDiedErrorContext)
Expand Down
6 changes: 3 additions & 3 deletions src/ray/core_worker/actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,9 @@ void ActorManager::HandleActorStateNotification(const ActorID &actor_id,
<< ", num_restarts: " << actor_data.num_restarts()
<< ", death context type="
<< gcs::GetActorDeathCauseString(actor_data.death_cause());
if (actor_data.preempted()) {
direct_actor_submitter_->SetPreempted(actor_id);
}
// if (actor_data.preempted()) {
// direct_actor_submitter_->SetPreempted(actor_id);
// }

if (actor_data.state() == rpc::ActorTableData::RESTARTING) {
direct_actor_submitter_->DisconnectActor(actor_id,
Expand Down
25 changes: 13 additions & 12 deletions src/ray/core_worker/transport/direct_actor_task_submitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,17 +344,18 @@ void CoreWorkerDirectActorTaskSubmitter::FailTaskWithError(
if (!task.actor_preempted) {
error_info = task.timeout_error_info;
} else {
// Special error for preempted actor. The task "timed out" because the actor may
// not have sent a notification to the gcs; regardless we already know it's
// preempted and it's dead.
rpc::ActorDeathCause &actor_death_cause = *error_info.mutable_actor_died_error();
actor_death_cause.mutable_actor_died_error_context()->set_actor_id(
task.task_spec.ActorId().Binary());
actor_death_cause.mutable_actor_died_error_context()->set_preempted(
task.actor_preempted);

error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
error_info.set_error_message("Actor died by preemption.");
// // Special error for preempted actor. The task "timed out" because the actor may
// // not have sent a notification to the gcs; regardless we already know it's
// // preempted and it's dead.
// rpc::ActorDeathCause &actor_death_cause = *error_info.mutable_actor_died_error();
// actor_death_cause.mutable_actor_died_error_context()->set_actor_id(
// task.task_spec.ActorId().Binary());
// actor_death_cause.mutable_actor_died_error_context()->set_preempted(
// task.actor_preempted);

// error_info.set_error_type(rpc::ErrorType::ACTOR_DIED);
// error_info.set_error_message("Actor died by preemption.");
RAY_LOG(ERROR) << "Unexpected path";
}
GetTaskFinisherWithoutMu().FailPendingTask(
task.task_spec.TaskId(), error_info.error_type(), &task.status, &error_info);
Expand All @@ -375,7 +376,7 @@ void CoreWorkerDirectActorTaskSubmitter::CheckTimeoutTasks() {
while (deque_itr != deque.end() && (*deque_itr)->deadline_ms < now) {
// Populate the info of whether the actor is preempted. If so we hard fail the
// task.
(*deque_itr)->actor_preempted = client_queue.preempted;
// (*deque_itr)->actor_preempted = client_queue.preempted;
timeout_tasks.push_back(*deque_itr);
deque_itr = deque.erase(deque_itr);
}
Expand Down
16 changes: 8 additions & 8 deletions src/ray/core_worker/transport/direct_actor_task_submitter.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class CoreWorkerDirectActorTaskSubmitterInterface {

/// Mark that the corresponding actor is preempted (e.g., spot preemption).
/// If called, preempted = true will be set in the death cause upon actor death.
virtual void SetPreempted(const ActorID &actor_id) = 0;
// virtual void SetPreempted(const ActorID &actor_id) = 0;

virtual ~CoreWorkerDirectActorTaskSubmitterInterface() {}
};
Expand All @@ -90,12 +90,12 @@ class CoreWorkerDirectActorTaskSubmitter
::RayConfig::instance().actor_excess_queueing_warn_threshold();
}

void SetPreempted(const ActorID &actor_id) {
absl::MutexLock lock(&mu_);
if (auto iter = client_queues_.find(actor_id); iter != client_queues_.end()) {
iter->second.preempted = true;
}
}
// void SetPreempted(const ActorID &actor_id) {
// absl::MutexLock lock(&mu_);
// if (auto iter = client_queues_.find(actor_id); iter != client_queues_.end()) {
// iter->second.preempted = true;
// }
// }

/// Add an actor queue. This should be called whenever a reference to an
/// actor is created in the language frontend.
Expand Down Expand Up @@ -293,7 +293,7 @@ class CoreWorkerDirectActorTaskSubmitter
/// messages from the GCS.
int64_t num_restarts = -1;
/// Whether this actor exits by spot preemption.
bool preempted = false;
// bool preempted = false;
/// The RPC client. We use shared_ptr to enable shared_from_this for
/// pending client callbacks.
std::shared_ptr<rpc::CoreWorkerClientInterface> rpc_client = nullptr;
Expand Down
54 changes: 28 additions & 26 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ const ray::rpc::ActorDeathCause GcsActorManager::GenNodeDiedCause(
const std::string ip_address,
std::shared_ptr<rpc::GcsNodeInfo> node) {
ray::rpc::ActorDeathCause death_cause;

auto actor_died_error_ctx = death_cause.mutable_actor_died_error_context();
AddActorInfo(actor, actor_died_error_ctx);
actor_died_error_ctx->set_error_message(
Expand All @@ -214,7 +215,8 @@ const ray::rpc::ActorDeathCause GcsActorManager::GenNodeDiedCause(
// TODO(vitsai): Publish this information as well
if (auto death_info = node->death_info();
death_info.reason() == rpc::NodeDeathInfo::AUTOSCALER_DRAIN_PREEMPTED) {
actor_died_error_ctx->set_preempted(true);
auto node_death_info = actor_died_error_ctx->mutable_node_death_info();
node_death_info->CopyFrom(node->death_info());
}
return death_cause;
}
Expand Down Expand Up @@ -1106,31 +1108,31 @@ void GcsActorManager::OnNodeDead(std::shared_ptr<rpc::GcsNodeInfo> node,
}
}

void GcsActorManager::SetPreemptedAndPublish(const NodeID &node_id) {
// The node has received a drain request, so we mark all of its actors
// preempted. This state will be published to the raylets so that the
// preemption may be retrieved upon actor death.
if (created_actors_.find(node_id) == created_actors_.end()) {
return;
}

for (const auto &id_iter : created_actors_.find(node_id)->second) {
auto actor_iter = registered_actors_.find(id_iter.second);
RAY_CHECK(actor_iter != registered_actors_.end())
<< "Could not find actor " << id_iter.second.Hex() << " in registered actors.";

actor_iter->second->GetMutableActorTableData()->set_preempted(true);

const auto &actor_id = id_iter.second;
const auto &actor_table_data = actor_iter->second->GetActorTableData();

RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
actor_id, actor_table_data, [this, actor_id, actor_table_data](Status status) {
RAY_CHECK_OK(gcs_publisher_->PublishActor(
actor_id, *GenActorDataOnlyWithStates(actor_table_data), nullptr));
}));
}
}
// void GcsActorManager::SetPreemptedAndPublish(const NodeID &node_id) {
// // The node has received a drain request, so we mark all of its actors
// // preempted. This state will be published to the raylets so that the
// // preemption may be retrieved upon actor death.
// if (created_actors_.find(node_id) == created_actors_.end()) {
// return;
// }

// for (const auto &id_iter : created_actors_.find(node_id)->second) {
// auto actor_iter = registered_actors_.find(id_iter.second);
// RAY_CHECK(actor_iter != registered_actors_.end())
// << "Could not find actor " << id_iter.second.Hex() << " in registered actors.";

// actor_iter->second->GetMutableActorTableData()->set_preempted(true);

// const auto &actor_id = id_iter.second;
// const auto &actor_table_data = actor_iter->second->GetActorTableData();

// RAY_CHECK_OK(gcs_table_storage_->ActorTable().Put(
// actor_id, actor_table_data, [this, actor_id, actor_table_data](Status status) {
// RAY_CHECK_OK(gcs_publisher_->PublishActor(
// actor_id, *GenActorDataOnlyWithStates(actor_table_data), nullptr));
// }));
// }
// }

void GcsActorManager::ReconstructActor(const ActorID &actor_id,
bool need_reschedule,
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {

/// Set actors on the node as preempted and publish the actor information.
/// If the node is already dead, this method is a no-op.
void SetPreemptedAndPublish(const NodeID &node_id);
// void SetPreemptedAndPublish(const NodeID &node_id);

/// Create actor asynchronously.
///
Expand Down Expand Up @@ -560,7 +560,7 @@ class GcsActorManager : public rpc::ActorInfoHandler {
actor_delta->set_start_time(actor.start_time());
actor_delta->set_end_time(actor.end_time());
actor_delta->set_repr_name(actor.repr_name());
actor_delta->set_preempted(actor.preempted());
// actor_delta->set_preempted(actor.preempted());
// Actor's namespace and name are used for removing cached name when it's dead.
if (!actor.ray_namespace().empty()) {
actor_delta->set_ray_namespace(actor.ray_namespace());
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,9 @@ void GcsAutoscalerStateManager::HandleDrainNode(
return;
}

if (RayConfig::instance().enable_reap_actor_death()) {
gcs_actor_manager_.SetPreemptedAndPublish(node_id);
}
// if (RayConfig::instance().enable_reap_actor_death()) {
// gcs_actor_manager_.SetPreemptedAndPublish(node_id);
// }

auto node = std::move(maybe_node.value());
rpc::Address raylet_address;
Expand Down
18 changes: 16 additions & 2 deletions src/ray/protobuf/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,20 @@ message RayException {
string formatted_exception_string = 3;
}

// Describes why a node died: a coarse reason code plus a free-form message.
// Carried in ActorDiedErrorContext.node_death_info when the node's death is
// the cause of an actor's death.
message NodeDeathInfo {
// TODO(sang): Update drain reason
// Category of the node death.
enum Reason {
// Zero value; presumably means no reason was recorded -- confirm with producers.
UNSPECIFIED = 0;
EXPECTED_TERMINATION = 1;
UNEXPECTED_TERMINATION = 2;
// GcsActorManager::GenNodeDiedCause checks for this value before attaching
// the node death info to an actor's death cause.
AUTOSCALER_DRAIN_PREEMPTED = 3;
AUTOSCALER_DRAIN_IDLE = 4;
}
// The category of this node death.
Reason reason = 1;
// A message describing the reason for the node death.
string reason_message = 2;
}

message ActorDeathCause {
oneof context {
// Indicates that this actor is marked as DEAD due to actor creation task failure.
Expand Down Expand Up @@ -350,8 +364,8 @@ message ActorDiedErrorContext {
// Whether the actor had never started running before it died, i.e. it was cancelled
// before scheduling had completed.
bool never_started = 10;
// Whether the actor was on a preempted node.
bool preempted = 11;
// The node death info in case it is the cause of actor death.
optional NodeDeathInfo node_death_info = 11;
}

// Context for task OOM.
Expand Down
14 changes: 0 additions & 14 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -318,20 +318,6 @@ message NodeSnapshot {
repeated string node_activity = 3;
}

message NodeDeathInfo {
// TODO(sang): Update drain reason
enum Reason {
UNSPECIFIED = 0;
EXPECTED_TERMINATION = 1;
UNEXPECTED_TERMINATION = 2;
AUTOSCALER_DRAIN_PREEMPTED = 3;
AUTOSCALER_DRAIN_IDLE = 4;
}
Reason reason = 1;
// A message describing the reason for the node death.
string reason_message = 2;
}

message GcsNodeInfo {
// State of a node.
enum GcsNodeState {
Expand Down

0 comments on commit 81af2bb

Please sign in to comment.