Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] fix exit handling of FiberState threads #45834

Merged
merged 3 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/ray/core_worker/fiber.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ class FiberState {
std::this_thread::sleep_for(std::chrono::hours(1));
}
});

fiber_runner_thread_.detach();
hongchaodeng marked this conversation as resolved.
Show resolved Hide resolved
}

void EnqueueFiber(std::function<void()> &&callback) {
Expand All @@ -155,7 +157,6 @@ class FiberState {

void Join() {
fiber_stopped_event_->Wait();
fiber_runner_thread_.detach();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we are relying on the behavior that channel_.close(); and fiber_stopped_event_->Wait(); can be called multiple times, which I checked is true. But this seems fragile and rely on the underlying behavior of these libraries. Can we just have our own stopped_ and joined_ flags and early return if they are already called? Thoughts @hongchaodeng @rynewang

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's merge this and fix the issue first.
We are basically guaranteeing something that the library do not provide -- thread cancellation.
That's orthogonal to enhancing the capabilities.

}

private:
Expand Down
8 changes: 8 additions & 0 deletions src/ray/core_worker/test/fiber_state_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ TEST(FiberStateTest, RespectsConcurrencyLimit) {
fiber_state.Join();
}

TEST(FiberStateTest, DoubleStopJoin) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides the unit test, can we also add an e2e test using the repro script mentioned in the GH issue?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is worth it.
The original repro script is sort of red herring. This test covers the root of the problem.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this unit test covers the root cause of this issue, but it's still nice to make sure user's workload work well e2e even if, for example, we remove fiber completely in the future(and this unit test will be irrelevant)

FiberState fiber_state(2);
fiber_state.Stop();
fiber_state.Join();
fiber_state.Stop();
fiber_state.Join();
}

} // namespace core
} // namespace ray

Expand Down
Loading