Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Add more accurate worker exit #24468

Merged
merged 21 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp/src/ray/runtime/task/task_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ std::pair<Status, std::shared_ptr<msgpack::sbuffer>> GetExecuteResult(
return std::make_pair(ray::Status::OK(),
std::make_shared<msgpack::sbuffer>(std::move(result)));
} catch (RayIntentionalSystemExitException &e) {
return std::make_pair(ray::Status::IntentionalSystemExit(), nullptr);
return std::make_pair(ray::Status::IntentionalSystemExit(""), nullptr);
} catch (RayException &e) {
return std::make_pair(ray::Status::NotFound(e.what()), nullptr);
} catch (msgpack::type_error &e) {
Expand Down Expand Up @@ -251,7 +251,7 @@ Status TaskExecutor::ExecuteTask(
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result));
} else {
if (!status.ok()) {
return ray::Status::CreationTaskError();
return ray::Status::CreationTaskError("");
}
}
return ray::Status::OK();
Expand Down
21 changes: 16 additions & 5 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ cdef execute_task(
if core_worker.current_actor_is_asyncio():
error = SystemExit(0)
error.is_ray_terminate = True
error.ray_terminate_msg = "exit_actor() is called."
raise error

function_descriptor = CFunctionDescriptorToPython(
Expand Down Expand Up @@ -770,6 +771,9 @@ cdef execute_task(
if task_counter == execution_info.max_calls:
exit = SystemExit(0)
exit.is_ray_terminate = True
exit.ray_terminate_msg = (
"max_call has reached, "
f"max_calls: {execution_info.max_calls}")
raise exit

cdef shared_ptr[LocalMemoryBuffer] ray_error_to_memory_buf(ray_error):
Expand Down Expand Up @@ -815,6 +819,9 @@ cdef CRayStatus task_execution_handler(
(&creation_task_exception_pb_bytes)[0] = (
ray_error_to_memory_buf(e))
sys_exit.is_creation_task_error = True
sys_exit.init_error_message = (
"Exception raised from an actor init method. "
f"Traceback: {str(e)}")
else:
traceback_str = traceback.format_exc() + (
"An unexpected internal error "
Expand All @@ -825,28 +832,32 @@ cdef CRayStatus task_execution_handler(
"worker_crash",
traceback_str,
job_id=None)
sys_exit.unexpected_error_traceback = traceback_str
raise sys_exit
except SystemExit as e:
# Tell the core worker to exit as soon as the result objects
# are processed.
if hasattr(e, "is_ray_terminate"):
return CRayStatus.IntentionalSystemExit()
return CRayStatus.IntentionalSystemExit(e.ray_terminate_msg)
elif hasattr(e, "is_creation_task_error"):
return CRayStatus.CreationTaskError()
return CRayStatus.CreationTaskError(e.init_error_message)
elif e.code and e.code == 0:
# This means the system exit was
# normal based on the python convention.
# https://docs.python.org/3/library/sys.html#sys.exit
return CRayStatus.IntentionalSystemExit()
return CRayStatus.IntentionalSystemExit(
f"Worker exits with an exit code {e.code}.")
else:
msg = "SystemExit was raised from the worker."
msg = f"Worker exits with an exit code {e.code}."
# In K8s, SIGTERM likely means we hit memory limits, so print
# a more informative message there.
if "KUBERNETES_SERVICE_HOST" in os.environ:
msg += (
" The worker may have exceeded K8s pod memory limits.")
if hasattr(e, "unexpected_error_traceback"):
msg += (f"\n {e.unexpected_error_traceback}")
logger.exception(msg)
return CRayStatus.UnexpectedSystemExit()
return CRayStatus.UnexpectedSystemExit(msg)

return CRayStatus.OK()

Expand Down
1 change: 1 addition & 0 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,7 @@ def exit_actor():
# reduces log verbosity.
exit = SystemExit(0)
exit.is_ray_terminate = True
exit.ray_terminate_msg = "exit_actor() is called."
raise exit
assert False, "This process should have terminated."
else:
Expand Down
3 changes: 3 additions & 0 deletions python/ray/experimental/state/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class WorkerState:
worker_id: str
is_alive: str
worker_type: str
exit_type: str
exit_detail: str
pid: str


@dataclass(init=True)
Expand Down
6 changes: 3 additions & 3 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ cdef extern from "ray/common/status.h" namespace "ray" nogil:
CRayStatus Interrupted(const c_string &msg)

@staticmethod
CRayStatus IntentionalSystemExit()
CRayStatus IntentionalSystemExit(const c_string &msg)

@staticmethod
CRayStatus UnexpectedSystemExit()
CRayStatus UnexpectedSystemExit(const c_string &msg)

@staticmethod
CRayStatus CreationTaskError()
CRayStatus CreationTaskError(const c_string &msg)

@staticmethod
CRayStatus NotFound()
Expand Down
92 changes: 91 additions & 1 deletion python/ray/tests/test_failure_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
import pytest
import time

from ray._private.test_utils import SignalActor, wait_for_pid_to_exit
from ray.experimental.state.api import list_workers
from ray._private.test_utils import (
SignalActor,
wait_for_pid_to_exit,
wait_for_condition,
)

SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM

Expand Down Expand Up @@ -138,6 +143,91 @@ async def get(self, x, wait=False):
assert ray.get(ref_3) == 3


def test_worker_failure_information(ray_start_cluster):
    """Verify that the exit type and detail recorded for dead workers are accurate.

    Worker exit categories and the scenarios that lead to them:

    UNEXPECTED_SYSTEM_EXIT
    - (tested) Failure from the connection E.g., core worker dead.
    - (tested) Unexpected exception or exit with exit_code !=0 on core worker.
    - (tested for owner node death) Node died. Currently worker failure
        upon node death is not detected by Ray. TODO(sang): Fix it.
    - (Cannot test) Direct call failure.

    INTENDED_USER_EXIT
    - (tested) Shutdown driver
    - (tested) exit_actor
    - (tested) exit(0)
    - (tested) Actor kill request
    - (tested) Task cancel request

    INTENDED_SYSTEM_EXIT
    - (not tested, hard to test) Unused resource removed
    - (tested) Pg removed
    - (tested) Idle

    ACTOR_INIT_FAILURE_EXIT
    - (tested) Actor init failed

    This function itself exercises the first two UNEXPECTED_SYSTEM_EXIT
    scenarios: a worker killed by a signal and a worker calling
    sys.exit() with a non-zero code.
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=4)
    ray.init(address=cluster.address)
    cluster.add_node(num_cpus=1, resources={"worker": 1})

    @ray.remote
    class Actor:
        def pid(self):
            import os

            return os.getpid()

        def exit(self, exit_code):
            sys.exit(exit_code)

    def get_worker_by_pid(pid):
        """Return the list_workers() entry whose pid matches, or fail."""
        for w in list_workers().values():
            if w["pid"] == pid:
                return w
        raise AssertionError(f"No worker with pid {pid} was reported.")

    # Scenario: failure from the connection.
    a = Actor.remote()
    pid = ray.get(a.pid.remote())
    print(pid)
    # Use the module-level SIGKILL constant rather than signal.SIGKILL so the
    # test stays runnable on Windows, where signal.SIGKILL is not defined.
    os.kill(pid, SIGKILL)

    def verify_connection_failure():
        worker = get_worker_by_pid(pid)
        print(worker)
        exit_type = worker["exit_type"]
        exit_detail = worker["exit_detail"]
        # If the worker is killed by SIGKILL, it is highly likely by OOM, so
        # the error message should contain information.
        return exit_type == "UNEXPECTED_SYSTEM_EXIT" and "OOM" in exit_detail

    wait_for_condition(verify_connection_failure)

    # Scenario: unexpected exception or exit with exit_code != 0 on the
    # core worker.
    a = Actor.remote()
    pid = ray.get(a.pid.remote())
    with pytest.raises(ray.exceptions.RayActorError):
        ray.get(a.exit.remote(4))

    def verify_exit_failure():
        worker = get_worker_by_pid(pid)
        exit_type = worker["exit_type"]
        exit_detail = worker["exit_detail"]
        # The actor called sys.exit(4); a non-zero exit code should be
        # reported as an unexpected exit whose detail names that code.
        return (
            exit_type == "UNEXPECTED_SYSTEM_EXIT" and "exit code 4" in exit_detail
        )

    wait_for_condition(verify_exit_failure)


if __name__ == "__main__":
import pytest

Expand Down
33 changes: 22 additions & 11 deletions src/ray/common/client_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -360,14 +360,12 @@ std::shared_ptr<ClientConnection> ClientConnection::Create(
local_stream_socket &&socket,
const std::string &debug_label,
const std::vector<std::string> &message_type_enum_names,
int64_t error_message_type,
const std::vector<uint8_t> &error_message_data) {
int64_t error_message_type) {
std::shared_ptr<ClientConnection> self(new ClientConnection(message_handler,
std::move(socket),
debug_label,
message_type_enum_names,
error_message_type,
error_message_data));
error_message_type));
// Let our manager process our new connection.
client_handler(*self);
return self;
Expand All @@ -378,15 +376,13 @@ ClientConnection::ClientConnection(
local_stream_socket &&socket,
const std::string &debug_label,
const std::vector<std::string> &message_type_enum_names,
int64_t error_message_type,
const std::vector<uint8_t> &error_message_data)
int64_t error_message_type)
: ServerConnection(std::move(socket)),
registered_(false),
message_handler_(message_handler),
debug_label_(debug_label),
message_type_enum_names_(message_type_enum_names),
error_message_type_(error_message_type),
error_message_data_(error_message_data) {}
error_message_type_(error_message_type) {}

void ClientConnection::Register() {
RAY_CHECK(!registered_);
Expand Down Expand Up @@ -427,9 +423,6 @@ void ClientConnection::ProcessMessages() {

void ClientConnection::ProcessMessageHeader(const boost::system::error_code &error) {
if (error) {
// If there was an error, disconnect the client.
read_type_ = error_message_type_;
read_message_ = error_message_data_;
read_length_ = 0;
ProcessMessage(error);
return;
Expand Down Expand Up @@ -502,7 +495,25 @@ std::string ClientConnection::RemoteEndpointInfo() {

void ClientConnection::ProcessMessage(const boost::system::error_code &error) {
if (error) {
flatbuffers::FlatBufferBuilder fbb;
const auto &disconnect_detail = fbb.CreateString(absl::StrCat(
"Worker unexpectedly exits with a connection error code ",
error.value(),
". ",
error.message(),
". There are some potential root causes. (1) The process is killed by "
"SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is "
"called. (3) The worker is crashed unexpectedly due to SIGSEGV or other "
"unexpected errors."));
protocol::DisconnectClientBuilder builder(fbb);
builder.add_disconnect_type(
static_cast<int>(ray::rpc::WorkerExitType::UNEXPECTED_SYSTEM_EXIT));
builder.add_disconnect_detail(disconnect_detail);
fbb.Finish(builder.Finish());
std::vector<uint8_t> error_data(fbb.GetBufferPointer(),
fbb.GetBufferPointer() + fbb.GetSize());
read_type_ = error_message_type_;
read_message_ = error_data;
}

int64_t start_ms = current_time_ms();
Expand Down
21 changes: 8 additions & 13 deletions src/ray/common/client_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
#include <memory>

#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/common_protocol.h"
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/raylet/format/node_manager_generated.h"

namespace ray {

Expand Down Expand Up @@ -175,7 +177,6 @@ class ClientConnection;
using ClientHandler = std::function<void(ClientConnection &)>;
using MessageHandler = std::function<void(
std::shared_ptr<ClientConnection>, int64_t, const std::vector<uint8_t> &)>;
static std::vector<uint8_t> _dummy_error_message_data;

/// \typename ClientConnection
///
Expand All @@ -196,16 +197,14 @@ class ClientConnection : public ServerConnection {
/// \param message_type_enum_names A table of printable enum names for the
/// message types received from this client, used for debug messages.
/// \param error_message_type the type of error message
/// \param error_message_data the companion data to the error message type.
/// \return std::shared_ptr<ClientConnection>.
static std::shared_ptr<ClientConnection> Create(
ClientHandler &new_client_handler,
MessageHandler &message_handler,
local_stream_socket &&socket,
const std::string &debug_label,
const std::vector<std::string> &message_type_enum_names,
int64_t error_message_type,
const std::vector<uint8_t> &error_message_data = _dummy_error_message_data);
int64_t error_message_type);

std::shared_ptr<ClientConnection> shared_ClientConnection_from_this() {
return std::static_pointer_cast<ClientConnection>(shared_from_this());
Expand All @@ -221,13 +220,11 @@ class ClientConnection : public ServerConnection {

protected:
/// A protected constructor for a node client connection.
ClientConnection(
MessageHandler &message_handler,
local_stream_socket &&socket,
const std::string &debug_label,
const std::vector<std::string> &message_type_enum_names,
int64_t error_message_type,
const std::vector<uint8_t> &error_message_data = _dummy_error_message_data);
ClientConnection(MessageHandler &message_handler,
local_stream_socket &&socket,
const std::string &debug_label,
const std::vector<std::string> &message_type_enum_names,
int64_t error_message_type);
/// Process an error from the last operation, then process the message
/// header from the client.
void ProcessMessageHeader(const boost::system::error_code &error);
Expand Down Expand Up @@ -257,8 +254,6 @@ class ClientConnection : public ServerConnection {
const std::vector<std::string> message_type_enum_names_;
/// The value for disconnect client message.
int64_t error_message_type_;
/// The data for disconnect client message.
std::vector<uint8_t> error_message_data_;
/// Buffers for the current message being read from the client.
int64_t read_cookie_;
int64_t read_type_;
Expand Down
13 changes: 6 additions & 7 deletions src/ray/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,16 @@ class RAY_EXPORT Status {
return Status(StatusCode::Interrupted, msg);
}

static Status IntentionalSystemExit() {
return Status(StatusCode::IntentionalSystemExit, "intentional system exit");
static Status IntentionalSystemExit(const std::string &msg) {
return Status(StatusCode::IntentionalSystemExit, msg);
}

static Status UnexpectedSystemExit() {
return Status(StatusCode::UnexpectedSystemExit, "user code caused exit");
static Status UnexpectedSystemExit(const std::string &msg) {
return Status(StatusCode::UnexpectedSystemExit, msg);
}

static Status CreationTaskError() {
return Status(StatusCode::CreationTaskError,
"error raised in creation task, cause worker to exit");
static Status CreationTaskError(const std::string &msg) {
return Status(StatusCode::CreationTaskError, msg);
}

static Status NotFound(const std::string &msg) {
Expand Down
Loading