From 1dd8d60bcbbf74b0d22ea4447a787a33817ff20b Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Mon, 9 Sep 2024 04:55:03 -0700 Subject: [PATCH] [Core] Remove ray._raylet.check_health (#47526) Signed-off-by: Jiajun Yao --- python/ray/_private/internal_api.py | 2 - python/ray/_raylet.pyx | 42 -------------------- python/ray/includes/common.pxd | 5 --- python/ray/scripts/scripts.py | 49 ++++++++++++------------ python/ray/tests/test_advanced_4.py | 24 ------------ python/ray/tests/test_cli.py | 57 +++++++++++++++++++++------- src/ray/gcs/gcs_client/gcs_client.cc | 39 ------------------- src/ray/gcs/gcs_client/gcs_client.h | 7 ---- 8 files changed, 68 insertions(+), 157 deletions(-) diff --git a/python/ray/_private/internal_api.py b/python/ray/_private/internal_api.py index cf42ba185c7a..f4efbde4db21 100644 --- a/python/ray/_private/internal_api.py +++ b/python/ray/_private/internal_api.py @@ -5,7 +5,6 @@ import ray._private.services as services import ray._private.utils as utils import ray._private.worker -from ray._private import ray_constants from ray._private.state import GlobalState from ray._raylet import GcsClientOptions from ray.core.generated import common_pb2 @@ -34,7 +33,6 @@ def get_state_from_address(address=None): def memory_summary( address=None, - redis_password=ray_constants.REDIS_DEFAULT_PASSWORD, group_by="NODE_ADDRESS", sort_by="OBJECT_SIZE", units="B", diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index f466a1b815cb..9e0953a33b0a 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -134,7 +134,6 @@ from ray.includes.common cimport ( kResourceUnitScaling, kImplicitResourcePrefix, kWorkerSetupHookKeyName, - PythonCheckGcsHealth, PythonGetNodeLabels, PythonGetResourcesTotal, ) @@ -3263,47 +3262,6 @@ cdef class _TestOnly_GcsActorSubscriber(_GcsSubscriber): return [(key_id, info)] -def check_health(address: str, timeout=2, skip_version_check=False): - """Checks Ray cluster health, before / without actually connecting to the - cluster via ray.init(). - - Args: - address: Ray cluster / GCS address string, e.g. ip:port. - timeout: request timeout. - skip_version_check: If True, will skip comparision of GCS Ray version with local - Ray version. If False (default), will raise exception on mismatch. - Returns: - Returns True if the cluster is running and has matching Ray version. - Returns False if no service is running. - Raises an exception otherwise. - """ - - tokens = address.rsplit(":", 1) - if len(tokens) != 2: - raise ValueError("Invalid address: {}. Expect 'ip:port'".format(address)) - gcs_address, gcs_port = tokens - - cdef: - c_string c_gcs_address = gcs_address - int c_gcs_port = int(gcs_port) - int64_t timeout_ms = round(1000 * timeout) if timeout else -1 - c_string c_ray_version = ray.__version__ - c_bool c_skip_version_check = skip_version_check - c_bool c_is_healthy = True - - try: - with nogil: - check_status(PythonCheckGcsHealth( - c_gcs_address, c_gcs_port, timeout_ms, c_ray_version, - c_skip_version_check, c_is_healthy)) - except RpcError: - traceback.print_exc() - except RaySystemError as e: - raise RuntimeError(str(e)) - - return c_is_healthy - - cdef class CoreWorker: def __cinit__(self, worker_type, store_socket, raylet_socket, diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 9da032607862..cc3eaf7d7389 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -698,11 +698,6 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" namespace "ray::gcs" nogil: unordered_map[c_string, c_string] PythonGetNodeLabels( const CGcsNodeInfo& node_info) - CRayStatus PythonCheckGcsHealth( - const c_string& gcs_address, const int gcs_port, const int64_t timeout_ms, - const c_string& ray_version, const c_bool skip_version_check, - c_bool& is_healthy) - cdef extern from "src/ray/protobuf/gcs.pb.h" nogil: cdef enum CChannelType "ray::rpc::ChannelType": RAY_ERROR_INFO_CHANNEL "ray::rpc::ChannelType::RAY_ERROR_INFO_CHANNEL", diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 6ed5fabea435..f94d0dbc9dc2 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -7,7 +7,6 @@ import subprocess import sys import time -import traceback import urllib import urllib.parse import warnings @@ -57,6 +56,18 @@ logger = logging.getLogger(__name__) +def _check_ray_version(gcs_client): + import ray._private.usage.usage_lib as ray_usage_lib + + cluster_metadata = ray_usage_lib.get_cluster_metadata(gcs_client) + if cluster_metadata and cluster_metadata["ray_version"] != ray.__version__: + raise RuntimeError( + "Ray version mismatch: cluster has Ray version " + f'{cluster_metadata["ray_version"]} ' + f"but local Ray version is {ray.__version__}" + ) + + @click.group() @click.option( "--logging-level", @@ -1953,13 +1964,12 @@ def memory( ): """Print object references held in a Ray cluster.""" address = services.canonicalize_bootstrap_address_or_die(address) - if not ray._raylet.check_health(address): - raise click.BadParameter(f"Ray cluster is not found at {address}") + gcs_client = ray._raylet.GcsClient(address=address) + _check_ray_version(gcs_client) time = datetime.now() header = "=" * 8 + f" Object references status: {time} " + "=" * 8 mem_stats = memory_summary( address, - redis_password, group_by, sort_by, units, @@ -1993,16 +2003,11 @@ def memory( def status(address: str, redis_password: str, verbose: bool): """Print cluster status, including autoscaling info.""" address = services.canonicalize_bootstrap_address_or_die(address) - if not ray._raylet.check_health(address): - raise click.BadParameter(f"Ray cluster is not found at {address}") gcs_client = ray._raylet.GcsClient(address=address) + _check_ray_version(gcs_client) ray.experimental.internal_kv._initialize_internal_kv(gcs_client) - status = ray.experimental.internal_kv._internal_kv_get( - ray_constants.DEBUG_AUTOSCALING_STATUS - ) - error = ray.experimental.internal_kv._internal_kv_get( - ray_constants.DEBUG_AUTOSCALING_ERROR - ) + status = gcs_client.internal_kv_get(ray_constants.DEBUG_AUTOSCALING_STATUS.encode()) + error = gcs_client.internal_kv_get(ray_constants.DEBUG_AUTOSCALING_ERROR.encode()) print(debug_status(status, error, verbose=verbose, address=address)) @@ -2292,10 +2297,9 @@ def drain_node( raise click.BadParameter(f"Invalid hex ID of a Ray node, got {node_id}") address = services.canonicalize_bootstrap_address_or_die(address) - if not ray._raylet.check_health(address): - raise click.BadParameter(f"Ray cluster is not found at {address}") gcs_client = ray._raylet.GcsClient(address=address) + _check_ray_version(gcs_client) is_accepted, rejection_error_message = gcs_client.drain_node( node_id, autoscaler_pb2.DrainNodeReason.Value(reason), @@ -2370,20 +2374,15 @@ def healthcheck(address, redis_password, component, skip_version_check): """ address = services.canonicalize_bootstrap_address_or_die(address) + gcs_client = ray._raylet.GcsClient(address=address) + if not skip_version_check: + _check_ray_version(gcs_client) if not component: - try: - if ray._raylet.check_health(address, skip_version_check=skip_version_check): - sys.exit(0) - except Exception: - traceback.print_exc() - pass - sys.exit(1) + sys.exit(0) - gcs_client = ray._raylet.GcsClient(address=address) - ray.experimental.internal_kv._initialize_internal_kv(gcs_client) - report_str = ray.experimental.internal_kv._internal_kv_get( - component, namespace=ray_constants.KV_NAMESPACE_HEALTHCHECK + report_str = gcs_client.internal_kv_get( + component.encode(), namespace=ray_constants.KV_NAMESPACE_HEALTHCHECK ) if not report_str: # Status was never updated diff --git a/python/ray/tests/test_advanced_4.py b/python/ray/tests/test_advanced_4.py index 573380013c8f..1a156a9c64f8 100644 --- a/python/ray/tests/test_advanced_4.py +++ b/python/ray/tests/test_advanced_4.py @@ -1,11 +1,9 @@ -from unittest import mock import subprocess import sys import pytest import ray -from ray._raylet import check_health from ray._private.test_utils import ( Semaphore, client_test_enabled, @@ -114,28 +112,6 @@ def test_jemalloc_env_var_propagate(): assert actual == expected -def test_check_health(shutdown_only): - assert not check_health("127.0.0.1:8888") - # Should not raise error: https://github.com/ray-project/ray/issues/38785 - assert not check_health("ip:address:with:colon:name:8265") - - with pytest.raises(ValueError): - check_health("bad_address_no_port") - - conn = ray.init() - addr = conn.address_info["address"] - assert check_health(addr) - - -def test_check_health_version_check(shutdown_only): - with mock.patch("ray.__version__", "FOO-VERSION"): - conn = ray.init() - addr = conn.address_info["address"] - assert check_health(addr, skip_version_check=True) - with pytest.raises(RuntimeError): - check_health(addr) - - def test_back_pressure(shutdown_only_with_initialization_check): ray.init() diff --git a/python/ray/tests/test_cli.py b/python/ray/tests/test_cli.py index 01e8cdc1a764..eec7b71c0c4c 100644 --- a/python/ray/tests/test_cli.py +++ b/python/ray/tests/test_cli.py @@ -1055,7 +1055,10 @@ def test_ray_check_open_ports(shutdown_only, start_open_port_check_server): assert "[🛑] open ports detected" in result.output -def test_ray_drain_node(): +def test_ray_drain_node(monkeypatch): + monkeypatch.setenv("RAY_py_gcs_connect_timeout_s", "1") + ray._raylet.Config.initialize("") + runner = CliRunner() result = runner.invoke( scripts.drain_node, @@ -1105,7 +1108,9 @@ def test_ray_drain_node(): ], ) assert result.exit_code != 0 - assert "Ray cluster is not found at 127.0.0.2:8888" in result.output + assert "Timed out while waiting for GCS to become available" in str( + result.exception + ) result = runner.invoke( scripts.drain_node, @@ -1121,10 +1126,32 @@ def test_ray_drain_node(): assert result.exit_code != 0 assert "Invalid hex ID of a Ray node, got invalid-node-id" in result.output - with patch("ray._raylet.check_health", return_value=True), patch( - "ray._raylet.GcsClient" - ) as MockGcsClient: + with patch("ray._raylet.GcsClient") as MockGcsClient: + mock_gcs_client = MockGcsClient.return_value + mock_gcs_client.internal_kv_get.return_value = ( + '{"ray_version": "ray_version_mismatch"}'.encode() + ) + result = runner.invoke( + scripts.drain_node, + [ + "--address", + "127.0.0.1:6543", + "--node-id", + "0db0438b5cfd6e84d7ec07212ed76b23be7886cbd426ef4d1879bebf", + "--reason", + "DRAIN_NODE_REASON_IDLE_TERMINATION", + "--reason-message", + "idle termination", + ], + ) + assert result.exit_code != 0 + assert "Ray version mismatch" in str(result.exception) + + with patch("ray._raylet.GcsClient") as MockGcsClient: mock_gcs_client = MockGcsClient.return_value + mock_gcs_client.internal_kv_get.return_value = ( + f'{{"ray_version": "{ray.__version__}"}}'.encode() + ) mock_gcs_client.drain_node.return_value = (True, "") result = runner.invoke( scripts.drain_node, @@ -1140,17 +1167,18 @@ def test_ray_drain_node(): ], ) assert result.exit_code == 0 - assert mock_gcs_client.mock_calls[0] == mock.call.drain_node( + assert mock_gcs_client.mock_calls[1] == mock.call.drain_node( "0db0438b5cfd6e84d7ec07212ed76b23be7886cbd426ef4d1879bebf", 1, "idle termination", 0, ) - with patch("ray._raylet.check_health", return_value=True), patch( - "ray._raylet.GcsClient" - ) as MockGcsClient: + with patch("ray._raylet.GcsClient") as MockGcsClient: mock_gcs_client = MockGcsClient.return_value + mock_gcs_client.internal_kv_get.return_value = ( + f'{{"ray_version": "{ray.__version__}"}}'.encode() + ) mock_gcs_client.drain_node.return_value = (False, "Node not idle") result = runner.invoke( scripts.drain_node, @@ -1168,10 +1196,13 @@ def test_ray_drain_node(): assert result.exit_code != 0 assert "The drain request is not accepted: Node not idle" in result.output - with patch("ray._raylet.check_health", return_value=True), patch( - "time.time_ns", return_value=1000000000 - ), patch("ray._raylet.GcsClient") as MockGcsClient: + with patch("time.time_ns", return_value=1000000000), patch( + "ray._raylet.GcsClient" + ) as MockGcsClient: mock_gcs_client = MockGcsClient.return_value + mock_gcs_client.internal_kv_get.return_value = ( + f'{{"ray_version": "{ray.__version__}"}}'.encode() + ) mock_gcs_client.drain_node.return_value = (True, "") result = runner.invoke( scripts.drain_node, @@ -1189,7 +1220,7 @@ def test_ray_drain_node(): ], ) assert result.exit_code == 0 - assert mock_gcs_client.mock_calls[0] == mock.call.drain_node( + assert mock_gcs_client.mock_calls[1] == mock.call.drain_node( "0db0438b5cfd6e84d7ec07212ed76b23be7886cbd426ef4d1879bebf", 2, "spot preemption", diff --git a/src/ray/gcs/gcs_client/gcs_client.cc b/src/ray/gcs/gcs_client/gcs_client.cc index 27717c6d2f62..c91b5a073be3 100644 --- a/src/ray/gcs/gcs_client/gcs_client.cc +++ b/src/ray/gcs/gcs_client/gcs_client.cc @@ -708,45 +708,6 @@ std::unordered_map PythonGetNodeLabels( node_info.labels().end()); } -Status PythonCheckGcsHealth(const std::string &gcs_address, - const int gcs_port, - const int64_t timeout_ms, - const std::string &ray_version, - const bool skip_version_check, - bool &is_healthy) { - auto channel = rpc::GcsRpcClient::CreateGcsChannel(gcs_address, gcs_port); - auto stub = rpc::NodeInfoGcsService::NewStub(channel); - grpc::ClientContext context; - if (timeout_ms != -1) { - context.set_deadline(std::chrono::system_clock::now() + - std::chrono::milliseconds(timeout_ms)); - } - rpc::CheckAliveRequest request; - rpc::CheckAliveReply reply; - grpc::Status status = stub->CheckAlive(&context, request, &reply); - if (!status.ok()) { - is_healthy = false; - return Status::RpcError(status.error_message(), status.error_code()); - } - if (reply.status().code() != static_cast(StatusCode::OK)) { - is_healthy = false; - return HandleGcsError(reply.status()); - } - if (!skip_version_check) { - // Check for Ray version match - if (reply.ray_version() != ray_version) { - is_healthy = false; - std::ostringstream ss; - ss << "Ray cluster at " << gcs_address << ":" << gcs_port << " has version " - << reply.ray_version() << ", but this process " - << "is running Ray version " << ray_version << "."; - return Status::Invalid(ss.str()); - } - } - is_healthy = true; - return Status::OK(); -} - /// Creates a singleton thread that runs an io_service. /// All ConnectToGcsStandalone calls will share this io_service. class SingletonIoContext { diff --git a/src/ray/gcs/gcs_client/gcs_client.h b/src/ray/gcs/gcs_client/gcs_client.h index 9d66ffcc92c4..fae6d7b8aca5 100644 --- a/src/ray/gcs/gcs_client/gcs_client.h +++ b/src/ray/gcs/gcs_client/gcs_client.h @@ -350,13 +350,6 @@ std::unordered_map PythonGetResourcesTotal( std::unordered_map PythonGetNodeLabels( const rpc::GcsNodeInfo &node_info); -Status PythonCheckGcsHealth(const std::string &gcs_address, - const int gcs_port, - const int64_t timeout_ms, - const std::string &ray_version, - const bool skip_version_check, - bool &is_healthy); - } // namespace gcs } // namespace ray