Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick][core] Sending ReportWorkerFailure after the process died. (#35320) #35420

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,18 @@ cc_test(
],
)

# Small C++ unit test built from src/ray/common/test/asio_retry_runner_test.cc.
# It links against ray_common and uses gtest_main as its test runner.
cc_test(
name = "asio_retry_runner_test",
size = "small",
srcs = ["src/ray/common/test/asio_retry_runner_test.cc"],
copts = COPTS,
tags = ["team:core"],
deps = [
"ray_common",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "ray_config_test",
size = "small",
Expand Down
6 changes: 4 additions & 2 deletions python/ray/serve/tests/test_controller_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,10 @@ def get_actor_info(name: str):
_, controller1_pid = get_actor_info(SERVE_CONTROLLER_NAME)
ray.kill(serve.context._global_client._controller, no_restart=False)
# wait for the controller to be alive again
wait_for_condition(get_actor_info, name=SERVE_CONTROLLER_NAME)
assert controller1_pid != get_actor_info(SERVE_CONTROLLER_NAME)[1]
wait_for_condition(
lambda: get_actor_info(SERVE_CONTROLLER_NAME) is not None
and get_actor_info(SERVE_CONTROLLER_NAME)[1] != controller1_pid
)

# Let the actor proceed initialization
ray.get(signal.send.remote())
Expand Down
12 changes: 8 additions & 4 deletions python/ray/tests/test_actor_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -1076,16 +1076,18 @@ def not_graceful_exit():
time.sleep(1)
actor.kill_self.remote()
time.sleep(1)
wait_for_condition(
lambda: ray._private.state.actors()[actor_id]["EndTime"] != 0
)
state_after_ending = ray._private.state.actors()[actor_id]

assert state_after_starting["StartTime"] == state_after_ending["StartTime"]

start_time = state_after_ending["StartTime"]
end_time = state_after_ending["EndTime"]
lapsed = end_time - start_time

assert end_time > start_time > 0, f"Start: {start_time}, End: {end_time}"
assert 500 < lapsed < 1500, f"Start: {start_time}, End: {end_time}"
assert 1500 < lapsed < 3500, f"Start: {start_time}, End: {end_time}"

def restarted():
actor = Foo.options(max_restarts=1, max_task_retries=-1).remote()
Expand All @@ -1096,7 +1098,9 @@ def restarted():
actor.kill_self.remote()
time.sleep(1)
actor.kill_self.remote()
time.sleep(1)
wait_for_condition(
lambda: ray._private.state.actors()[actor_id]["EndTime"] != 0
)
state_after_ending = ray._private.state.actors()[actor_id]

assert state_after_starting["StartTime"] == state_after_ending["StartTime"]
Expand All @@ -1106,7 +1110,7 @@ def restarted():
lapsed = end_time - start_time

assert end_time > start_time > 0, f"Start: {start_time}, End: {end_time}"
assert 1500 < lapsed < 2500, f"Start: {start_time}, End: {end_time}"
assert 1500 < lapsed < 4000, f"Start: {start_time}, End: {end_time}"

graceful_exit()
not_graceful_exit()
Expand Down
5 changes: 4 additions & 1 deletion python/ray/tests/test_actor_failures.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,10 @@ def create_actor(self):
ray.kill(owner)
with pytest.raises(
ray.exceptions.RayActorError,
match="The actor is dead because its owner has died",
# TODO(iycheng): re-enable the match after fixing the
# race condition.
# https://github.com/ray-project/ray/pull/34883
# match="The actor is dead because its owner has died",
) as exc_info:
ray.get(a.check_alive.remote())
assert exc_info.value.actor_id == a._actor_id.hex()
Expand Down
5 changes: 4 additions & 1 deletion python/ray/tests/test_failure_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,9 @@ def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path):
processes.
"""

ray_start_cluster.add_node()
ray_start_cluster.wait_for_nodes()

output_file_path = tmp_path / "leaked_pids.json"
driver_script = f"""
import ray
Expand All @@ -374,7 +377,7 @@ def test_no_worker_child_process_leaks(ray_start_cluster, tmp_path):
import shutil
import time
import os

ray.init("{ray_start_cluster.address}")
@ray.remote
class Actor:
def create_leaked_child_process(self, num_to_leak):
Expand Down
6 changes: 6 additions & 0 deletions python/ray/tests/test_usage_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@ def oomer():
timeout=4,
)

