Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Placement Group] Support RayPlacementGroupError #10508 #13140

Closed
wants to merge 15 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class ObjectSerializer {
String.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE.getNumber()).getBytes();
private static final byte[] TASK_EXECUTION_EXCEPTION_META =
String.valueOf(ErrorType.TASK_EXECUTION_EXCEPTION.getNumber()).getBytes();
private static final byte[] PLACEMENT_GROUP_ERROR_META =
String.valueOf(ErrorType.PLACEMENT_GROUP_ERROR.getNumber()).getBytes();

public static final byte[] OBJECT_METADATA_TYPE_CROSS_LANGUAGE = "XLANG".getBytes();
public static final byte[] OBJECT_METADATA_TYPE_JAVA = "JAVA".getBytes();
Expand Down
1 change: 1 addition & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ from ray.exceptions import (
RayError,
RaySystemError,
RayTaskError,
RayPlacementGroupError,
ObjectStoreFullError,
GetTimeoutError,
TaskCancelledError
Expand Down
8 changes: 8 additions & 0 deletions python/ray/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ def __str__(self):
return "\n".join(out)


class RayPlacementGroupError(RayError):
"""Indicates that the task failed due to placement group errors."""

def __str__(self):
return "The task failed due to placement group error."


class WorkerCrashedError(RayError):
"""Indicates that the worker died unexpectedly while executing a task."""

Expand Down Expand Up @@ -214,6 +221,7 @@ class PlasmaObjectNotAvailable(RayError):
PlasmaObjectNotAvailable,
RayError,
RayTaskError,
RayPlacementGroupError,
WorkerCrashedError,
RayActorError,
ObjectStoreFullError,
Expand Down
3 changes: 2 additions & 1 deletion python/ray/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ def to_memory_units(memory_bytes, round_up):
REPORTER_DIED_ERROR = "reporter_died"
DASHBOARD_AGENT_DIED_ERROR = "dashboard_agent_died"
DASHBOARD_DIED_ERROR = "dashboard_died"
RAYLET_CONNECTION_ERROR = "raylet_connection_error"
RAYLET_CONNECTION_ERROR = "raylet_connection"
PLACEMENT_GROUP_PUSH_ERROR = "placement_group"

# Used in gpu detection
RESOURCE_CONSTRAINT_PREFIX = "accelerator_type:"
Expand Down
3 changes: 3 additions & 0 deletions python/ray/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
RayError,
PlasmaObjectNotAvailable,
RayTaskError,
RayPlacementGroupError,
RayActorError,
TaskCancelledError,
WorkerCrashedError,
Expand Down Expand Up @@ -278,6 +279,8 @@ def _deserialize_object(self, data, metadata, object_ref):
return TaskCancelledError()
elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"):
return ObjectLostError(ray.ObjectRef(object_ref.binary()))
elif error_type == ErrorType.Value("PLACEMENT_GROUP_ERROR"):
return RayPlacementGroupError()
else:
assert error_type != ErrorType.Value("OBJECT_IN_PLASMA"), \
"Tried to get object that has been promoted to plasma."
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) {
actor->UpdateAddress(address);
const auto actor_id = actor->GetActorID();
EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id));
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id, false);
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id);
}

TEST_F(GcsActorManagerTest, TestRegisterActor) {
Expand Down Expand Up @@ -848,10 +848,10 @@ TEST_F(GcsActorManagerTest, TestOwnerAndChildDiedAtTheSameTimeRaceCondition) {
const auto child_worker_id = actor->GetWorkerID();
const auto actor_id = actor->GetActorID();
// Make worker & owner fail at the same time, but owner's failure comes first.
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id, false);
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id);
EXPECT_CALL(*mock_actor_scheduler_, CancelOnWorker(child_node_id, child_worker_id))
.WillOnce(Return(actor_id));
gcs_actor_manager_->OnWorkerDead(child_node_id, child_worker_id, false);
gcs_actor_manager_->OnWorkerDead(child_node_id, child_worker_id);
}

} // namespace ray
Expand Down
14 changes: 14 additions & 0 deletions src/ray/protobuf/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ enum ErrorType {
TASK_CANCELLED = 5;
// Inidicates that creating the GCS service failed to create the actor.
ACTOR_CREATION_FAILED = 6;
// Indicates failure due to placement group.
PLACEMENT_GROUP_ERROR = 7;
}

