[core] Fiber vs thread_local SIGSEGV in Actor async methods #46489

Closed
rynewang opened this issue Jul 8, 2024 · 3 comments · Fixed by #46973

Labels
bug: Something that is supposed to be working; but isn't
core: Issues that should be addressed in Ray Core

Comments

rynewang (Contributor) commented Jul 8, 2024

What happened + What you expected to happen

We have this thread_local variable here:

static thread_local std::unique_ptr<WorkerThreadContext> thread_context_;

and it's used to determine the "CurrentTask" in CoreWorker. Unfortunately, there are cases where SetCurrentTask, called from ExecuteTask, runs on a fiber thread. When the Python thread then reads the task via GetCurrentTask, it segfaults.

It's a mystery why this only shows up on an actor restart; in theory it should show up right away.

We may be able to use boost::fiber_specific_ptr to solve this. But more fundamentally, it's dubious why we need such a global thread-local mechanism to get a task spec in the first place.
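
For illustration, here is a minimal standalone C++ sketch, with simplified stand-in names rather than Ray's actual code, of the failure mode: a thread_local slot populated on one thread reads back as nullptr on another, which is what happens when SetCurrentTask runs on a fiber worker thread while GetCurrentTask runs on the Python thread.

#include <iostream>
#include <memory>
#include <string>
#include <thread>

// Stand-in for WorkerThreadContext; it only holds a task label here.
struct WorkerThreadContextSketch {
  std::string current_task;
};

// Mirrors the shape of the real declaration:
// static thread_local std::unique_ptr<WorkerThreadContext> thread_context_;
static thread_local std::unique_ptr<WorkerThreadContextSketch> thread_context_;

void SetCurrentTask(const std::string &task) {
  thread_context_ = std::make_unique<WorkerThreadContextSketch>();
  thread_context_->current_task = task;
}

const WorkerThreadContextSketch *GetCurrentTask() { return thread_context_.get(); }

int main() {
  // The "fiber" thread sets the current task ...
  std::thread executor([] { SetCurrentTask("task_123"); });
  executor.join();

  // ... but this thread's thread_local slot was never populated, so the
  // pointer is null. Dereferencing it without a check is the reported
  // SIGSEGV in PinExistingReturnObject.
  const auto *ctx = GetCurrentTask();
  std::cout << (ctx == nullptr ? std::string("nullptr (would crash on dereference)")
                               : ctx->current_task)
            << std::endl;
  return 0;
}

Fiber-specific storage such as boost::fibers::fiber_specific_ptr (the suggestion above) keys the slot on the fiber rather than the OS thread, so the stored value follows the task to whichever thread ends up running the fiber.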

Versions / Dependencies

master

Reproduction script

import os
import ray

import numpy as np


def test_died_generator(ray_start_cluster):
    """
    Tests nondeterministic generators vs lineage reconstruction.
    Timeline:

    1. In worker node, creates a generator that generates 100 objects
    2. Kills worker node, objs exist in ref, but data lost
    3. In worker node, creates a consumer that consumes 100 objects
    4. Start a worker node to enable the task and lineage reconstruction
    5. Lineage reconstruction should be working here. The gen is dead after it only generated 50.
    6. Verify that the consumer task can still run (it's not)
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=1, resources={"head": 1})
    cluster.wait_for_nodes()

    ray.init(address=cluster.address)

    @ray.remote(num_cpus=0, resources={"head": 0.1})
    class Killer:
        def __init__(self):
            self.pid = None
            self.at_num = None

        def set_pid(self, pid):
            self.pid = pid
        
        def set_at_num(self, at_num):
            self.at_num = at_num

        def kill_if_needed(self, num):
            if self.pid is not None and self.at_num is not None and num == self.at_num:
                import os
                import signal
                print(f"Killing the pid = {self.pid}")
                os.kill(self.pid, signal.SIGKILL)

    @ray.remote(num_cpus=1, max_restarts=-1, max_task_retries=-1, resources={"worker": 1})
    class Generator:
        async def gen(self, nums, killer):
            """
            Generates "value_holder" objects. For each object, it first notifies the
            killer, and yields the object.
            """
            print(f'my pid is {os.getpid()}, telling to killer')
            await killer.set_pid.remote(os.getpid())
            print(f"generates total {nums}")
            for i in range(nums):
                print(f"generating {i}")
                await killer.kill_if_needed.remote(i)
                #### !!!!!!!!!!!!!!!!!!!!! SIGSEGV in second run
                #### Crash loops in all retries, every time on the first yield
                #### worker_context_.GetCurrentTask().get() is nullptr
                #### For some reason, SetCurrentTask (CoreWorker::ExecuteTask) is not on the same thread as GetCurrentTask (CoreWorker::PinExistingReturnObject)
                # CoreWorker::ExecuteTask tid 0x172083000 (no stack trace?? live on a fiber???)
                # GetCurrentTask tid 0x171ff7000 got nullptr but NOT crashed in ProfileEvent ctor, it checks if the task is nullptr
                #       (uvloop event loop, ize_eventloops_for_actor_concurrency_group > submit_actor_task > ProfileEvent)
                # Crash: CoreWorker::GetCurrentTask tid 0x171ff7000
                # StreamingGeneratorExecutionContext report_streaming_generator_output
                # (Generator pid=13180)     @        0x107aa3c10  (unknown)  ray::core::CoreWorker::PinExistingReturnObject()
                # (Generator pid=13180)     @        0x107956b34  (unknown)  __pyx_f_3ray_7_raylet_10CoreWorker_store_task_output()
                # (Generator pid=13180)     @        0x1079584f0  (unknown)  __pyx_f_3ray_7_raylet_10CoreWorker_store_task_outputs()
                # (Generator pid=13180)     @        0x1079da8dc  (unknown)  __pyx_f_3ray_7_raylet_report_streaming_generator_output()
                yield np.ones((1000, 1000), dtype=np.uint8) * i
                print(f"generated {i}")
            print(f"generated total {nums}")

        def get_pid(self):
            return os.getpid()

    @ray.remote(num_cpus=1, resources={"worker": 1})
    def consumes(objs, expected_num):
        nums = ray.get(objs)
        assert len(nums) == expected_num
        print(f"consumes {len(nums)}")
        print(nums)
        return expected_num
    
    worker_node = cluster.add_node(num_cpus=10, resources={"worker": 10})
    cluster.wait_for_nodes()

    generator = Generator.remote()
    killer = Killer.remote()
    
    # First run, no kills
    gen = ray.get(generator.gen.remote(100, killer))
    objs = list(gen)
    assert len(objs) == 100

    # kill the worker node
    cluster.remove_node(worker_node, allow_graceful=False)

    # In the lineage reconstruction, the generator is dead after it only generated 50...
    ray.get(killer.set_at_num.remote(50))
    # ... but a consumer takes all 100
    consumer = consumes.remote(objs, 100)
    # start a new worker node
    worker_node = cluster.add_node(num_cpus=10, resources={"worker": 10})
    cluster.wait_for_nodes()

    ray.get(consumer)

Issue Severity

Medium: It is a significant difficulty but I can work around it.

hongchaodeng (Member) commented Jul 31, 2024

TLDR: If the async actor crash-restarts, there is a chance that it will call PinExistingReturnObject(), which doesn't support async tasks; this is a race. In the short term, we should pass owner_address from raylet.pyx. In the long term, we should fix GetThreadContext() to support async tasks as well.
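
To make the short-term direction concrete, here is a hypothetical, heavily simplified sketch; the types and signatures below are illustrative only, not Ray's actual API. The idea is that the layer that already knows the owner (the Cython caller) passes owner_address down explicitly, so the C++ side no longer has to recover it from a thread_local task context that may never have been populated on the current thread.

#include <iostream>
#include <string>

// Illustrative stand-in for the owner address; not the real rpc::Address proto.
struct Address {
  std::string ip_address;
  int port;
};

// "Before" shape (problematic, per this issue): the owner is recovered from
// the thread_local task context, which can be null on this thread:
//   worker_context_.GetCurrentTask().get()  // nullptr after the restart
//   // dereferencing it to find the owner is the reported SIGSEGV
//
// "After" shape (sketch): the owner address travels with the call, so no
// thread-local lookup is needed at pin time.
bool PinExistingReturnObjectSketch(const std::string &return_id,
                                   const Address &owner_address) {
  // ... would ask the local raylet to pin `return_id` on behalf of the owner ...
  return !return_id.empty() && !owner_address.ip_address.empty();
}

int main() {
  const Address owner{"10.0.0.1", 10001};
  std::cout << std::boolalpha
            << PinExistingReturnObjectSketch("object_return_1", owner) << std::endl;
  return 0;
}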


I have reproduced the issue reliably:

  • It only happens after killing the async actor 16 times; killing it 15 times still works. That might explain why it didn't show up right away before.
  • The root cause is the thread_local ThreadContext. Since it is only used for storing parameters, we should just refactor that code instead.

import os
import ray

import numpy as np
import pytest

def test_died_generator(ray_start_cluster):
    """
    Tests nondeterministic generators vs lineage reconstruction.
    Timeline:

    1. In worker node, creates a generator that generates 100 objects
    2. Kills worker node, objs exist in ref, but data lost
    3. In worker node, creates a consumer that consumes 100 objects
    4. Start a worker node to enable the task and lineage reconstruction
    5. Lineage reconstruction should be working here. The gen is dead after it only generated 50.
    6. Verify that the consumer task can still run (it's not)
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=1, resources={"head": 1})
    cluster.wait_for_nodes()

    ray.init(address=cluster.address)

    @ray.remote(num_cpus=0, resources={"head": 0.1})
    class Killer:
        def __init__(self):
            self.pid = None
            self.at_num = None
            self.kill_num = 0

        def set_pid(self, pid):
            self.pid = pid
        
        def set_at_num(self, at_num):
            self.at_num = at_num

        def kill_if_needed(self, num):
            if self.kill_num > 15:
            # Commenting out the condition above and uncommenting the following
            # makes the test pass:
            # if self.kill_num > 14:
                return
            self.kill_num = self.kill_num + 1
            if self.pid is not None and self.at_num is not None and num == self.at_num:
                import os
                import signal
                print(f"Killing the pid = {self.pid}")
                os.kill(self.pid, signal.SIGKILL)

    @ray.remote(num_cpus=1, max_restarts=-1, max_task_retries=-1, resources={"worker": 1})
    class Generator:
        async def gen(self, nums, killer):
            """
            Generates "value_holder" objects. For each object, it first notifies the
            killer, and yields the object.
            """
            print(f'my pid is {os.getpid()}, telling to killer')
            await killer.set_pid.remote(os.getpid())
            print(f"generates total {nums}")
            for i in range(nums):
                print(f"generating {i}")
                await killer.kill_if_needed.remote(i)

                yield np.ones((1000, 1000), dtype=np.uint8) * i
                print(f"generated {i}")
            print(f"generated total {nums}")

        def get_pid(self):
            return os.getpid()

    @ray.remote(num_cpus=1, resources={"worker": 1})
    def consumes(objs, expected_num):
        nums = ray.get(objs)
        assert len(nums) == expected_num
        print(f"consumes {len(nums)}")
        print(nums)
        return expected_num
    
    worker_node = cluster.add_node(num_cpus=10, resources={"worker": 10})
    cluster.wait_for_nodes()

    generator = Generator.remote()
    killer = Killer.remote()

    # First run, no kills
    # gen -> ObjectRefGenerator
    gen = ray.get(generator.gen.remote(10, killer))
    # [ObjectRef, ...]
    objs = list(gen)
    assert len(objs) == 10

    # kill the worker node
    cluster.remove_node(worker_node, allow_graceful=False)

    # In the lineage reconstruction, the generator is dead after it only generated 5...
    ray.get(killer.set_at_num.remote(5))
    # ... but a consumer takes all 10
    consumer = consumes.remote(objs, 10)
    # start a new worker node
    worker_node = cluster.add_node(num_cpus=10, resources={"worker": 10})
    cluster.wait_for_nodes()

    ray.get(consumer)

rynewang (Contributor, Author) commented

Hmm, in my test I don't think we need to kill 15 times; only one time suffices. What's the diff in your test code?

hongchaodeng (Member) commented Jul 31, 2024

@rynewang

Hmm, in my test I don't think we need to kill 15 times; only one time suffices.

That's not true. In your test the async actor was killed indefinitely.
