Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Placement Group] Support RayPlacementGroupError #10508 #12821

Closed
wants to merge 15 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class ObjectSerializer {
.valueOf(ErrorType.OBJECT_UNRECONSTRUCTABLE.getNumber()).getBytes();
private static final byte[] TASK_EXECUTION_EXCEPTION_META = String
.valueOf(ErrorType.TASK_EXECUTION_EXCEPTION.getNumber()).getBytes();
private static final byte[] PLACEMENT_GROUP_ERROR_META = String
.valueOf(ErrorType.PLACEMENT_GROUP_ERROR.getNumber()).getBytes();

public static final byte[] OBJECT_METADATA_TYPE_CROSS_LANGUAGE = "XLANG".getBytes();
public static final byte[] OBJECT_METADATA_TYPE_JAVA = "JAVA".getBytes();
Expand Down
1 change: 1 addition & 0 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ from ray.exceptions import (
RayError,
RaySystemError,
RayTaskError,
RayPlacementGroupError,
ObjectStoreFullError,
GetTimeoutError,
TaskCancelledError
Expand Down
8 changes: 8 additions & 0 deletions python/ray/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ def __str__(self):
return "\n".join(out)


class RayPlacementGroupError(RayError):
"""Indicates that the task failed due to placement group errors."""

def __str__(self):
return "The task failed due to placement group error."


class WorkerCrashedError(RayError):
"""Indicates that the worker died unexpectedly while executing a task."""

Expand Down Expand Up @@ -212,6 +219,7 @@ class PlasmaObjectNotAvailable(RayError):
PlasmaObjectNotAvailable,
RayError,
RayTaskError,
RayPlacementGroupError,
WorkerCrashedError,
RayActorError,
ObjectStoreFullError,
Expand Down
3 changes: 2 additions & 1 deletion python/ray/ray_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ def to_memory_units(memory_bytes, round_up):
REPORTER_DIED_ERROR = "reporter_died"
DASHBOARD_AGENT_DIED_ERROR = "dashboard_agent_died"
DASHBOARD_DIED_ERROR = "dashboard_died"
RAYLET_CONNECTION_ERROR = "raylet_connection_error"
RAYLET_CONNECTION_ERROR = "raylet_connection"
PLACEMENT_GROUP_PUSH_ERROR = "placement_group"

# Used in gpu detection
RESOURCE_CONSTRAINT_PREFIX = "accelerator_type:"
Expand Down
3 changes: 3 additions & 0 deletions python/ray/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
RayError,
PlasmaObjectNotAvailable,
RayTaskError,
RayPlacementGroupError,
RayActorError,
TaskCancelledError,
WorkerCrashedError,
Expand Down Expand Up @@ -277,6 +278,8 @@ def _deserialize_object(self, data, metadata, object_ref):
return TaskCancelledError()
elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"):
return ObjectLostError(ray.ObjectRef(object_ref.binary()))
elif error_type == ErrorType.Value("PLACEMENT_GROUP_ERROR"):
return RayPlacementGroupError()
else:
assert error_type != ErrorType.Value("OBJECT_IN_PLASMA"), \
"Tried to get object that has been promoted to plasma."
Expand Down
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,8 @@ void GcsActorManager::CollectStats() const {

void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
const ray::WorkerID &worker_id,
bool intentional_exit) {
const rpc::ClientDisconnectType disconnect_type) {
bool intentional_exit = disconnect_type == rpc::ClientDisconnectType::FINISHED;
if (intentional_exit) {
RAY_LOG(INFO) << "Worker " << worker_id << " on node " << node_id
<< " intentional exit.";
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,10 +247,10 @@ class GcsActorManager : public rpc::ActorInfoHandler {
///
/// \param node_id ID of the node where the dead worker was located.
/// \param worker_id ID of the dead worker.
/// \param intentional_exit Whether the death was intentional. If yes and the
/// worker was an actor, we should not attempt to restart the actor.
/// \param disconnect_type the reason why the worker is disconnected.
void OnWorkerDead(const NodeID &node_id, const WorkerID &worker_id,
bool intentional_exit = false);
rpc::ClientDisconnectType disconnect_type =
rpc::ClientDisconnectType::UNEXPECTED_EXIT);

/// Handle actor creation task failure. This should be called when scheduling
/// an actor creation task is infeasible.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ void GcsServer::InstallEventListeners() {
auto worker_id = WorkerID::FromBinary(worker_address.worker_id());
auto node_id = NodeID::FromBinary(worker_address.raylet_id());
gcs_actor_manager_->OnWorkerDead(node_id, worker_id,
worker_failure_data->intentional_disconnect());
worker_failure_data->disconnect_type());
});

// Install job event listeners.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_worker_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void GcsWorkerManager::HandleReportWorkerFailure(
log_stream << "Reporting worker failure, worker id = " << worker_id
<< ", node id = " << node_id
<< ", address = " << worker_address.ip_address();
if (request.worker_failure().intentional_disconnect()) {
if (request.worker_failure().disconnect_type() == rpc::ClientDisconnectType::FINISHED) {
RAY_LOG(INFO) << log_stream.str();
} else {
RAY_LOG(WARNING) << log_stream.str()
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/test/gcs_actor_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ TEST_F(GcsActorManagerTest, TestRaceConditionCancelLease) {
actor->UpdateAddress(address);
const auto actor_id = actor->GetActorID();
EXPECT_CALL(*mock_actor_scheduler_, CancelOnLeasing(node_id, actor_id));
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id, false);
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id);
}

TEST_F(GcsActorManagerTest, TestRegisterActor) {
Expand Down Expand Up @@ -848,10 +848,10 @@ TEST_F(GcsActorManagerTest, TestOwnerAndChildDiedAtTheSameTimeRaceCondition) {
const auto child_worker_id = actor->GetWorkerID();
const auto actor_id = actor->GetActorID();
// Make worker & owner fail at the same time, but owner's failure comes first.
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id, false);
gcs_actor_manager_->OnWorkerDead(owner_node_id, owner_worker_id);
EXPECT_CALL(*mock_actor_scheduler_, CancelOnWorker(child_node_id, child_worker_id))
.WillOnce(Return(actor_id));
gcs_actor_manager_->OnWorkerDead(child_node_id, child_worker_id, false);
gcs_actor_manager_->OnWorkerDead(child_node_id, child_worker_id);
}

} // namespace ray
Expand Down
5 changes: 3 additions & 2 deletions src/ray/gcs/pb_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ inline std::shared_ptr<ray::rpc::ActorTableData> CreateActorTableData(
inline std::shared_ptr<ray::rpc::WorkerTableData> CreateWorkerFailureData(
const NodeID &raylet_id, const WorkerID &worker_id, const std::string &address,
int32_t port, int64_t timestamp = std::time(nullptr),
bool intentional_disconnect = false) {
const rpc::ClientDisconnectType disconnect_type =
rpc::ClientDisconnectType::UNEXPECTED_EXIT) {
auto worker_failure_info_ptr = std::make_shared<ray::rpc::WorkerTableData>();
worker_failure_info_ptr->mutable_worker_address()->set_raylet_id(raylet_id.Binary());
worker_failure_info_ptr->mutable_worker_address()->set_worker_id(worker_id.Binary());
worker_failure_info_ptr->mutable_worker_address()->set_ip_address(address);
worker_failure_info_ptr->mutable_worker_address()->set_port(port);
worker_failure_info_ptr->set_timestamp(timestamp);
worker_failure_info_ptr->set_intentional_disconnect(intentional_disconnect);
worker_failure_info_ptr->set_disconnect_type(disconnect_type);
return worker_failure_info_ptr;
}

Expand Down
14 changes: 14 additions & 0 deletions src/ray/protobuf/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ enum ErrorType {
TASK_CANCELLED = 5;
// Inidicates that creating the GCS service failed to create the actor.
ACTOR_CREATION_FAILED = 6;
// Indicates failure due to placement group
PLACEMENT_GROUP_ERROR = 7;
}

/// The task exception encapsulates all information about task
Expand Down Expand Up @@ -437,3 +439,15 @@ message MetricPoint {
// [Optional] Unit of the metric.
string units = 6;
}

// Type of a worker exit.
enum ClientDisconnectType {
// Unintended worker exit.
UNEXPECTED_EXIT = 0;
// Intended worker exit.
FINISHED = 1;
// Worker exit due to resource bundle release.
UNUSED_RESOURCE_RELEASED = 2;
// Worker exit due to placement group removal.
PLACEGROUP_REMOVED = 3;
}
4 changes: 2 additions & 2 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ message WorkerTableData {
Address worker_address = 2;
// The UNIX timestamp at which this worker's state was updated.
int64 timestamp = 3;
// Whether it's an intentional disconnect, only applies then `is_alive` is false.
bool intentional_disconnect = 4;
// The type of worker exit.
ClientDisconnectType disconnect_type = 4;
// Type of this worker.
WorkerType worker_type = 5;
// This is for AddWorker.
Expand Down
6 changes: 2 additions & 4 deletions src/ray/raylet/format/node_manager.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@ enum MessageType:int {
RegisterClientReply,
// Send the worker's gRPC port to the raylet.
AnnounceWorkerPort,
// Notify the raylet that this client is disconnecting unexpectedly.
// Notify the raylet that this client is disconnecting.
// This is sent from a worker to a raylet.
DisconnectClient,
// Notify the raylet that this client is disconnecting gracefully.
// This is sent from a worker to a raylet.
IntentionalDisconnectClient,
// Tell a worker to execute a task. This is sent from a raylet to a
// worker.
ExecuteTask,
Expand Down Expand Up @@ -111,6 +108,7 @@ table ResourceIdSetInfo {

// This message is sent from a worker to the node manager.
table DisconnectClient {
disconnect_type: int;
}

table ResourceIdSetInfos {
Expand Down
Loading