/// The task exception encapsulates all information about task
Expand Down Expand Up @@ -442,3 +444,15 @@ message MetricPoint {
// [Optional] Unit of the metric.
string units = 6;
}

// Type of a worker exit.
enum ClientDisconnectType {
// Unintended worker exit.
UNEXPECTED_EXIT = 0;
// Intended worker exit.
FINISHED = 1;
// Worker exit due to resource bundle release.
UNUSED_RESOURCE_RELEASED = 2;
// Worker exit due to placement group removal.
PLACEMENT_GROUP_REMOVED = 3;
}
93 changes: 62 additions & 31 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,14 @@ void NodeManager::KillWorker(std::shared_ptr<WorkerInterface> worker) {
});
}

void NodeManager::DestroyWorker(std::shared_ptr<WorkerInterface> worker) {
void NodeManager::DisconnectAndKillWorker(std::shared_ptr<WorkerInterface> worker,
rpc::ClientDisconnectType disconnect_type) {
// Used to destroy a worker when its bundle resource is released (unused or
// placement group is deleted).
// We should disconnect the client first. Otherwise, we'll remove bundle resources
// before actual resources are returned. Subsequent disconnect request that comes
// due to worker dead will be ignored.
ProcessDisconnectClientMessage(worker->Connection(), /* intentional exit */ true);
DisconnectClient(worker->Connection(), disconnect_type);
worker->MarkDead();
KillWorker(worker);
}
Expand Down Expand Up @@ -602,9 +605,9 @@ void NodeManager::HandleReleaseUnusedBundles(
}

// Kill all workers that are currently associated with the unused bundles.
// NOTE: We can't traverse directly with `leased_workers_`, because `DestroyWorker` will
// delete the element of `leased_workers_`. So we need to filter out
// `workers_associated_with_unused_bundles` separately.
// NOTE: We can't traverse directly with `leased_workers_`, because
// `DisconnectAndKillWorker` will delete the element of `leased_workers_`. So we need to
// filter out `workers_associated_with_unused_bundles` separately.
std::vector<std::shared_ptr<WorkerInterface>> workers_associated_with_unused_bundles;
for (const auto &worker_it : leased_workers_) {
auto &worker = worker_it.second;
Expand All @@ -623,7 +626,7 @@ void NodeManager::HandleReleaseUnusedBundles(
<< ", task id: " << worker->GetAssignedTaskId()
<< ", actor id: " << worker->GetActorId()
<< ", worker id: " << worker->WorkerId();
DestroyWorker(worker);
DisconnectAndKillWorker(worker, rpc::ClientDisconnectType::UNUSED_RESOURCE_RELEASED);
}

// Return unused bundle resources.
Expand Down Expand Up @@ -709,7 +712,7 @@ void NodeManager::WarnResourceDeadlock() {
<< "To resolve the issue, consider creating fewer actors or increase the "
<< "resources available to this Ray cluster. You can ignore this message "
<< "if this Ray cluster is expected to auto-scale.";
auto error_data_ptr = gcs::CreateErrorTableData(
const auto error_data_ptr = gcs::CreateErrorTableData(
"resource_deadlock", error_message.str(), current_time_ms(),
exemplar.GetTaskSpecification().JobId());
RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr));
Expand Down Expand Up @@ -1255,7 +1258,7 @@ void NodeManager::ProcessRegisterClientRequestMessage(
static_cast<int64_t>(protocol::MessageType::RegisterClientReply), fbb.GetSize(),
fbb.GetBufferPointer(), [this, client](const ray::Status &status) {
if (!status.ok()) {
ProcessDisconnectClientMessage(client);
DisconnectClient(client);
}
});
};
Expand Down Expand Up @@ -1354,8 +1357,8 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<WorkerInterface> &
}
}

