Skip to content

Commit

Permalink
[Core] Remove ray._raylet.check_health (#47526)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiajun Yao <[email protected]>
  • Loading branch information
jjyao authored Sep 9, 2024
1 parent 3e8dd0d commit 1dd8d60
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 157 deletions.
2 changes: 0 additions & 2 deletions python/ray/_private/internal_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import ray._private.services as services
import ray._private.utils as utils
import ray._private.worker
from ray._private import ray_constants
from ray._private.state import GlobalState
from ray._raylet import GcsClientOptions
from ray.core.generated import common_pb2
Expand Down Expand Up @@ -34,7 +33,6 @@ def get_state_from_address(address=None):

def memory_summary(
address=None,
redis_password=ray_constants.REDIS_DEFAULT_PASSWORD,
group_by="NODE_ADDRESS",
sort_by="OBJECT_SIZE",
units="B",
Expand Down
42 changes: 0 additions & 42 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ from ray.includes.common cimport (
kResourceUnitScaling,
kImplicitResourcePrefix,
kWorkerSetupHookKeyName,
PythonCheckGcsHealth,
PythonGetNodeLabels,
PythonGetResourcesTotal,
)
Expand Down Expand Up @@ -3263,47 +3262,6 @@ cdef class _TestOnly_GcsActorSubscriber(_GcsSubscriber):
return [(key_id, info)]


def check_health(address: str, timeout=2, skip_version_check=False):
"""Checks Ray cluster health, before / without actually connecting to the
cluster via ray.init().
Args:
address: Ray cluster / GCS address string, e.g. ip:port.
timeout: request timeout.
skip_version_check: If True, will skip comparision of GCS Ray version with local
Ray version. If False (default), will raise exception on mismatch.
Returns:
Returns True if the cluster is running and has matching Ray version.
Returns False if no service is running.
Raises an exception otherwise.
"""

tokens = address.rsplit(":", 1)
if len(tokens) != 2:
raise ValueError("Invalid address: {}. Expect 'ip:port'".format(address))
gcs_address, gcs_port = tokens

cdef:
c_string c_gcs_address = gcs_address
int c_gcs_port = int(gcs_port)
int64_t timeout_ms = round(1000 * timeout) if timeout else -1
c_string c_ray_version = ray.__version__
c_bool c_skip_version_check = skip_version_check
c_bool c_is_healthy = True

try:
with nogil:
check_status(PythonCheckGcsHealth(
c_gcs_address, c_gcs_port, timeout_ms, c_ray_version,
c_skip_version_check, c_is_healthy))
except RpcError:
traceback.print_exc()
except RaySystemError as e:
raise RuntimeError(str(e))

return c_is_healthy


cdef class CoreWorker:

def __cinit__(self, worker_type, store_socket, raylet_socket,
Expand Down
5 changes: 0 additions & 5 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -698,11 +698,6 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" namespace "ray::gcs" nogil:
unordered_map[c_string, c_string] PythonGetNodeLabels(
const CGcsNodeInfo& node_info)

CRayStatus PythonCheckGcsHealth(
const c_string& gcs_address, const int gcs_port, const int64_t timeout_ms,
const c_string& ray_version, const c_bool skip_version_check,
c_bool& is_healthy)

cdef extern from "src/ray/protobuf/gcs.pb.h" nogil:
cdef enum CChannelType "ray::rpc::ChannelType":
RAY_ERROR_INFO_CHANNEL "ray::rpc::ChannelType::RAY_ERROR_INFO_CHANNEL",
Expand Down
49 changes: 24 additions & 25 deletions python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import subprocess
import sys
import time
import traceback
import urllib
import urllib.parse
import warnings
Expand Down Expand Up @@ -57,6 +56,18 @@
logger = logging.getLogger(__name__)


def _check_ray_version(gcs_client):
import ray._private.usage.usage_lib as ray_usage_lib

cluster_metadata = ray_usage_lib.get_cluster_metadata(gcs_client)
if cluster_metadata and cluster_metadata["ray_version"] != ray.__version__:
raise RuntimeError(
"Ray version mismatch: cluster has Ray version "
f'{cluster_metadata["ray_version"]} '
f"but local Ray version is {ray.__version__}"
)


@click.group()
@click.option(
"--logging-level",
Expand Down Expand Up @@ -1953,13 +1964,12 @@ def memory(
):
"""Print object references held in a Ray cluster."""
address = services.canonicalize_bootstrap_address_or_die(address)
if not ray._raylet.check_health(address):
raise click.BadParameter(f"Ray cluster is not found at {address}")
gcs_client = ray._raylet.GcsClient(address=address)
_check_ray_version(gcs_client)
time = datetime.now()
header = "=" * 8 + f" Object references status: {time} " + "=" * 8
mem_stats = memory_summary(
address,
redis_password,
group_by,
sort_by,
units,
Expand Down Expand Up @@ -1993,16 +2003,11 @@ def memory(
def status(address: str, redis_password: str, verbose: bool):
"""Print cluster status, including autoscaling info."""
address = services.canonicalize_bootstrap_address_or_die(address)
if not ray._raylet.check_health(address):
raise click.BadParameter(f"Ray cluster is not found at {address}")
gcs_client = ray._raylet.GcsClient(address=address)
_check_ray_version(gcs_client)
ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
status = ray.experimental.internal_kv._internal_kv_get(
ray_constants.DEBUG_AUTOSCALING_STATUS
)
error = ray.experimental.internal_kv._internal_kv_get(
ray_constants.DEBUG_AUTOSCALING_ERROR
)
status = gcs_client.internal_kv_get(ray_constants.DEBUG_AUTOSCALING_STATUS.encode())
error = gcs_client.internal_kv_get(ray_constants.DEBUG_AUTOSCALING_ERROR.encode())
print(debug_status(status, error, verbose=verbose, address=address))


Expand Down Expand Up @@ -2292,10 +2297,9 @@ def drain_node(
raise click.BadParameter(f"Invalid hex ID of a Ray node, got {node_id}")

address = services.canonicalize_bootstrap_address_or_die(address)
if not ray._raylet.check_health(address):
raise click.BadParameter(f"Ray cluster is not found at {address}")

gcs_client = ray._raylet.GcsClient(address=address)
_check_ray_version(gcs_client)
is_accepted, rejection_error_message = gcs_client.drain_node(
node_id,
autoscaler_pb2.DrainNodeReason.Value(reason),
Expand Down Expand Up @@ -2370,20 +2374,15 @@ def healthcheck(address, redis_password, component, skip_version_check):
"""

address = services.canonicalize_bootstrap_address_or_die(address)
gcs_client = ray._raylet.GcsClient(address=address)
if not skip_version_check:
_check_ray_version(gcs_client)

if not component:
try:
if ray._raylet.check_health(address, skip_version_check=skip_version_check):
sys.exit(0)
except Exception:
traceback.print_exc()
pass
sys.exit(1)
sys.exit(0)

gcs_client = ray._raylet.GcsClient(address=address)
ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
report_str = ray.experimental.internal_kv._internal_kv_get(
component, namespace=ray_constants.KV_NAMESPACE_HEALTHCHECK
report_str = gcs_client.internal_kv_get(
component.encode(), namespace=ray_constants.KV_NAMESPACE_HEALTHCHECK
)
if not report_str:
# Status was never updated
Expand Down
24 changes: 0 additions & 24 deletions python/ray/tests/test_advanced_4.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from unittest import mock
import subprocess
import sys

import pytest

import ray
from ray._raylet import check_health
from ray._private.test_utils import (
Semaphore,
client_test_enabled,
Expand Down Expand Up @@ -114,28 +112,6 @@ def test_jemalloc_env_var_propagate():
assert actual == expected


def test_check_health(shutdown_only):
assert not check_health("127.0.0.1:8888")
# Should not raise error: https://github.com/ray-project/ray/issues/38785
assert not check_health("ip:address:with:colon:name:8265")

with pytest.raises(ValueError):
check_health("bad_address_no_port")

conn = ray.init()
addr = conn.address_info["address"]
assert check_health(addr)


def test_check_health_version_check(shutdown_only):
with mock.patch("ray.__version__", "FOO-VERSION"):
conn = ray.init()
addr = conn.address_info["address"]
assert check_health(addr, skip_version_check=True)
with pytest.raises(RuntimeError):
check_health(addr)


def test_back_pressure(shutdown_only_with_initialization_check):
ray.init()

Expand Down
57 changes: 44 additions & 13 deletions python/ray/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,10 @@ def test_ray_check_open_ports(shutdown_only, start_open_port_check_server):
assert "[🛑] open ports detected" in result.output


def test_ray_drain_node():
def test_ray_drain_node(monkeypatch):
monkeypatch.setenv("RAY_py_gcs_connect_timeout_s", "1")
ray._raylet.Config.initialize("")

runner = CliRunner()
result = runner.invoke(
scripts.drain_node,
Expand Down Expand Up @@ -1105,7 +1108,9 @@ def test_ray_drain_node():
],
)
assert result.exit_code != 0
assert "Ray cluster is not found at 127.0.0.2:8888" in result.output
assert "Timed out while waiting for GCS to become available" in str(
result.exception
)

result = runner.invoke(
scripts.drain_node,
Expand All @@ -1121,10 +1126,32 @@ def test_ray_drain_node():
assert result.exit_code != 0
assert "Invalid hex ID of a Ray node, got invalid-node-id" in result.output

with patch("ray._raylet.check_health", return_value=True), patch(
"ray._raylet.GcsClient"
) as MockGcsClient:
with patch("ray._raylet.GcsClient") as MockGcsClient:
mock_gcs_client = MockGcsClient.return_value
mock_gcs_client.internal_kv_get.return_value = (
'{"ray_version": "ray_version_mismatch"}'.encode()
)
result = runner.invoke(
scripts.drain_node,
[
"--address",
"127.0.0.1:6543",
"--node-id",
"0db0438b5cfd6e84d7ec07212ed76b23be7886cbd426ef4d1879bebf",
"--reason",
"DRAIN_NODE_REASON_IDLE_TERMINATION",
"--reason-message",
"idle termination",
],
)
assert result.exit_code != 0
assert "Ray version mismatch" in str(result.exception)

with patch("ray._raylet.GcsClient") as MockGcsClient:
mock_gcs_client = MockGcsClient.return_value
mock_gcs_client.internal_kv_get.return_value = (
f'{{"ray_version": "{ray.__version__}"}}'.encode()
)
mock_gcs_client.drain_node.return_value = (True, "")
result = runner.invoke(
scripts.drain_node,
Expand All @@ -1140,17 +1167,18 @@ def test_ray_drain_node():
],
)
assert result.exit_code == 0
assert mock_gcs_client.mock_calls[0] == mock.call.drain_node(
assert mock_gcs_client.mock_calls[1] == mock.call.drain_node(
"0db0438b5cfd6e84d7ec07212ed76b23be7886cbd426ef4d1879bebf",
1,
"idle termination",
0,
)

with patch("ray._raylet.check_health", return_value=True), patch(
"ray._raylet.GcsClient"
) as MockGcsClient:
with patch("ray._raylet.GcsClient") as MockGcsClient:
mock_gcs_client = MockGcsClient.return_value
mock_gcs_client.internal_kv_get.return_value = (
f'{{"ray_version": "{ray.__version__}"}}'.encode()
)
mock_gcs_client.drain_node.return_value = (False, "Node not idle")
result = runner.invoke(
scripts.drain_node,
Expand All @@ -1168,10 +1196,13 @@ def test_ray_drain_node():
assert result.exit_code != 0
assert "The drain request is not accepted: Node not idle" in result.output

with patch("ray._raylet.check_health", return_value=True), patch(
"time.time_ns", return_value=1000000000
), patch("ray._raylet.GcsClient") as MockGcsClient:
with patch("time.time_ns", return_value=1000000000), patch(
"ray._raylet.GcsClient"
) as MockGcsClient:
mock_gcs_client = MockGcsClient.return_value
mock_gcs_client.internal_kv_get.return_value = (
f'{{"ray_version": "{ray.__version__}"}}'.encode()
)
mock_gcs_client.drain_node.return_value = (True, "")
result = runner.invoke(
scripts.drain_node,
Expand All @@ -1189,7 +1220,7 @@ def test_ray_drain_node():
],
)
assert result.exit_code == 0
assert mock_gcs_client.mock_calls[0] == mock.call.drain_node(
assert mock_gcs_client.mock_calls[1] == mock.call.drain_node(
"0db0438b5cfd6e84d7ec07212ed76b23be7886cbd426ef4d1879bebf",
2,
"spot preemption",
Expand Down
39 changes: 0 additions & 39 deletions src/ray/gcs/gcs_client/gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -708,45 +708,6 @@ std::unordered_map<std::string, std::string> PythonGetNodeLabels(
node_info.labels().end());
}

Status PythonCheckGcsHealth(const std::string &gcs_address,
const int gcs_port,
const int64_t timeout_ms,
const std::string &ray_version,
const bool skip_version_check,
bool &is_healthy) {
auto channel = rpc::GcsRpcClient::CreateGcsChannel(gcs_address, gcs_port);
auto stub = rpc::NodeInfoGcsService::NewStub(channel);
grpc::ClientContext context;
if (timeout_ms != -1) {
context.set_deadline(std::chrono::system_clock::now() +
std::chrono::milliseconds(timeout_ms));
}
rpc::CheckAliveRequest request;
rpc::CheckAliveReply reply;
grpc::Status status = stub->CheckAlive(&context, request, &reply);
if (!status.ok()) {
is_healthy = false;
return Status::RpcError(status.error_message(), status.error_code());
}
if (reply.status().code() != static_cast<int>(StatusCode::OK)) {
is_healthy = false;
return HandleGcsError(reply.status());
}
if (!skip_version_check) {
// Check for Ray version match
if (reply.ray_version() != ray_version) {
is_healthy = false;
std::ostringstream ss;
ss << "Ray cluster at " << gcs_address << ":" << gcs_port << " has version "
<< reply.ray_version() << ", but this process "
<< "is running Ray version " << ray_version << ".";
return Status::Invalid(ss.str());
}
}
is_healthy = true;
return Status::OK();
}

/// Creates a singleton thread that runs an io_service.
/// All ConnectToGcsStandalone calls will share this io_service.
class SingletonIoContext {
Expand Down
7 changes: 0 additions & 7 deletions src/ray/gcs/gcs_client/gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,6 @@ std::unordered_map<std::string, double> PythonGetResourcesTotal(
std::unordered_map<std::string, std::string> PythonGetNodeLabels(
const rpc::GcsNodeInfo &node_info);

Status PythonCheckGcsHealth(const std::string &gcs_address,
const int gcs_port,
const int64_t timeout_ms,
const std::string &ray_version,
const bool skip_version_check,
bool &is_healthy);

} // namespace gcs

} // namespace ray

0 comments on commit 1dd8d60

Please sign in to comment.