From 86cd5e344b6936d7cbaa299805fa329457fe87fc Mon Sep 17 00:00:00 2001 From: hongchaodeng Date: Mon, 5 Aug 2024 15:10:06 -0700 Subject: [PATCH] [core] fix PinExistingReturnObject segfault by passing owner_address When a worker crash-restarts, there is a chance that it will call PinExistingReturnObject() to pin existing objects. But PinExistingReturnObject() calls GetThreadContext(), which doesn't work for async actor methods. The owner_address is already available before calling PinExistingReturnObject(). This fix avoids the segfault by passing owner_address to PinExistingReturnObject(). Signed-off-by: hongchaodeng --- python/ray/_raylet.pyx | 6 +- python/ray/includes/libcoreworker.pxd | 3 +- python/ray/tests/test_actor_failures.py | 95 +++++++++++++++++++++++++ src/ray/core_worker/core_worker.cc | 4 +- src/ray/core_worker/core_worker.h | 4 +- 5 files changed, 105 insertions(+), 7 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index e89158484f9c..c827dba0cce1 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -4589,9 +4589,9 @@ cdef class CoreWorker: return True else: with nogil: - success = (CCoreWorkerProcess.GetCoreWorker() - .PinExistingReturnObject( - return_id, return_ptr, generator_id)) + success = ( + CCoreWorkerProcess.GetCoreWorker().PinExistingReturnObject( + return_id, return_ptr, generator_id, caller_address)) return success cdef store_task_outputs(self, diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 1533c3e8dd3b..97946a4f505f 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -166,7 +166,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: c_bool PinExistingReturnObject( const CObjectID& return_id, shared_ptr[CRayObject] *return_object, - const CObjectID& generator_id) + const CObjectID& generator_id, + const CAddress &caller_address) void AsyncDelObjectRefStream(const CObjectID &generator_id) CRayStatus 
TryReadObjectRefStream( const CObjectID &generator_id, diff --git a/python/ray/tests/test_actor_failures.py b/python/ray/tests/test_actor_failures.py index 975b942d5f9f..bf603b2197ec 100644 --- a/python/ray/tests/test_actor_failures.py +++ b/python/ray/tests/test_actor_failures.py @@ -71,6 +71,101 @@ def create_object(self, size): assert num_success == len(objects) +def test_async_generator_crash_restart(ray_start_cluster): + """ + Timeline: + 1. On the worker node, create a generator that generates 2 objects + 2. Kill the worker node; the object refs still exist, but the data is lost + 3. Create a consumer (scheduled on a worker node) that consumes the 2 objects + 4. Start a new worker node to enable the task and lineage reconstruction + 5. Lineage reconstruction should work here. + The generator is killed after it has generated only 1 object. + 6. Verify that the consumer task can still run (it did not before this fix) + """ + cluster = ray_start_cluster + cluster.add_node(num_cpus=1, resources={"head": 1}) + cluster.wait_for_nodes() + + ray.init(address=cluster.address) + + @ray.remote(num_cpus=0, resources={"head": 0.1}) + class Killer: + def __init__(self): + self.pid = None + self.at_num = None + self.kill_num = 0 + + def set_pid(self, pid): + self.pid = pid + + def set_at_num(self, at_num): + self.at_num = at_num + + def kill_if_needed(self, num): + if self.kill_num > 3: + return + self.kill_num = self.kill_num + 1 + if self.pid is not None and self.at_num is not None and num == self.at_num: + import os + import signal + + print(f"Killing the pid = {self.pid}") + os.kill(self.pid, signal.SIGKILL) + + @ray.remote( + num_cpus=1, max_restarts=-1, max_task_retries=-1, resources={"worker": 1} + ) + class Generator: + async def gen(self, nums, killer): + """ + Generates "value_holder" objects. For each object, it first notifies the + killer, and then yields the object. 
+ """ + print(f"my pid is {os.getpid()}, telling to killer") + await killer.set_pid.remote(os.getpid()) + print(f"generates total {nums}") + for i in range(nums): + await killer.kill_if_needed.remote(i) + + print(f"generating {i}") + yield np.ones((1000, 1000), dtype=np.uint8) * i + print(f"generated {i}") + print(f"generated total {nums}") + + @ray.remote(num_cpus=1, resources={"worker": 1}) + def consumes(objs, expected_num): + nums = ray.get(objs) + assert len(nums) == expected_num + print(f"consumes {len(nums)}") + print(nums) + return expected_num + + worker_node = cluster.add_node(num_cpus=10, resources={"worker": 10}) + cluster.wait_for_nodes() + + generator = Generator.remote() + killer = Killer.remote() + + # First run, no kills + gen = ray.get(generator.gen.remote(2, killer)) # returns ObjectRefGenerator + objs = list(gen) # [ObjectRef, ...] + assert len(objs) == 2 + + # kill the worker node + cluster.remove_node(worker_node, allow_graceful=False) + + # In the lineage reconstruction, the generator is dead after it only generated 1... + ray.get(killer.set_at_num.remote(1)) + + # ... 
but the consumer takes both objects + consumer = consumes.remote(objs, 2) + # start a new worker node + worker_node = cluster.add_node(num_cpus=10, resources={"worker": 10}) + cluster.wait_for_nodes() + + ray.get(consumer) + + def test_actor_restart(ray_init_with_task_retry_delay): """Test actor restart when actor process is killed.""" diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 542cbb87a7d5..28d5dd1fa83c 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -3190,13 +3190,13 @@ std::pair CoreWorker::PeekObjectRefStream( bool CoreWorker::PinExistingReturnObject(const ObjectID &return_id, std::shared_ptr *return_object, - const ObjectID &generator_id) { + const ObjectID &generator_id, + const rpc::Address &owner_address) { // TODO(swang): If there is already an existing copy of this object, then it // might not have the same value as the new copy. It would be better to evict // the existing copy here. absl::flat_hash_map> result_map; bool got_exception; - rpc::Address owner_address(worker_context_.GetCurrentTask()->CallerAddress()); // Temporarily set the return object's owner's address. This is needed to retrieve the // value from plasma. diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 822a651bbbd8..3901e78b1e3b 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -1167,9 +1167,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// of the object that wraps the dynamically created ObjectRefs in a /// generator. We use this to notify the owner of the dynamically created /// objects. + /// \param[in] caller_address The address of the caller, who is also the owner. bool PinExistingReturnObject(const ObjectID &return_id, std::shared_ptr *return_object, - const ObjectID &generator_id); + const ObjectID &generator_id, + const rpc::Address &caller_address); /// Dynamically allocate an object. ///