void NodeManager::ProcessDisconnectClientMessage(
const std::shared_ptr<ClientConnection> &client, bool intentional_disconnect) {
void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &client,
rpc::ClientDisconnectType disconnect_type) {
std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
bool is_worker = false, is_driver = false;
if (worker) {
Expand All @@ -1378,7 +1381,6 @@ void NodeManager::ProcessDisconnectClientMessage(
if (is_worker && worker->IsDead()) {
// If the worker was killed by us because the driver exited,
// treat it as intentionally disconnected.
intentional_disconnect = true;
// Don't need to unblock the client if it's a worker and is already dead.
// Because in this case, its task is already cleaned up.
RAY_LOG(DEBUG) << "Skip unblocking worker because it's already dead.";
Expand All @@ -1400,32 +1402,51 @@ void NodeManager::ProcessDisconnectClientMessage(
// Publish the worker failure.
auto worker_failure_data_ptr =
gcs::CreateWorkerFailureData(self_node_id_, worker->WorkerId(), worker->IpAddress(),
worker->Port(), time(nullptr), intentional_disconnect);
worker->Port(), time(nullptr), disconnect_type);
RAY_CHECK_OK(
gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));

if (is_worker) {
const ActorID &actor_id = worker->GetActorId();
const TaskID &task_id = worker->GetAssignedTaskId();
// If the worker was running a task or actor, clean up the task and push an
// error to the driver, unless the worker is already dead.

if ((!task_id.IsNil() || !actor_id.IsNil()) && !worker->IsDead()) {
// If the worker was running a task or actor, clean up the task and push an
// error to the driver, unless the worker is already dead.
// TODO(rkn): Define this constant somewhere else.
std::string type_str;
std::ostringstream error_message;

bool intentional_exit = false;
switch (disconnect_type) {
case rpc::ClientDisconnectType::UNEXPECTED_EXIT:
type_str = "worker_died";
error_message << "A worker died or was killed while executing task " << task_id
<< ".";
break;
case rpc::ClientDisconnectType::PLACEMENT_GROUP_REMOVED:
type_str = "placement_group";
error_message << "A worker was killed while executing task " << task_id
<< " due to placement group removal.";
break;
case rpc::ClientDisconnectType::FINISHED:
case rpc::ClientDisconnectType::UNUSED_RESOURCE_RELEASED:
intentional_exit = true;
break;
default:
RAY_LOG(FATAL) << "Unknown client disconnect type " << disconnect_type;
}

// If the worker was an actor, it'll be cleaned by GCS.
if (actor_id.IsNil()) {
Task task;
static_cast<void>(local_queues_.RemoveTask(task_id, &task));
}

if (!intentional_disconnect) {
if (!intentional_exit) {
// Push the error to driver.
const JobID &job_id = worker->GetAssignedJobId();
// TODO(rkn): Define this constant somewhere else.
std::string type = "worker_died";
std::ostringstream error_message;
error_message << "A worker died or was killed while executing task " << task_id
<< ".";
auto error_data_ptr = gcs::CreateErrorTableData(type, error_message.str(),
current_time_ms(), job_id);
const auto error_data_ptr = gcs::CreateErrorTableData(
type_str, error_message.str(), current_time_ms(), job_id);
RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr));
}
}
Expand Down Expand Up @@ -1481,6 +1502,15 @@ void NodeManager::ProcessDisconnectClientMessage(
// these can be leaked.
}

void NodeManager::ProcessDisconnectClientMessage(
const std::shared_ptr<ClientConnection> &client, bool intentional_disconnect) {
if (intentional_disconnect) {
DisconnectClient(client, rpc::ClientDisconnectType::FINISHED);
return;
}
DisconnectClient(client);
}

void NodeManager::ProcessFetchOrReconstructMessage(
const std::shared_ptr<ClientConnection> &client, const uint8_t *message_data) {
auto message = flatbuffers::GetRoot<protocol::FetchOrReconstruct>(message_data);
Expand Down Expand Up @@ -1569,7 +1599,7 @@ void NodeManager::ProcessWaitRequestMessage(
}
} else {
// We failed to write to the client, so disconnect the client.
ProcessDisconnectClientMessage(client);
DisconnectClient(client);
}
});
RAY_CHECK_OK(status);
Expand Down Expand Up @@ -1618,7 +1648,8 @@ void NodeManager::ProcessPushErrorRequestMessage(const uint8_t *message_data) {
auto const &error_message = string_from_flatbuf(*message->error_message());
double timestamp = message->timestamp();
JobID job_id = from_flatbuf<JobID>(*message->job_id());
auto error_data_ptr = gcs::CreateErrorTableData(type, error_message, timestamp, job_id);
const auto error_data_ptr =
gcs::CreateErrorTableData(type, error_message, timestamp, job_id);
RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr));
}