wait_for_condition(
lambda: "worker_crash_oom"
in ray_usage_lib.get_extra_usage_tags_to_report(gcs_client),
timeout=4,
)

result = ray_usage_lib.get_extra_usage_tags_to_report(gcs_client)

assert "worker_crash_system_error" in result
Expand Down
70 changes: 69 additions & 1 deletion src/ray/common/asio/asio_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
#pragma once

#include <boost/asio.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/asio/yield.hpp>
#include <chrono>

template <typename Duration>
std::shared_ptr<boost::asio::deadline_timer> execute_after(
instrumented_io_context &io_context,
boost::asio::io_context &io_context,
std::function<void()> fn,
Duration delay_duration) {
auto timer = std::make_shared<boost::asio::deadline_timer>(io_context);
Expand All @@ -35,3 +37,69 @@ std::shared_ptr<boost::asio::deadline_timer> execute_after(

return timer;
}

// Retries `predicate` until it returns true or the retry budget is exhausted.
// The predicate is called immediately and then again after `delay_duration`
// for each retry, so it is evaluated at most `retry_num + 1` times. There is
// no wait after the final failed attempt.
//
// \param e The executor to run the retry loop on. It has to be an io_executor
// from boost::asio. It is used both to construct the delay timer and as the
// I/O executor of the composed operation.
// \param predicate The function to retry. It should accept no arguments and
// return bool. A copy (or moved-from instance) is stored inside the operation.
// \param retry_num The number of times to retry. If it's nullopt, it'll retry
// forever until predicate returns true. A value <= 0 means the predicate is
// evaluated exactly once.
// \param delay_duration The duration to wait between retries. It needs to be
// a std::chrono::duration; it is converted to microseconds.
// \param token The completion token to use. It can be either a callback or
// other completion tokens accepted by boost::asio.
//
// \return The completion token return type. For example, if the completion
// token is a callback, it'll return void. If the completion token is
// boost::asio::use_future_t it'll return future<bool>. The completion value is
// true if the predicate eventually returned true, and false if the retries
// were exhausted or the timer wait reported an error.
template <typename Fn,
          typename AsioIOExecutor,
          typename Duration,
          typename CompletionToken = boost::asio::use_future_t<> >
auto async_retry_until(AsioIOExecutor &&e,
                       Fn &&predicate,
                       std::optional<int64_t> retry_num,
                       Duration delay_duration,
                       CompletionToken &&token = CompletionToken()) {
  // The predicate is stored by value in the closure below and invoked as a
  // non-const lvalue, so that is exactly the call expression validated here.
  static_assert(std::is_invocable_r_v<bool, std::decay_t<Fn> &>,
                "predicate must accept no arguments and return bool");
  // The timer is heap-allocated so the timer object itself stays at a fixed
  // address while the closure that owns it is moved around together with
  // `self` (see the std::move(self) below).
  auto delay_timer = std::make_unique<boost::asio::deadline_timer>(e);
  auto delay = boost::posix_time::microseconds(
      std::chrono::duration_cast<std::chrono::microseconds>(delay_duration).count());
  return boost::asio::async_compose<CompletionToken, void(bool)>(
      [retry_num = retry_num,
       delay_timer = std::move(delay_timer),
       delay = delay,
       coro = boost::asio::coroutine(),
       predicate = std::forward<Fn>(predicate)](
          auto &self, const boost::system::error_code &error = {}) mutable {
        // Stackless coroutine: the first call enters at the top, and every
        // timer completion resumes right after the `yield` statement.
        reenter(coro) {
          while (true) {
            // `error` is default-constructed on the initial call; afterwards
            // it carries the result of the timer wait. Any error ends the loop.
            if (error) {
              self.complete(false);
              return;
            }
            if (predicate()) {
              self.complete(true);
              return;
            }
            if (retry_num) {
              // Test before decrementing so that even the smallest int64_t
              // value cannot trigger signed overflow.
              if (*retry_num <= 0) {
                self.complete(false);
                return;
              }
              --*retry_num;
            }
            delay_timer->expires_from_now(delay);
            yield delay_timer->async_wait(std::move(self));
          }
        }
      },
      token,
      e);
}
75 changes: 75 additions & 0 deletions src/ray/common/test/asio_retry_runner_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <optional>
#include <string>

#include "gtest/gtest.h"
#include "ray/common/asio/asio_util.h"

using namespace std::chrono_literals;

// Test fixture that runs a boost::asio::io_context on a dedicated thread for
// the lifetime of each test case.
class RetryRunnerTest : public ::testing::Test {
 public:
  RetryRunnerTest() = default;

  // Spawns the thread that drives the io_context event loop.
  void SetUp() override {
    t = std::make_unique<std::thread>([this]() {
      // The guard keeps run() from returning while no handlers are pending.
      auto keep_running = boost::asio::make_work_guard(io_context);
      io_context.run();
    });
  }

  // Stops the event loop and waits for the thread to finish.
  void TearDown() override {
    io_context.stop();
    t->join();
  }

  boost::asio::io_context io_context;
  std::unique_ptr<std::thread> t;
};

// Covers a bounded retry that fails, a bounded retry that succeeds, the
// default completion token, and an unbounded (nullopt) retry.
TEST_F(RetryRunnerTest, Basic) {
  int count = 0;
  // Returns true on the third call since `count` was last reset.
  auto fn = [&count]() {
    ++count;
    return count == 3;
  };

  // Retry 1 time, wait 5ms between retries: two calls, never reaches three.
  auto single_retry =
      async_retry_until(io_context.get_executor(), fn, 1, 5ms, boost::asio::use_future);
  ASSERT_FALSE(single_retry.get());
  ASSERT_EQ(2, count);

  // Retry 2 times, wait 5ms between retries: the third call succeeds.
  count = 0;
  auto double_retry =
      async_retry_until(io_context.get_executor(), fn, 2, 5ms, boost::asio::use_future);
  ASSERT_TRUE(double_retry.get());
  ASSERT_EQ(3, count);

  // Same as above, relying on the default completion token (a future).
  count = 0;
  auto default_token = async_retry_until(io_context.get_executor(), fn, 2, 5ms);
  ASSERT_TRUE(default_token.get());
  ASSERT_EQ(3, count);

  // No retry limit: keeps retrying until the predicate returns true.
  count = 0;
  auto unbounded = async_retry_until(io_context.get_executor(), fn, std::nullopt, 5ms);
  ASSERT_TRUE(unbounded.get());
  ASSERT_EQ(3, count);
}

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
25 changes: 21 additions & 4 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
#include "ray/util/sample.h"
#include "ray/util/util.h"

using namespace std::chrono_literals;

namespace {

#define RAY_CHECK_ENUM(x, y) \
Expand Down Expand Up @@ -1487,9 +1489,6 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
disconnect_detail,
worker->GetProcess().GetId(),
creation_task_exception);
RAY_CHECK_OK(
gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));

