From fa51a7684a1ff66aeec339cf0cf6a55628fa576f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=85=E9=BE=99?= Date: Fri, 11 Mar 2022 03:35:09 +0800 Subject: [PATCH 01/10] catch port conflict in agent --- dashboard/agent.py | 36 ++++++-- dashboard/modules/reporter/reporter_agent.py | 3 +- .../modules/runtime_env/runtime_env_agent.py | 5 +- python/ray/_private/metrics_agent.py | 17 +++- python/ray/tests/conftest.py | 13 +++ python/ray/tests/test_dashboard.py | 86 +++++++++++++++++-- src/ray/common/ray_config_def.h | 4 +- src/ray/raylet/agent_manager.cc | 27 ++++-- src/ray/raylet/agent_manager.h | 3 + src/ray/raylet/node_manager.cc | 2 +- 10 files changed, 166 insertions(+), 30 deletions(-) diff --git a/dashboard/agent.py b/dashboard/agent.py index 05fbc332e09a..2c2038612ae1 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -84,17 +84,30 @@ def __init__( self.ppid = int(os.environ["RAY_RAYLET_PID"]) assert self.ppid > 0 logger.info("Parent pid is %s", self.ppid) - self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0),)) - grpc_ip = "127.0.0.1" if self.ip == "127.0.0.1" else "0.0.0.0" - self.grpc_port = ray._private.tls_utils.add_port_to_grpc_server( - self.server, f"{grpc_ip}:{self.dashboard_agent_port}" - ) - logger.info("Dashboard agent grpc address: %s:%s", grpc_ip, self.grpc_port) + + # Setup raylet channel options = (("grpc.enable_http_proxy", 0),) self.aiogrpc_raylet_channel = ray._private.utils.init_grpc_channel( f"{self.ip}:{self.node_manager_port}", options, asynchronous=True ) + # Setup grpc server + self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0),)) + grpc_ip = "127.0.0.1" if self.ip == "127.0.0.1" else "0.0.0.0" + try: + self.grpc_port = ray._private.tls_utils.add_port_to_grpc_server( + self.server, f"{grpc_ip}:{self.dashboard_agent_port}" + ) + except Exception: + logger.exception( + "Failed to add port to grpc server. Agent will stay alive but " + "disable the grpc service." + ) + self.server = None + self.grpc_port = None + else: + logger.info("Dashboard agent grpc address: %s:%s", grpc_ip, self.grpc_port) + # If the agent is started as non-minimal version, http server should # be configured to communicate with the dashboard in a head node. self.http_server = None @@ -147,7 +160,8 @@ async def _check_parent(): check_parent_task = create_task(_check_parent()) # Start a grpc asyncio server. - await self.server.start() + if self.server: + await self.server.start() self.gcs_client = GcsClient(address=self.gcs_address) modules = self._load_modules() @@ -159,7 +173,13 @@ async def _check_parent(): # Http server is not started in the minimal version because # it requires additional dependencies that are not # included in the minimal ray package. - self.http_server = await self._configure_http_server(modules) + try: + self.http_server = await self._configure_http_server(modules) + except Exception: + logger.exception( + "Failed to start http server. Agent will stay alive but " + "disable the http service." + ) # Write the dashboard agent port to kv. 
# TODO: Use async version if performance is an issue diff --git a/dashboard/modules/reporter/reporter_agent.py b/dashboard/modules/reporter/reporter_agent.py index e4b1fcfae1c0..6118ecf4ef18 100644 --- a/dashboard/modules/reporter/reporter_agent.py +++ b/dashboard/modules/reporter/reporter_agent.py @@ -603,7 +603,8 @@ async def _perform_iteration(self, publisher): await asyncio.sleep(reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000) async def run(self, server): - reporter_pb2_grpc.add_ReporterServiceServicer_to_server(self, server) + if server: + reporter_pb2_grpc.add_ReporterServiceServicer_to_server(self, server) gcs_addr = self._dashboard_agent.gcs_address assert gcs_addr is not None diff --git a/dashboard/modules/runtime_env/runtime_env_agent.py b/dashboard/modules/runtime_env/runtime_env_agent.py index 09b5b388f092..87eaa6b5b463 100644 --- a/dashboard/modules/runtime_env/runtime_env_agent.py +++ b/dashboard/modules/runtime_env/runtime_env_agent.py @@ -324,7 +324,10 @@ async def DeleteURIs(self, request, context): ) async def run(self, server): - runtime_env_agent_pb2_grpc.add_RuntimeEnvServiceServicer_to_server(self, server) + if server: + runtime_env_agent_pb2_grpc.add_RuntimeEnvServiceServicer_to_server( + self, server + ) @staticmethod def is_minimal_module(): diff --git a/python/ray/_private/metrics_agent.py b/python/ray/_private/metrics_agent.py index 1ef807c4d9b1..d3a3bb7c713e 100644 --- a/python/ray/_private/metrics_agent.py +++ b/python/ray/_private/metrics_agent.py @@ -74,17 +74,26 @@ def __init__(self, metrics_export_address, metrics_export_port): self._lock = threading.Lock() # Configure exporter. (We currently only support prometheus). - self.view_manager.register_exporter( - prometheus_exporter.new_stats_exporter( + try: + stats_exporter = prometheus_exporter.new_stats_exporter( prometheus_exporter.Options( namespace="ray", port=metrics_export_port, address=metrics_export_address, ) ) - ) + except Exception: + logger.exception( + "Failed to start prometheus stats exporter. Agent will stay " + "alive but disable the stats." 
+ ) + self.view_manager = None + else: + self.view_manager.register_exporter(stats_exporter) def record_reporter_stats(self, records: List[Record]): + if not self.view_manager: + return with self._lock: for record in records: gauge = record.gauge @@ -111,6 +120,8 @@ def _record_gauge(self, gauge: Gauge, value: float, tags: dict): def record_metric_points_from_protobuf(self, metrics: List[Metric]): """Record metrics from Opencensus Protobuf""" + if not self.view_manager: + return with self._lock: self._record_metrics(metrics) diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index bb5a1f7c3475..9fb8a582e1a1 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -628,3 +628,16 @@ def set_runtime_env_retry_times(request): yield runtime_env_retry_times finally: del os.environ["RUNTIME_ENV_RETRY_TIMES"] + + +@pytest.fixture +def listen_port(request): + port = getattr(request, "param", 0) + try: + sock = socket.socket() + if hasattr(socket, "SO_REUSEPORT"): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 0) + sock.bind(("127.0.0.1", port)) + yield port + finally: + sock.close() diff --git a/python/ray/tests/test_dashboard.py b/python/ray/tests/test_dashboard.py index 219ad319de66..ebf7aee4c0a6 100644 --- a/python/ray/tests/test_dashboard.py +++ b/python/ray/tests/test_dashboard.py @@ -1,6 +1,5 @@ import os import re -import socket import subprocess import sys import time @@ -95,11 +94,12 @@ def dashboard_available(): ) -def test_port_conflict(call_ray_stop_only, shutdown_only): - sock = socket.socket() - if hasattr(socket, "SO_REUSEPORT"): - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 0) - sock.bind(("127.0.0.1", 9999)) +@pytest.mark.parametrize( + "listen_port", + [9999], + indirect=True, +) +def test_port_conflict(listen_port, call_ray_stop_only, shutdown_only): try: subprocess.check_output( @@ -121,8 +121,6 @@ def test_port_conflict(call_ray_stop_only, shutdown_only): with pytest.raises(ValueError, match="already occupied"): ray.init(dashboard_port=9999, include_dashboard=True) - sock.close() - def test_dashboard(shutdown_only): addresses = ray.init(include_dashboard=True, num_cpus=1) @@ -203,6 +201,78 @@ def matcher(log_batch): ) +conflict_port = 34567 + + +def run_tasks_without_runtime_env(): + @ray.remote + def f(): + pass + + for _ in range(10): + time.sleep(1) + ray.get(f.remote()) + + +def run_tasks_with_runtime_env(): + @ray.remote(runtime_env={"pip": ["pip-install-test==0.5"]}) + def f(): + import pip_install_test # noqa + + pass + + for _ in range(3): + time.sleep(1) + ray.get(f.remote()) + + +@pytest.mark.skipif( + sys.platform == "win32", reason="`runtime_env` with `pip` not supported on Windows." +) +@pytest.mark.parametrize( + "listen_port", + [conflict_port], + indirect=True, +) +@pytest.mark.parametrize( + "call_ray_start", + [f"ray start --head --num-cpus=1 --dashboard-agent-grpc-port={conflict_port}"], + indirect=True, +) +def test_dashboard_agent_grpc_port_conflict(listen_port, call_ray_start): + address = call_ray_start + ray.init(address=address) + # Tasks without runtime env still work when dashboard agent grpc port conflicts. + run_tasks_without_runtime_env() + # Tasks with runtime env couldn't work. + with pytest.raises(ray.exceptions.RuntimeEnvSetupError): + run_tasks_with_runtime_env() + + +@pytest.mark.skipif( + sys.platform == "win32", reason="`runtime_env` with `pip` not supported on Windows." 
+) +@pytest.mark.parametrize( + "listen_port", + [conflict_port], + indirect=True, +) +@pytest.mark.parametrize( + "call_ray_start", + [ + f"ray start --head --num-cpus=1 --metrics-export-port={conflict_port}", + f"ray start --head --num-cpus=1 --dashboard-agent-listen-port={conflict_port}", + ], + indirect=True, +) +def test_dashboard_agent_metrics_or_http_port_conflict(listen_port, call_ray_start): + address = call_ray_start + ray.init(address=address) + # Tasks with runtime env still work when other agent port conflicts, + # except grpc port. + run_tasks_with_runtime_env() + + if __name__ == "__main__": import pytest diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 21690198d7c6..48638267d599 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -347,10 +347,10 @@ RAY_CONFIG(uint32_t, agent_restart_interval_ms, 1000) RAY_CONFIG(uint32_t, agent_register_timeout_ms, 30 * 1000) /// Max restart count for the dashboard agent. -RAY_CONFIG(uint32_t, agent_max_restart_count, 5) +RAY_CONFIG(uint32_t, agent_max_restart_count, 0) /// Whether to fail raylet when agent fails. -RAY_CONFIG(bool, raylet_shares_fate_with_agent, false) +RAY_CONFIG(bool, raylet_shares_fate_with_agent, true) /// If the agent manager fails to communicate with the dashboard agent, we will retry /// after this interval. diff --git a/src/ray/raylet/agent_manager.cc b/src/ray/raylet/agent_manager.cc index 5c023d0c0cfb..3012b2b0b672 100644 --- a/src/ray/raylet/agent_manager.cc +++ b/src/ray/raylet/agent_manager.cc @@ -32,10 +32,18 @@ void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request, agent_ip_address_ = request.agent_ip_address(); agent_port_ = request.agent_port(); agent_pid_ = request.agent_pid(); - runtime_env_agent_client_ = - runtime_env_agent_client_factory_(agent_ip_address_, agent_port_); - RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << agent_ip_address_ - << ", port: " << agent_port_ << ", pid: " << agent_pid_; + // Note: `agent_port_` should be 0 if the grpc port of agent is in conflict. + if (agent_port_ != 0) { + runtime_env_agent_client_ = + runtime_env_agent_client_factory_(agent_ip_address_, agent_port_); + RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << agent_ip_address_ + << ", port: " << agent_port_ << ", pid: " << agent_pid_; + } else { + RAY_LOG(ERROR) << "The grpc port of agent is invalid (be 0), ip: " + << agent_ip_address_ << ", pid: " << agent_pid_ + << ". Disable the agent client in raylet."; + disable_agent_client_ = true; + } reply->set_status(rpc::AGENT_RPC_STATUS_OK); // Reset the restart count after registration is done. agent_restart_count_ = 0; @@ -102,6 +110,7 @@ void AgentManager::StartAgent() { RAY_LOG(WARNING) << "Agent process with pid " << child.GetId() << " exit, return value " << exit_code << ". ip " << agent_ip_address_ << ". pid " << agent_pid_; + agent_failed_ = true; if (agent_restart_count_ < RayConfig::instance().agent_max_restart_count()) { RAY_UNUSED(delay_executor_( [this] { @@ -162,10 +171,11 @@ void AgentManager::CreateRuntimeEnv( if (runtime_env_agent_client_ == nullptr) { // If the agent cannot be restarted anymore, fail the request. 
- if (agent_restart_count_ >= RayConfig::instance().agent_max_restart_count()) { + if (disable_agent_client_ || agent_failed_) { std::stringstream str_stream; str_stream << "Runtime environment " << serialized_runtime_env - << " cannot be created on this node because the agent is dead."; + << " cannot be created on this node because the grpc service of agent " + "is invalid."; const auto &error_message = str_stream.str(); RAY_LOG(WARNING) << error_message; delay_executor_( @@ -230,6 +240,11 @@ void AgentManager::CreateRuntimeEnv( void AgentManager::DeleteURIs(const std::vector &uris, DeleteURIsCallback callback) { + if (disable_agent_client_) { + RAY_LOG(ERROR) << "Failed to delete URIs because the agent client is disabled."; + delay_executor_([callback] { callback(false); }, 0); + return; + } if (runtime_env_agent_client_ == nullptr) { RAY_LOG(INFO) << "Runtime env agent is not registered yet. Will retry DeleteURIs later."; diff --git a/src/ray/raylet/agent_manager.h b/src/ray/raylet/agent_manager.h index 1340818c6f5f..b5e6a3f17d7e 100644 --- a/src/ray/raylet/agent_manager.h +++ b/src/ray/raylet/agent_manager.h @@ -89,6 +89,8 @@ class AgentManager : public rpc::AgentManagerServiceHandler { int agent_port_ = 0; /// The number of times the agent is restarted. std::atomic agent_restart_count_ = 0; + /// The flag indicates whether agent has failed. + std::atomic agent_failed_ = false; /// Whether or not we intend to start the agent. This is false if we /// are missing Ray Dashboard dependencies, for example. bool should_start_agent_ = true; @@ -96,6 +98,7 @@ class AgentManager : public rpc::AgentManagerServiceHandler { DelayExecutorFn delay_executor_; RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory_; std::shared_ptr runtime_env_agent_client_; + bool disable_agent_client_ = false; }; class DefaultAgentManagerServiceHandler : public rpc::AgentManagerServiceHandler { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 989cd3821468..8fd97a612c47 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -405,7 +405,7 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self }, /*runtime_env_agent_factory=*/ [this](const std::string &ip_address, int port) { - RAY_CHECK(!ip_address.empty() && port != 0) + RAY_CHECK(!ip_address.empty()) << "ip_address: " << ip_address << " port: " << port; return std::shared_ptr( new rpc::RuntimeEnvAgentClient(ip_address, port, client_call_manager_)); From 15c286be0d9be66e95c4655f6ef21745c8194044 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=85=E9=BE=99?= Date: Fri, 11 Mar 2022 04:10:45 +0800 Subject: [PATCH 02/10] remove the agent restart feature --- dashboard/agent.py | 55 +++++++++++++--------------- dashboard/tests/test_dashboard.py | 34 ----------------- python/ray/tests/test_dashboard.py | 23 +++--------- python/ray/tests/test_runtime_env.py | 16 +------- src/ray/common/ray_config_def.h | 6 --- src/ray/raylet/agent_manager.cc | 41 +++------------------ src/ray/raylet/agent_manager.h | 2 - 7 files changed, 40 insertions(+), 137 deletions(-) diff --git a/dashboard/agent.py b/dashboard/agent.py index 2c2038612ae1..f2fab082286f 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -381,36 +381,33 @@ async def _check_parent(): except Exception as e: # All these env vars should be available because # they are provided by the parent raylet. 
- restart_count = os.environ["RESTART_COUNT"] - max_restart_count = os.environ["MAX_RESTART_COUNT"] raylet_pid = os.environ["RAY_RAYLET_PID"] node_ip = args.node_ip_address - if restart_count >= max_restart_count: - # Agent is failed to be started many times. - # Push an error to all drivers, so that users can know the - # impact of the issue. - gcs_publisher = GcsPublisher(args.gcs_address) - traceback_str = ray._private.utils.format_error_message( - traceback.format_exc() - ) - message = ( - f"(ip={node_ip}) " - f"The agent on node {platform.uname()[1]} failed to " - f"be restarted {max_restart_count} " - "times. There are 3 possible problems if you see this error." - "\n 1. The dashboard might not display correct " - "information on this node." - "\n 2. Metrics on this node won't be reported." - "\n 3. runtime_env APIs won't work." - "\nCheck out the `dashboard_agent.log` to see the " - "detailed failure messages." - ) - ray._private.utils.publish_error_to_driver( - ray_constants.DASHBOARD_AGENT_DIED_ERROR, - message, - redis_client=None, - gcs_publisher=gcs_publisher, - ) - logger.error(message) + # Agent is failed to be started many times. + # Push an error to all drivers, so that users can know the + # impact of the issue. + redis_client = None + gcs_publisher = GcsPublisher(address=args.gcs_address) + + traceback_str = ray._private.utils.format_error_message(traceback.format_exc()) + message = ( + f"(ip={node_ip}) " + f"The agent on node {platform.uname()[1]} failed to " + f"be started." + "There are 3 possible problems if you see this error." + "\n 1. The dashboard might not display correct " + "information on this node." + "\n 2. Metrics on this node won't be reported." + "\n 3. runtime_env APIs won't work." + "\nCheck out the `dashboard_agent.log` to see the " + "detailed failure messages." + ) + ray._private.utils.publish_error_to_driver( + ray_constants.DASHBOARD_AGENT_DIED_ERROR, + message, + redis_client=None, + gcs_publisher=gcs_publisher, + ) + logger.error(message) logger.exception(e) exit(1) diff --git a/dashboard/tests/test_dashboard.py b/dashboard/tests/test_dashboard.py index 77b961cb0a21..3d20a64d9ca3 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -113,34 +113,6 @@ def test_basic(ray_start_with_dashboard): raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0] raylet_proc = psutil.Process(raylet_proc_info.process.pid) - # Test for bad imports, the agent should be restarted. - logger.info("Test for bad imports.") - agent_proc = search_agent(raylet_proc.children()) - prepare_test_files() - agent_pids = set() - try: - assert agent_proc is not None - agent_proc.kill() - agent_proc.wait() - # The agent will be restarted for imports failure. - for _ in range(300): - agent_proc = search_agent(raylet_proc.children()) - if agent_proc: - agent_pids.add(agent_proc.pid) - # The agent should be restarted, - # so we can break if the len(agent_pid) > 1 - if len(agent_pids) > 1: - break - time.sleep(0.1) - finally: - cleanup_test_files() - assert len(agent_pids) > 1, agent_pids - - agent_proc = search_agent(raylet_proc.children()) - if agent_proc: - agent_proc.kill() - agent_proc.wait() - logger.info("Test agent register is OK.") wait_for_condition(lambda: search_agent(raylet_proc.children())) assert dashboard_proc.status() in [psutil.STATUS_RUNNING, psutil.STATUS_SLEEPING] @@ -149,11 +121,6 @@ def test_basic(ray_start_with_dashboard): check_agent_register(raylet_proc, agent_pid) - # The agent should be dead if raylet exits. 
- raylet_proc.kill() - raylet_proc.wait() - agent_proc.wait(5) - # Check kv keys are set. logger.info("Check kv keys are set.") dashboard_address = ray.experimental.internal_kv._internal_kv_get( @@ -177,7 +144,6 @@ def test_raylet_and_agent_share_fate(shutdown_only): system_config = { "raylet_shares_fate_with_agent": True, - "agent_max_restart_count": 0, } ray.init(include_dashboard=True, _system_config=system_config) diff --git a/python/ray/tests/test_dashboard.py b/python/ray/tests/test_dashboard.py index ebf7aee4c0a6..65b0fdb5f755 100644 --- a/python/ray/tests/test_dashboard.py +++ b/python/ray/tests/test_dashboard.py @@ -159,24 +159,11 @@ def set_agent_failure_env_var(): del os.environ["_RAY_AGENT_FAILING"] -@pytest.mark.parametrize( - "ray_start_cluster_head", - [ - { - "_system_config": { - "agent_restart_interval_ms": 10, - "agent_max_restart_count": 5, - } - } - ], - indirect=True, -) def test_dashboard_agent_restart( set_agent_failure_env_var, ray_start_cluster_head, error_pubsub, log_pubsub ): - """Test that when the agent fails to start many times in a row - if the error message is suppressed correctly without spamming - the driver. + """Test that when the agent fails to start if the error message + is suppressed correctly without spamming the driver. """ # Choose a duplicated port for the agent so that it will crash. errors = get_error_message( @@ -193,10 +180,12 @@ def test_dashboard_agent_restart( # Make sure there's no spammy message for 5 seconds. def matcher(log_batch): - return log_batch["pid"] != "autoscaler" + return log_batch["pid"] == "raylet" and any( + "Agent process with pid" in line for line in log_batch["lines"] + ) match = get_log_batch(log_pubsub, 1, timeout=5, matcher=matcher) - assert len(match) == 0, ( + assert len(match) == 1, ( "There are spammy logs during Ray agent restart process. " f"Logs: {match}" ) diff --git a/python/ray/tests/test_runtime_env.py b/python/ray/tests/test_runtime_env.py index 45f150d60ea3..4eb06d11c78f 100644 --- a/python/ray/tests/test_runtime_env.py +++ b/python/ray/tests/test_runtime_env.py @@ -301,18 +301,6 @@ def set_agent_failure_env_var(): del os.environ["_RAY_AGENT_FAILING"] -@pytest.mark.parametrize( - "ray_start_cluster_head", - [ - { - "_system_config": { - "agent_restart_interval_ms": 10, - "agent_max_restart_count": 5, - } - } - ], - indirect=True, -) @pytest.mark.parametrize("runtime_env_class", [dict, RuntimeEnv]) def test_runtime_env_broken( set_agent_failure_env_var, runtime_env_class, ray_start_cluster_head @@ -330,13 +318,13 @@ def f(): """ Test task raises an exception. """ - with pytest.raises(RuntimeEnvSetupError): + with pytest.raises(ray.exceptions.LocalRayletDiedError): ray.get(f.options(runtime_env=runtime_env).remote()) """ Test actor task raises an exception. """ a = A.options(runtime_env=runtime_env).remote() - with pytest.raises(ray.exceptions.RuntimeEnvSetupError): + with pytest.raises(ray.exceptions.RayActorError): ray.get(a.ready.remote()) diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 48638267d599..0103a5074ca8 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -340,15 +340,9 @@ RAY_CONFIG(int64_t, task_rpc_inlined_bytes_limit, 10 * 1024 * 1024) /// Maximum number of pending lease requests per scheduling category RAY_CONFIG(uint64_t, max_pending_lease_requests_per_scheduling_category, 10) -/// Interval to restart dashboard agent after the process exit. 
-RAY_CONFIG(uint32_t, agent_restart_interval_ms, 1000) - /// Wait timeout for dashboard agent register. RAY_CONFIG(uint32_t, agent_register_timeout_ms, 30 * 1000) -/// Max restart count for the dashboard agent. -RAY_CONFIG(uint32_t, agent_max_restart_count, 0) - /// Whether to fail raylet when agent fails. RAY_CONFIG(bool, raylet_shares_fate_with_agent, true) diff --git a/src/ray/raylet/agent_manager.cc b/src/ray/raylet/agent_manager.cc index 3012b2b0b672..eea944f02d52 100644 --- a/src/ray/raylet/agent_manager.cc +++ b/src/ray/raylet/agent_manager.cc @@ -45,8 +45,6 @@ void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request, disable_agent_client_ = true; } reply->set_status(rpc::AGENT_RPC_STATUS_OK); - // Reset the restart count after registration is done. - agent_restart_count_ = 0; send_reply_callback(ray::Status::OK(), nullptr, nullptr); } @@ -77,11 +75,6 @@ void AgentManager::StartAgent() { ProcessEnvironment env; env.insert({"RAY_NODE_ID", options_.node_id.Hex()}); env.insert({"RAY_RAYLET_PID", std::to_string(getpid())}); - // Report the restart count to the agent so that we can decide whether or not - // report the error message to drivers. - env.insert({"RESTART_COUNT", std::to_string(agent_restart_count_)}); - env.insert({"MAX_RESTART_COUNT", - std::to_string(RayConfig::instance().agent_max_restart_count())}); Process child(argv.data(), nullptr, ec, false, env); if (!child.IsValid() || ec) { // The worker failed to start. This is a fatal error. @@ -111,34 +104,12 @@ void AgentManager::StartAgent() { << " exit, return value " << exit_code << ". ip " << agent_ip_address_ << ". pid " << agent_pid_; agent_failed_ = true; - if (agent_restart_count_ < RayConfig::instance().agent_max_restart_count()) { - RAY_UNUSED(delay_executor_( - [this] { - agent_restart_count_++; - StartAgent(); - }, - // Retrying with exponential backoff - RayConfig::instance().agent_restart_interval_ms() * - std::pow(2, (agent_restart_count_ + 1)))); - } else { - RAY_LOG(WARNING) << "Agent has failed to restart for " - << RayConfig::instance().agent_max_restart_count() - << " times in a row without registering the agent. This is highly " - "likely there's a bug in the dashboard agent. Please check out " - "the dashboard_agent.log file."; - RAY_EVENT(WARNING, EL_RAY_AGENT_EXIT) - .WithField("ip", agent_ip_address_) - .WithField("pid", agent_pid_) - << "Agent failed to be restarted " - << RayConfig::instance().agent_max_restart_count() - << " times. Agent won't be restarted."; - if (RayConfig::instance().raylet_shares_fate_with_agent()) { - RAY_LOG(ERROR) << "Raylet exits immediately because the ray agent has failed. " - "Raylet fate shares with the agent. It can happen because the " - "Ray agent is unexpectedly killed or failed. See " - "`dashboard_agent.log` for the root cause."; - QuickExit(); - } + if (RayConfig::instance().raylet_shares_fate_with_agent()) { + RAY_LOG(ERROR) << "Raylet exits immediately because the ray agent has failed. " + "Raylet fate shares with the agent. It can happen because the " + "Ray agent is unexpectedly killed or failed. 
See " + "`dashboard_agent.log` for the root cause."; + QuickExit(); } }); monitor_thread.detach(); diff --git a/src/ray/raylet/agent_manager.h b/src/ray/raylet/agent_manager.h index b5e6a3f17d7e..ac7542c13e53 100644 --- a/src/ray/raylet/agent_manager.h +++ b/src/ray/raylet/agent_manager.h @@ -87,8 +87,6 @@ class AgentManager : public rpc::AgentManagerServiceHandler { Options options_; pid_t agent_pid_ = 0; int agent_port_ = 0; - /// The number of times the agent is restarted. - std::atomic agent_restart_count_ = 0; /// The flag indicates whether agent has failed. std::atomic agent_failed_ = false; /// Whether or not we intend to start the agent. This is false if we From d2f62293809d6c98a0045c39063cbfbe64dcf1fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=85=E9=BE=99?= Date: Fri, 11 Mar 2022 04:33:24 +0800 Subject: [PATCH 03/10] fix --- dashboard/agent.py | 8 +------- python/ray/tests/test_dashboard.py | 6 ++---- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/dashboard/agent.py b/dashboard/agent.py index f2fab082286f..8985fb284654 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -393,13 +393,7 @@ async def _check_parent(): message = ( f"(ip={node_ip}) " f"The agent on node {platform.uname()[1]} failed to " - f"be started." - "There are 3 possible problems if you see this error." - "\n 1. The dashboard might not display correct " - "information on this node." - "\n 2. Metrics on this node won't be reported." - "\n 3. runtime_env APIs won't work." - "\nCheck out the `dashboard_agent.log` to see the " + "be started. Check out the `dashboard_agent.log` to see the" "detailed failure messages." ) ray._private.utils.publish_error_to_driver( diff --git a/python/ray/tests/test_dashboard.py b/python/ray/tests/test_dashboard.py index 65b0fdb5f755..978232cf5bed 100644 --- a/python/ray/tests/test_dashboard.py +++ b/python/ray/tests/test_dashboard.py @@ -171,9 +171,7 @@ def test_dashboard_agent_restart( ) assert len(errors) == 1 for e in errors: - assert ( - "There are 3 possible problems " "if you see this error." in e.error_message - ) + assert "Check out the `dashboard_agent.log`" in e.error_message # Make sure the agent process is not started anymore. cluster = ray_start_cluster_head wait_for_condition(lambda: search_agents(cluster) is None) @@ -185,7 +183,7 @@ def matcher(log_batch): ) match = get_log_batch(log_pubsub, 1, timeout=5, matcher=matcher) - assert len(match) == 1, ( + assert len(match) == 0, ( "There are spammy logs during Ray agent restart process. " f"Logs: {match}" ) From 1c7ecad9f3595e44276faa17d70ef482214c9e0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=85=E9=BE=99?= Date: Fri, 11 Mar 2022 15:08:19 +0800 Subject: [PATCH 04/10] fix cpp test --- src/ray/raylet/worker_pool_test.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 14a06dcdca3f..f770b6e801db 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -473,7 +473,9 @@ class WorkerPoolTest : public ::testing::Test { new MockRuntimeEnvAgentClient()); }, false); - const rpc::RegisterAgentRequest request; + rpc::RegisterAgentRequest request; + // Set agent port to a nonzero value to avoid invalid agent client. 
+ request.set_agent_port(12345); rpc::RegisterAgentReply reply; auto send_reply_callback = [](ray::Status status, std::function f1, std::function f2) {}; From dc9126d57e49de49427349cc410d6c647c62216c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=85=E9=BE=99?= Date: Sat, 12 Mar 2022 02:47:09 +0800 Subject: [PATCH 05/10] address comments --- dashboard/agent.py | 37 ++++++---------------- dashboard/tests/test_dashboard.py | 7 ++---- python/ray/_private/metrics_agent.py | 3 +++ src/ray/common/ray_config_def.h | 3 --- src/ray/raylet/agent_manager.cc | 23 ++++++++--------- src/ray/raylet/agent_manager.h | 4 +-- 6 files changed, 25 insertions(+), 52 deletions(-) diff --git a/dashboard/agent.py b/dashboard/agent.py index 8985fb284654..331bff7cea6c 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -3,10 +3,8 @@ import logging import logging.handlers import os -import platform import sys import json -import traceback try: from grpc import aio as aiogrpc @@ -20,7 +18,6 @@ import ray.ray_constants as ray_constants import ray._private.services import ray._private.utils -from ray._private.gcs_pubsub import GcsPublisher from ray._private.gcs_utils import GcsClient from ray.core.generated import agent_manager_pb2 from ray.core.generated import agent_manager_pb2_grpc @@ -99,6 +96,9 @@ def __init__( self.server, f"{grpc_ip}:{self.dashboard_agent_port}" ) except Exception: + # TODO(SongGuyang): Catch the exception here because there is + # a port conflict issue caused by the static port. We should + # remove this after we find a better port resolution. logger.exception( "Failed to add port to grpc server. Agent will stay alive but " "disable the grpc service." ) @@ -176,6 +176,9 @@ async def _check_parent(): try: self.http_server = await self._configure_http_server(modules) except Exception: + # TODO(SongGuyang): Catch the exception here because there is + # a port conflict issue caused by the static port. We should + # remove this after we find a better port resolution. logger.exception( "Failed to start http server. Agent will stay alive but " "disable the http service." ) @@ -378,30 +381,6 @@ async def _check_parent(): loop = asyncio.get_event_loop() loop.run_until_complete(agent.run()) - except Exception as e: - # All these env vars should be available because - # they are provided by the parent raylet. - raylet_pid = os.environ["RAY_RAYLET_PID"] - node_ip = args.node_ip_address - # Agent is failed to be started many times. - # Push an error to all drivers, so that users can know the - # impact of the issue. - redis_client = None - gcs_publisher = GcsPublisher(address=args.gcs_address) - - traceback_str = ray._private.utils.format_error_message(traceback.format_exc()) - message = ( - f"(ip={node_ip}) " - f"The agent on node {platform.uname()[1]} failed to " - "be started. Check out the `dashboard_agent.log` to see the" - "detailed failure messages." - ) - ray._private.utils.publish_error_to_driver( - ray_constants.DASHBOARD_AGENT_DIED_ERROR, - message, - redis_client=None, - gcs_publisher=gcs_publisher, - ) - logger.error(message) + except Exception: + logger.exception("The agent is behaving abnormally. 
It will exit immediately.") exit(1) diff --git a/dashboard/tests/test_dashboard.py index 3d20a64d9ca3..5e3fe2a53465 100644 --- a/dashboard/tests/test_dashboard.py +++ b/dashboard/tests/test_dashboard.py @@ -142,10 +142,7 @@ def test_basic(ray_start_with_dashboard): def test_raylet_and_agent_share_fate(shutdown_only): """Test raylet and agent share fate.""" - system_config = { - "raylet_shares_fate_with_agent": True, - } - ray.init(include_dashboard=True, _system_config=system_config) + ray.init(include_dashboard=True) all_processes = ray.worker._global_node.all_processes raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0] @@ -164,7 +161,7 @@ def test_raylet_and_agent_share_fate(shutdown_only): ray.shutdown() - ray.init(include_dashboard=True, _system_config=system_config) + ray.init(include_dashboard=True) all_processes = ray.worker._global_node.all_processes raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0] raylet_proc = psutil.Process(raylet_proc_info.process.pid) diff --git a/python/ray/_private/metrics_agent.py b/python/ray/_private/metrics_agent.py index d3a3bb7c713e..1f4173a0127e 100644 --- a/python/ray/_private/metrics_agent.py +++ b/python/ray/_private/metrics_agent.py @@ -83,6 +83,9 @@ def __init__(self, metrics_export_address, metrics_export_port): ) ) except Exception: + # TODO(SongGuyang): Catch the exception here because there is + # a port conflict issue caused by the static port. We should + # remove this after we find a better port resolution. logger.exception( "Failed to start prometheus stats exporter. Agent will stay " "alive but disable the stats." diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 0103a5074ca8..6874d7d1aff3 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -343,9 +343,6 @@ RAY_CONFIG(uint64_t, max_pending_lease_requests_per_scheduling_category, 10) /// Wait timeout for dashboard agent register. RAY_CONFIG(uint32_t, agent_register_timeout_ms, 30 * 1000) -/// Whether to fail raylet when agent fails. -RAY_CONFIG(bool, raylet_shares_fate_with_agent, true) - /// If the agent manager fails to communicate with the dashboard agent, we will retry /// after this interval. RAY_CONFIG(uint32_t, agent_manager_retry_interval_ms, 1000); diff --git a/src/ray/raylet/agent_manager.cc b/src/ray/raylet/agent_manager.cc index eea944f02d52..aadb19aeff02 100644 --- a/src/ray/raylet/agent_manager.cc +++ b/src/ray/raylet/agent_manager.cc @@ -39,9 +39,9 @@ void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request, RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << agent_ip_address_ << ", port: " << agent_port_ << ", pid: " << agent_pid_; } else { - RAY_LOG(ERROR) << "The grpc port of agent is invalid (be 0), ip: " - << agent_ip_address_ << ", pid: " << agent_pid_ - << ". Disable the agent client in raylet."; + RAY_LOG(WARNING) << "The grpc port of agent is invalid (be 0), ip: " + << agent_ip_address_ << ", pid: " << agent_pid_ + << ". Disable the agent client in raylet."; disable_agent_client_ = true; } reply->set_status(rpc::AGENT_RPC_STATUS_OK); @@ -103,14 +103,11 @@ RAY_LOG(WARNING) << "Agent process with pid " << child.GetId() << " exit, return value " << exit_code << ". ip " << agent_ip_address_ << ". 
pid " << agent_pid_; - agent_failed_ = true; - if (RayConfig::instance().raylet_shares_fate_with_agent()) { - RAY_LOG(ERROR) << "Raylet exits immediately because the ray agent has failed. " - "Raylet fate shares with the agent. It can happen because the " - "Ray agent is unexpectedly killed or failed. See " - "`dashboard_agent.log` for the root cause."; - QuickExit(); - } + RAY_LOG(ERROR) << "Raylet exits immediately because the ray agent has failed. " + "Raylet fate shares with the agent. It can happen because the " + "Ray agent is unexpectedly killed or failed. See " + "`dashboard_agent.log` for the root cause."; + QuickExit(); }); monitor_thread.detach(); } @@ -141,8 +138,8 @@ void AgentManager::CreateRuntimeEnv( } if (runtime_env_agent_client_ == nullptr) { - // If the agent cannot be restarted anymore, fail the request. - if (disable_agent_client_ || agent_failed_) { + // If the grpc service of agent is invalid, fail the request. + if (disable_agent_client_) { std::stringstream str_stream; str_stream << "Runtime environment " << serialized_runtime_env << " cannot be created on this node because the grpc service of agent " diff --git a/src/ray/raylet/agent_manager.h b/src/ray/raylet/agent_manager.h index ac7542c13e53..ac5c6f4fb9ec 100644 --- a/src/ray/raylet/agent_manager.h +++ b/src/ray/raylet/agent_manager.h @@ -87,8 +87,6 @@ class AgentManager : public rpc::AgentManagerServiceHandler { Options options_; pid_t agent_pid_ = 0; int agent_port_ = 0; - /// The flag indicates whether agent has failed. - std::atomic agent_failed_ = false; /// Whether or not we intend to start the agent. This is false if we /// are missing Ray Dashboard dependencies, for example. bool should_start_agent_ = true; @@ -96,6 +94,8 @@ class AgentManager : public rpc::AgentManagerServiceHandler { DelayExecutorFn delay_executor_; RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory_; std::shared_ptr runtime_env_agent_client_; + /// When the grpc port of agent is invalid, set this flag to indicate that agent client + /// is disable. bool disable_agent_client_ = false; }; From 32ce0e2dded943949e8c2e7b8d80e56c88d6ddad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=85=E9=BE=99?= Date: Sat, 12 Mar 2022 02:58:11 +0800 Subject: [PATCH 06/10] skip tests --- python/ray/tests/test_dashboard.py | 4 ++++ python/ray/tests/test_runtime_env.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/python/ray/tests/test_dashboard.py b/python/ray/tests/test_dashboard.py index 978232cf5bed..84009ee37cb7 100644 --- a/python/ray/tests/test_dashboard.py +++ b/python/ray/tests/test_dashboard.py @@ -159,6 +159,10 @@ def set_agent_failure_env_var(): del os.environ["_RAY_AGENT_FAILING"] +# TODO(SongGuyang): Fail the agent which is in different node from driver. +@pytest.mark.skip( + reason="Agent failure will lead to raylet failure and driver failure." +) def test_dashboard_agent_restart( set_agent_failure_env_var, ray_start_cluster_head, error_pubsub, log_pubsub ): diff --git a/python/ray/tests/test_runtime_env.py b/python/ray/tests/test_runtime_env.py index 4eb06d11c78f..376bbdce97b1 100644 --- a/python/ray/tests/test_runtime_env.py +++ b/python/ray/tests/test_runtime_env.py @@ -301,6 +301,10 @@ def set_agent_failure_env_var(): del os.environ["_RAY_AGENT_FAILING"] +# TODO(SongGuyang): Fail the agent which is in different node from driver. +@pytest.mark.skip( + reason="Agent failure will lead to raylet failure and driver failure." 
+) @pytest.mark.parametrize("runtime_env_class", [dict, RuntimeEnv]) def test_runtime_env_broken( set_agent_failure_env_var, runtime_env_class, ray_start_cluster_head From 2d2539e2bbd38fd78fa8806ade00020489455962 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=85=E9=BE=99?= Date: Sat, 12 Mar 2022 03:16:35 +0800 Subject: [PATCH 07/10] address comment --- python/ray/tests/test_dashboard.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/ray/tests/test_dashboard.py b/python/ray/tests/test_dashboard.py index 84009ee37cb7..e14e475368c9 100644 --- a/python/ray/tests/test_dashboard.py +++ b/python/ray/tests/test_dashboard.py @@ -196,6 +196,8 @@ def matcher(log_batch): def run_tasks_without_runtime_env(): + assert ray.is_initialized() + @ray.remote def f(): pass @@ -206,6 +208,8 @@ def f(): def run_tasks_with_runtime_env(): + assert ray.is_initialized() + @ray.remote(runtime_env={"pip": ["pip-install-test==0.5"]}) def f(): import pip_install_test # noqa @@ -236,7 +240,10 @@ def test_dashboard_agent_grpc_port_conflict(listen_port, call_ray_start): # Tasks without runtime env still work when dashboard agent grpc port conflicts. run_tasks_without_runtime_env() # Tasks with runtime env couldn't work. - with pytest.raises(ray.exceptions.RuntimeEnvSetupError): + with pytest.raises( + ray.exceptions.RuntimeEnvSetupError, + match="the grpc service of agent is invalid", + ): run_tasks_with_runtime_env() From c3f2f53ac44d0010546da6c2a0e9263a8f374b42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=85=E9=BE=99?= Date: Mon, 14 Mar 2022 18:21:09 +0800 Subject: [PATCH 08/10] address comments --- python/ray/tests/test_dashboard.py | 35 ------------------ src/ray/core_worker/core_worker_process.cc | 3 +- src/ray/raylet/agent_manager.cc | 43 +++++++++------------- src/ray/raylet/worker_pool.cc | 2 +- 4 files changed, 20 insertions(+), 63 deletions(-) diff --git a/python/ray/tests/test_dashboard.py b/python/ray/tests/test_dashboard.py index e14e475368c9..fd19344040ff 100644 --- a/python/ray/tests/test_dashboard.py +++ b/python/ray/tests/test_dashboard.py @@ -10,8 +10,6 @@ from ray._private.test_utils import ( run_string_as_driver, wait_for_condition, - get_error_message, - get_log_batch, ) import ray @@ -159,39 +157,6 @@ def set_agent_failure_env_var(): del os.environ["_RAY_AGENT_FAILING"] -# TODO(SongGuyang): Fail the agent which is in different node from driver. -@pytest.mark.skip( - reason="Agent failure will lead to raylet failure and driver failure." -) -def test_dashboard_agent_restart( - set_agent_failure_env_var, ray_start_cluster_head, error_pubsub, log_pubsub -): - """Test that when the agent fails to start if the error message - is suppressed correctly without spamming the driver. - """ - # Choose a duplicated port for the agent so that it will crash. - errors = get_error_message( - error_pubsub, 1, ray_constants.DASHBOARD_AGENT_DIED_ERROR, timeout=10 - ) - assert len(errors) == 1 - for e in errors: - assert "Check out the `dashboard_agent.log`" in e.error_message - # Make sure the agent process is not started anymore. - cluster = ray_start_cluster_head - wait_for_condition(lambda: search_agents(cluster) is None) - - # Make sure there's no spammy message for 5 seconds. - def matcher(log_batch): - return log_batch["pid"] == "raylet" and any( - "Agent process with pid" in line for line in log_batch["lines"] - ) - - match = get_log_batch(log_pubsub, 1, timeout=5, matcher=matcher) - assert len(match) == 0, ( - "There are spammy logs during Ray agent restart process. 
" f"Logs: {match}" - ) - - conflict_port = 34567 diff --git a/src/ray/core_worker/core_worker_process.cc b/src/ray/core_worker/core_worker_process.cc index abc3dde6eb7c..eca59f3bf628 100644 --- a/src/ray/core_worker/core_worker_process.cc +++ b/src/ray/core_worker/core_worker_process.cc @@ -216,7 +216,8 @@ void CoreWorkerProcessImpl::InitializeSystemConfig() { if (status.IsGrpcUnavailable()) { std::ostringstream ss; ss << "Failed to get the system config from raylet because " - << "it is dead. Worker will terminate. Status: " << status; + << "it is dead. Worker will terminate. Status: " << status + << " .Please see `raylet.out` for more details."; if (options_.worker_type == WorkerType::DRIVER) { // If it is the driver, surface the issue to the user. RAY_LOG(ERROR) << ss.str(); diff --git a/src/ray/raylet/agent_manager.cc b/src/ray/raylet/agent_manager.cc index e628fa2e8bea..32d25f68757e 100644 --- a/src/ray/raylet/agent_manager.cc +++ b/src/ray/raylet/agent_manager.cc @@ -32,6 +32,7 @@ void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request, agent_ip_address_ = request.agent_ip_address(); agent_port_ = request.agent_port(); agent_pid_ = request.agent_pid(); + // TODO(SongGuyang): We should remove this after we find better port resolution. // Note: `agent_port_` should be 0 if the grpc port of agent is in conflict. if (agent_port_ != 0) { runtime_env_agent_client_ = @@ -143,17 +144,17 @@ void AgentManager::CreateRuntimeEnv( // If the grpc service of agent is invalid, fail the request. if (disable_agent_client_) { std::stringstream str_stream; - str_stream << "Runtime environment " << serialized_runtime_env - << " cannot be created on this node because the grpc service of agent " - "is invalid."; + str_stream + << "Runtime environment " << serialized_runtime_env + << " cannot be created on this node because the agent client is disabled. You " + "see this error message maybe because the grpc port of agent came into " + "conflict. Please see `dashboard_agent.log` to get more details."; const auto &error_message = str_stream.str(); - RAY_LOG(WARNING) << error_message; + RAY_LOG(ERROR) << error_message; delay_executor_( - [callback = std::move(callback), - serialized_runtime_env = std::move(serialized_runtime_env), - error_message] { + [callback = std::move(callback), error_message] { callback(/*successful=*/false, - /*serialized_runtime_env_context=*/serialized_runtime_env, + /*serialized_runtime_env_context=*/"{}", /*setup_error_message*/ error_message); }, 0); @@ -204,23 +205,11 @@ void AgentManager::CreateRuntimeEnv( } } else { - // TODO(sang): Invoke a callback if it fails more than X times. 
RAY_LOG(INFO) << "Failed to create the runtime env: " << serialized_runtime_env << ", status = " << status - << ", maybe there are some network problems, will retry it later."; - delay_executor_( - [this, - job_id, - serialized_runtime_env, - serialized_allocated_resource_instances, - callback = std::move(callback)] { - CreateRuntimeEnv(job_id, - serialized_runtime_env, - serialized_allocated_resource_instances, - callback); - }, - RayConfig::instance().agent_manager_retry_interval_ms()); + << ", maybe there are some network problems, will fail the request."; + callback(false, "", "Failed to request agent."); } }); } @@ -228,7 +217,10 @@ void AgentManager::CreateRuntimeEnv( void AgentManager::DeleteURIs(const std::vector &uris, DeleteURIsCallback callback) { if (disable_agent_client_) { - RAY_LOG(ERROR) << "Failed to delete URIs because the agent client is disabled."; + RAY_LOG(ERROR) + << "Failed to delete runtime env URIs because the agent client is disabled. You " + "see this error message maybe because the grpc port of agent came into " + "conflict. Please see `dashboard_agent.log` to get more details."; delay_executor_([callback] { callback(false); }, 0); return; } @@ -259,9 +251,8 @@ void AgentManager::DeleteURIs(const std::vector &uris, RAY_LOG(ERROR) << "Failed to delete URIs" << ", status = " << status - << ", maybe there are some network problems, will retry it later."; - delay_executor_([this, uris, callback] { DeleteURIs(uris, callback); }, - RayConfig::instance().agent_manager_retry_interval_ms()); + << ", maybe there are some network problems, will fail the request."; + callback(false); } }); } diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 699fc8e66923..642f25fc5090 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -658,7 +658,7 @@ void WorkerPool::HandleJobStarted(const JobID &job_id, const rpc::JobConfig &job << job_id << ". The result context was " << serialized_runtime_env_context << "."; } else { - RAY_LOG(ERROR) + RAY_LOG(WARNING) << "[Eagerly] Couldn't create a runtime environment for job " << job_id << ". 
Error message: " << setup_error_message; } From 355d5b219c6769a17cde85ace1b29f1eab218967 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=85=E9=BE=99?= Date: Mon, 14 Mar 2022 23:07:30 +0800 Subject: [PATCH 09/10] fix --- src/ray/raylet/agent_manager.cc | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/ray/raylet/agent_manager.cc b/src/ray/raylet/agent_manager.cc index 32d25f68757e..855af1fb88bb 100644 --- a/src/ray/raylet/agent_manager.cc +++ b/src/ray/raylet/agent_manager.cc @@ -185,9 +185,7 @@ void AgentManager::CreateRuntimeEnv( serialized_allocated_resource_instances); runtime_env_agent_client_->CreateRuntimeEnv( request, - [this, - job_id, - serialized_runtime_env, + [serialized_runtime_env, serialized_allocated_resource_instances, callback = std::move(callback)](const Status &status, const rpc::CreateRuntimeEnvReply &reply) { @@ -236,7 +234,7 @@ void AgentManager::DeleteURIs(const std::vector &uris, request.add_uris(uri); } runtime_env_agent_client_->DeleteURIs( - request, [this, uris, callback](Status status, const rpc::DeleteURIsReply &reply) { + request, [callback](Status status, const rpc::DeleteURIsReply &reply) { if (status.ok()) { if (reply.status() == rpc::AGENT_RPC_STATUS_OK) { callback(true); From 33bfb72f5adb2c0f7ef9ee280b8f876c42362851 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B9=85=E9=BE=99?= Date: Tue, 15 Mar 2022 11:49:03 +0800 Subject: [PATCH 10/10] address comments --- src/ray/raylet/agent_manager.cc | 44 +++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/src/ray/raylet/agent_manager.cc b/src/ray/raylet/agent_manager.cc index 855af1fb88bb..00624e8b0877 100644 --- a/src/ray/raylet/agent_manager.cc +++ b/src/ray/raylet/agent_manager.cc @@ -40,9 +40,9 @@ void AgentManager::HandleRegisterAgent(const rpc::RegisterAgentRequest &request, RAY_LOG(INFO) << "HandleRegisterAgent, ip: " << agent_ip_address_ << ", port: " << agent_port_ << ", pid: " << agent_pid_; } else { - RAY_LOG(WARNING) << "The grpc port of agent is invalid (be 0), ip: " + RAY_LOG(WARNING) << "The GRPC port of the Ray agent is invalid (0), ip: " << agent_ip_address_ << ", pid: " << agent_pid_ - << ". Disable the agent client in raylet."; + << ". The agent client in the raylet has been disabled."; disable_agent_client_ = true; } reply->set_status(rpc::AGENT_RPC_STATUS_OK); @@ -92,8 +92,8 @@ void AgentManager::StartAgent() { [this, child]() mutable { if (agent_pid_ != child.GetId()) { RAY_LOG(WARNING) << "Agent process with pid " << child.GetId() - << " has not registered, restart it. ip " - << agent_ip_address_ << ". pid " << agent_pid_; + << " has not registered. ip " << agent_ip_address_ + << ", pid " << agent_pid_; child.Kill(); } }, @@ -104,10 +104,11 @@ void AgentManager::StartAgent() { RAY_LOG(WARNING) << "Agent process with pid " << child.GetId() << " exit, return value " << exit_code << ". ip " << agent_ip_address_ << ". pid " << agent_pid_; - RAY_LOG(ERROR) << "Raylet exits immediately because the ray agent has failed. " - "Raylet fate shares with the agent. It can happen because the " - "Ray agent is unexpectedly killed or failed. See " - "`dashboard_agent.log` for the root cause."; + RAY_LOG(ERROR) + << "The raylet exited immediately because the Ray agent failed. " + "The raylet fate shares with the agent. This can happen because the " + "Ray agent was unexpectedly killed or failed. 
See " + "`dashboard_agent.log` for the root cause."; QuickExit(); }); monitor_thread.detach(); @@ -145,10 +146,11 @@ void AgentManager::CreateRuntimeEnv( if (disable_agent_client_) { std::stringstream str_stream; str_stream - << "Runtime environment " << serialized_runtime_env - << " cannot be created on this node because the agent client is disabled. You " - "see this error message maybe because the grpc port of agent came into " - "conflict. Please see `dashboard_agent.log` to get more details."; + << "Failed to create runtime environment " << serialized_runtime_env + << " because the Ray agent couldn't be started due to the port conflict. See " + "`dashboard_agent.log` for more details. To solve the problem, start Ray " + "with a hard-coded agent port. `ray start --dashboard-agent-grpc-port " + "[port]` and make sure the port is not used by other processes."; const auto &error_message = str_stream.str(); RAY_LOG(ERROR) << error_message; delay_executor_( @@ -216,17 +218,20 @@ void AgentManager::DeleteURIs(const std::vector &uris, DeleteURIsCallback callback) { if (disable_agent_client_) { RAY_LOG(ERROR) - << "Failed to delete runtime env URIs because the agent client is disabled. You " - "see this error message maybe because the grpc port of agent came into " - "conflict. Please see `dashboard_agent.log` to get more details."; - delay_executor_([callback] { callback(false); }, 0); + << "Failed to delete runtime environment URI because the Ray agent couldn't be " + "started due to the port conflict. See `dashboard_agent.log` for more " + "details. To solve the problem, start Ray with a hard-coded agent port. `ray " + "start --dashboard-agent-grpc-port [port]` and make sure the port is not used " + "by other processes."; + delay_executor_([callback = std::move(callback)] { callback(false); }, 0); return; } if (runtime_env_agent_client_ == nullptr) { RAY_LOG(INFO) << "Runtime env agent is not registered yet. Will retry DeleteURIs later."; - delay_executor_([this, uris, callback] { DeleteURIs(uris, callback); }, - RayConfig::instance().agent_manager_retry_interval_ms()); + delay_executor_( + [this, uris, callback = std::move(callback)] { DeleteURIs(uris, callback); }, + RayConfig::instance().agent_manager_retry_interval_ms()); return; } rpc::DeleteURIsRequest request; @@ -234,7 +239,8 @@ void AgentManager::DeleteURIs(const std::vector &uris, request.add_uris(uri); } runtime_env_agent_client_->DeleteURIs( - request, [callback](Status status, const rpc::DeleteURIsReply &reply) { + request, + [callback = std::move(callback)](Status status, const rpc::DeleteURIsReply &reply) { if (status.ok()) { if (reply.status() == rpc::AGENT_RPC_STATUS_OK) { callback(true);