Skip to content

Commit

Permalink
[Core] Reconstruct actor to run lineage reconstruction triggered acto…
Browse files Browse the repository at this point in the history
…r task (#47396)

Currently, if we need to rerun an actor task to recover a lost object but the actor is dead, the actor task fails immediately. This PR allows the actor to be restarted (as long as doing so doesn't violate max_restarts) so that the actor task can run and recover the lost objects.

In terms of the state machine, we add a state transition from DEAD to RESTARTING.

Signed-off-by: Jiajun Yao <[email protected]>
  • Loading branch information
jjyao authored Sep 10, 2024
1 parent feb21c9 commit 0773760
Show file tree
Hide file tree
Showing 45 changed files with 1,056 additions and 318 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,7 @@ ray_cc_test(
tags = ["team:core"],
deps = [
":core_worker_lib",
":ray_mock",
"@com_google_googletest//:gtest_main",
],
)
Expand Down
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ py_test_module_list(
py_test_module_list(
files = [
"test_actor.py",
"test_actor_lineage_reconstruction.py",
"test_actor_retry1.py",
"test_actor_retry2.py",
"test_actor_failures.py",
Expand Down
109 changes: 109 additions & 0 deletions python/ray/tests/test_actor_lineage_reconstruction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import os
import gc
import sys

import pytest

import ray
from ray._private.test_utils import wait_for_condition
from ray.core.generated import gcs_pb2
from ray.core.generated import common_pb2


def _actor_is_dead_with(actor_id, expected_reason, expected_lineage_restarts):
    """Check the GCS actor table entry of an actor that should be DEAD.

    Args:
        actor_id: ID of the actor to look up.
        expected_reason: Expected ``ActorDiedErrorContext.Reason`` value
            recorded in the actor's death cause.
        expected_lineage_restarts: Expected value of
            ``num_restarts_due_to_lineage_reconstruction``.

    Returns:
        True when every check passes.

    Raises:
        AssertionError: If the actor entry is missing, is not DEAD, or one of
            the expected values does not match.
    """
    serialized = ray._private.state.state.global_state_accessor.get_actor_info(
        actor_id
    )
    assert serialized is not None
    actor_info = gcs_pb2.ActorTableData.FromString(serialized)
    assert actor_info.state == gcs_pb2.ActorTableData.ActorState.DEAD
    assert actor_info.death_cause.actor_died_error_context.reason == expected_reason
    assert (
        actor_info.num_restarts_due_to_lineage_reconstruction
        == expected_lineage_restarts
    )
    return True


def test_actor_reconstruction_triggered_by_lineage_reconstruction(ray_start_cluster):
    """Actor is restarted to recover lost objects, then dies again.

    Sequence of events under test:
      actor goes out of scope and is killed
      -> lineage reconstruction is triggered by a lost object
      -> actor is restarted
      -> actor goes out of scope again after lineage reconstruction is done
      -> actor is permanently dead once there is no reference left.
    """
    cluster = ray_start_cluster
    cluster.add_node(resources={"head": 1})
    ray.init(address=cluster.address)
    worker1 = cluster.add_node(resources={"worker": 1})

    # The custom "worker" resource pins the actor to the worker node.
    # NOTE(review): the 1M-element return value is presumably large enough to
    # live in that node's object store, so removing the node loses it - confirm.
    @ray.remote(
        num_cpus=1, resources={"worker": 1}, max_restarts=-1, max_task_retries=-1
    )
    class Actor:
        def ping(self):
            return [1] * 1024 * 1024

    actor = Actor.remote()
    actor_id = actor._actor_id

    obj1 = actor.ping.remote()
    obj2 = actor.ping.remote()

    # Make the actor go out of scope.
    actor = None

    death_reasons = common_pb2.ActorDiedErrorContext.Reason

    def dead_after_handle_dropped():
        # Collect on every poll; presumably this makes sure the dropped handle
        # is really released so the out-of-scope report is sent.
        gc.collect()
        return _actor_is_dead_with(actor_id, death_reasons.OUT_OF_SCOPE, 0)

    wait_for_condition(dead_after_handle_dropped)

    # The objects will be lost and recovered. During the process the actor
    # will be reconstructed, and it will be dead again after lineage
    # reconstruction finishes.
    cluster.remove_node(worker1)
    cluster.add_node(resources={"worker": 1})

    expected_payload = [1] * 1024 * 1024
    assert ray.get(obj1) == expected_payload
    assert ray.get(obj2) == expected_payload

    # 1 restart recovers two objects
    wait_for_condition(
        lambda: _actor_is_dead_with(actor_id, death_reasons.OUT_OF_SCOPE, 1)
    )

    # The actor can be permanently dead since no lineage reconstruction
    # will happen once the object references are gone.
    del obj1
    del obj2

    wait_for_condition(
        lambda: _actor_is_dead_with(actor_id, death_reasons.REF_DELETED, 1)
    )


if __name__ == "__main__":
    # Default to a plain verbose run; switch to the parallel argument set
    # when the PARALLEL_CI environment variable is set to a non-empty value.
    pytest_args = ["-sv", __file__]
    if os.environ.get("PARALLEL_CI"):
        pytest_args = ["-n", "auto", "--boxed", "-vs", __file__]
    sys.exit(pytest.main(pytest_args))
2 changes: 1 addition & 1 deletion python/ray/tests/test_gcs_fault_tolerance.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def condition():
# Wait for the actor dead.
wait_for_condition(condition, timeout=10)

# If `PollOwnerForActorOutOfScope` was successfully called,
# If `ReportActorOutOfScope` was successfully called,
# name should be properly deleted.
with pytest.raises(ValueError):
ray.get_actor("abc")
Expand Down
6 changes: 6 additions & 0 deletions python/ray/util/state/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,12 @@ class ActorState(StateSchema):
placement_group_id: Optional[str] = state_column(detail=True, filterable=True)
#: Actor's repr name if a customized __repr__ method exists, else empty string.
repr_name: Optional[str] = state_column(detail=True, filterable=True)
#: Number of restarts that have been attempted on this actor.
num_restarts: int = state_column(filterable=False, detail=True)
#: Number of times this actor is restarted due to lineage reconstructions.
num_restarts_due_to_lineage_reconstruction: int = state_column(
filterable=False, detail=True
)


@dataclass(init=not IS_PYDANTIC_2)
Expand Down
12 changes: 12 additions & 0 deletions src/mock/ray/core_worker/actor_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ class MockActorCreatorInterface : public ActorCreatorInterface {
(const TaskSpecification &task_spec,
const rpc::ClientCallback<rpc::CreateActorReply> &callback),
(override));
// Mock override of AsyncRestartActor(actor_id, num_restarts, callback);
// returns a Status and lets tests set expectations on restart requests.
MOCK_METHOD(Status,
AsyncRestartActor,
(const ActorID &actor_id,
uint64_t num_restarts,
gcs::StatusCallback callback),
(override));
// Mock override of AsyncReportActorOutOfScope(actor_id,
// num_restarts_due_to_lineage_reconstruction, callback); returns a Status and
// lets tests set expectations on out-of-scope reports.
MOCK_METHOD(Status,
AsyncReportActorOutOfScope,
(const ActorID &actor_id,
uint64_t num_restarts_due_to_lineage_reconstruction,
gcs::StatusCallback callback),
(override));
MOCK_METHOD(void,
AsyncWaitForActorRegisterFinish,
(const ActorID &actor_id, gcs::StatusCallback callback),
Expand Down
6 changes: 3 additions & 3 deletions src/mock/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ class MockCoreWorker : public CoreWorker {
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
HandleWaitForActorOutOfScope,
(rpc::WaitForActorOutOfScopeRequest request,
rpc::WaitForActorOutOfScopeReply *reply,
HandleWaitForActorRefDeleted,
(rpc::WaitForActorRefDeletedRequest request,
rpc::WaitForActorRefDeletedReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
Expand Down
56 changes: 56 additions & 0 deletions src/mock/ray/core_worker/reference_count.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2024 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include "gmock/gmock.h"
#include "ray/core_worker/reference_count.h"
namespace ray {
namespace core {

class MockReferenceCounter : public ReferenceCounterInterface {
public:
MockReferenceCounter() : ReferenceCounterInterface() {}

MOCK_METHOD2(AddLocalReference,
void(const ObjectID &object_id, const std::string &call_sit));

MOCK_METHOD4(AddBorrowedObject,
bool(const ObjectID &object_id,
const ObjectID &outer_id,
const rpc::Address &owner_address,
bool foreign_owner_already_monitoring));

MOCK_METHOD8(AddOwnedObject,
void(const ObjectID &object_id,
const std::vector<ObjectID> &contained_ids,
const rpc::Address &owner_address,
const std::string &call_site,
const int64_t object_size,
bool is_reconstructable,
bool add_local_ref,
const absl::optional<NodeID> &pinned_at_raylet_id));

MOCK_METHOD2(AddObjectPrimaryCopyDeleteCallback,
bool(const ObjectID &object_id,
const std::function<void(const ObjectID &)> callback));

MOCK_METHOD2(SetObjectRefDeletedCallback,
bool(const ObjectID &object_id,
const std::function<void(const ObjectID &)> callback));

virtual ~MockReferenceCounter() {}
};

} // namespace core
} // namespace ray
6 changes: 3 additions & 3 deletions src/mock/ray/rpc/worker/core_worker_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ class MockCoreWorkerClientInterface : public ray::pubsub::MockSubscriberClientIn
const ClientCallback<GetObjectStatusReply> &callback),
(override));
MOCK_METHOD(void,
WaitForActorOutOfScope,
(const WaitForActorOutOfScopeRequest &request,
const ClientCallback<WaitForActorOutOfScopeReply> &callback),
WaitForActorRefDeleted,
(const WaitForActorRefDeletedRequest &request,
const ClientCallback<WaitForActorRefDeletedReply> &callback),
(override));
MOCK_METHOD(void,
PubsubLongPolling,
Expand Down
22 changes: 22 additions & 0 deletions src/ray/core_worker/actor_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ class ActorCreatorInterface {
virtual Status AsyncRegisterActor(const TaskSpecification &task_spec,
gcs::StatusCallback callback) = 0;

/// Asynchronously request a restart of the given actor.
///
/// \param actor_id The ID of the actor to restart.
/// \param num_restarts Restart counter sent along with the request.
/// NOTE(review): whether this is the current or the expected count is not
/// visible from this header; confirm against the GCS actor manager.
/// \param callback Callback that receives the resulting status; presumably
/// invoked once the request has been answered (confirm).
/// \return Status
virtual Status AsyncRestartActor(const ActorID &actor_id,
uint64_t num_restarts,
gcs::StatusCallback callback) = 0;

/// Asynchronously report that the given actor has gone out of scope.
///
/// \param actor_id The ID of the actor that went out of scope.
/// \param num_restarts_due_to_lineage_reconstruction Number of restarts due to
/// lineage reconstruction, sent along with the report.
/// NOTE(review): presumably lets the receiver ignore stale reports issued
/// before a later restart; confirm against the GCS actor manager.
/// \param callback Callback that receives the resulting status; presumably
/// invoked once the report has been answered (confirm).
/// \return Status
virtual Status AsyncReportActorOutOfScope(
    const ActorID &actor_id,
    uint64_t num_restarts_due_to_lineage_reconstruction,
    gcs::StatusCallback callback) = 0;

/// Asynchronously request GCS to create the actor.
///
/// \param task_spec The specification for the actor creation task.
Expand Down Expand Up @@ -96,6 +105,19 @@ class DefaultActorCreator : public ActorCreatorInterface {
});
}

/// Forwards the restart request, unchanged, to the GCS client's actor accessor.
Status AsyncRestartActor(const ActorID &actor_id,
                         uint64_t num_restarts,
                         gcs::StatusCallback callback) override {
  auto &&actor_accessor = gcs_client_->Actors();
  return actor_accessor.AsyncRestartActor(actor_id, num_restarts, callback);
}

/// Forwards the out-of-scope report, unchanged, to the GCS client's actor
/// accessor.
Status AsyncReportActorOutOfScope(const ActorID &actor_id,
                                  uint64_t num_restarts_due_to_lineage_reconstruction,
                                  gcs::StatusCallback callback) override {
  auto &&actor_accessor = gcs_client_->Actors();
  return actor_accessor.AsyncReportActorOutOfScope(
      actor_id, num_restarts_due_to_lineage_reconstruction, callback);
}

/// Returns true when `actor_id` has an entry in `registering_actors_`.
bool IsActorInRegistering(const ActorID &actor_id) const override {
  const auto entry = registering_actors_->find(actor_id);
  const bool is_registering = entry != registering_actors_->end();
  return is_registering;
}
Expand Down
Loading

0 comments on commit 0773760

Please sign in to comment.