if (is_worker) {
const ActorID &actor_id = worker->GetActorId();
const TaskID &task_id = worker->GetAssignedTaskId();
Expand Down Expand Up @@ -1566,7 +1565,25 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie
cluster_task_manager_->CancelTaskForOwner(worker->GetAssignedTaskId());

client->Close();

auto proc = worker->GetProcess();
if (is_driver) {
// Checking the driver's liveness is not possible since the driver
// can be a zombie. Report the failure immediately.
RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr,
nullptr));
} else {
// Otherwise, do the checking
async_retry_until(
io_service_.get_executor(),
[proc]() { return proc.IsAlive() == false; },
std::nullopt,
100ms,
[this, worker_failure_data_ptr](bool ret) {
RAY_CHECK(ret);
RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(
worker_failure_data_ptr, nullptr));
});
}
// TODO(rkn): Tell the object manager that this client has disconnected so
// that it can clean up the wait requests for this client. Currently I think
// these can be leaked.
Expand Down
10 changes: 10 additions & 0 deletions src/ray/util/process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -629,9 +629,19 @@ bool IsProcessAlive(pid_t pid) {
}
return false;
#else
// Check pid is alive or not
if (kill(pid, 0) == -1 && errno == ESRCH) {
return false;
}
// If it's a child, check whether it's zombie
int status = 0;
int ret = waitpid(pid, &status, WNOHANG);
if (ret == pid) {
if (WIFEXITED(status)) {
RAY_LOG(DEBUG) << "Child exited with status: " << WEXITSTATUS(status);
}
return false;
}
return true;
#endif
}
Expand Down