Expand Down Expand Up @@ -1789,9 +1820,9 @@ void NodeManager::HandleCancelResourceReserve(
<< bundle_spec.DebugString();

// Kill all workers that are currently associated with the placement group.
// NOTE: We can't traverse directly with `leased_workers_`, because `DestroyWorker` will
// delete the element of `leased_workers_`. So we need to filter out
// `workers_associated_with_pg` separately.
// NOTE: We can't traverse directly with `leased_workers_`, because
// `DisconnectAndKillWorker` will delete the element of `leased_workers_`. So we need to
// filter out `workers_associated_with_pg` separately.
std::vector<std::shared_ptr<WorkerInterface>> workers_associated_with_pg;
for (const auto &worker_it : leased_workers_) {
auto &worker = worker_it.second;
Expand All @@ -1807,7 +1838,7 @@ void NodeManager::HandleCancelResourceReserve(
<< ", task id: " << worker->GetAssignedTaskId()
<< ", actor id: " << worker->GetActorId()
<< ", worker id: " << worker->WorkerId();
DestroyWorker(worker);
DisconnectAndKillWorker(worker, rpc::ClientDisconnectType::PLACEMENT_GROUP_REMOVED);
}

// Return bundle resources.
Expand Down Expand Up @@ -1838,7 +1869,7 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request,

if (worker) {
if (request.disconnect_worker()) {
ProcessDisconnectClientMessage(worker->Connection());
DisconnectClient(worker->Connection());
} else {
// Handle the edge case where the worker was returned before we got the
// unblock RPC by unblocking it immediately (unblock is idempotent).
Expand Down Expand Up @@ -2683,7 +2714,7 @@ void NodeManager::FinishAssignTask(const std::shared_ptr<WorkerInterface> &worke
} else {
RAY_LOG(WARNING) << "Failed to send task to worker, disconnecting client";
// We failed to send the task to the worker, so disconnect the worker.
ProcessDisconnectClientMessage(worker->Connection());
DisconnectClient(worker->Connection());
// Queue this task for future assignment. We need to do this since
// DispatchTasks() removed it from the ready queue. The task will be
// assigned to a worker once one becomes available.
Expand Down
18 changes: 15 additions & 3 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,24 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// \return Void.
void KillWorker(std::shared_ptr<WorkerInterface> worker);

/// Destroy a worker.
/// Disconnect and kill a worker.
/// We will disconnect the worker connection first and then kill the worker.
///
/// \param worker The worker to destroy.
/// \param worker The worker to stop.
oliverhu marked this conversation as resolved.
Show resolved Hide resolved
/// \param disconnect_type The reason to disconnect the specified client.
/// \return Void.
void DestroyWorker(std::shared_ptr<WorkerInterface> worker);
void DisconnectAndKillWorker(std::shared_ptr<WorkerInterface> worker,
rpc::ClientDisconnectType disconnect_type =
rpc::ClientDisconnectType::UNEXPECTED_EXIT);

/// Disconnect a client.
///
/// \param client The client that sent the message.
/// \param disconnect_type The reason to disconnect the specified client.
/// \return Void.
void DisconnectClient(const std::shared_ptr<ClientConnection> &client,
rpc::ClientDisconnectType disconnect_type =
rpc::ClientDisconnectType::UNEXPECTED_EXIT);

/// When a job finished, loop over all of the queued tasks for that job and
/// treat them as failed.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ void WorkerPool::WarnAboutSize() {
<< "using nested tasks "
<< "(see https://github.com/ray-project/ray/issues/3644) for "
<< "some a discussion of workarounds.";
auto error_data_ptr = gcs::CreateErrorTableData(
const auto error_data_ptr = gcs::CreateErrorTableData(
"worker_pool_large", warning_message.str(), current_time_ms());
RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr));
}
Expand Down