From a7fbf6fb43f194010d6d03045accb0158c2ed767 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Mon, 22 Apr 2024 07:08:54 -0700 Subject: [PATCH] Make sure dashboard agent will exit if grpc server fails Signed-off-by: Jiajun Yao --- dashboard/agent.py | 37 +++++++++++----------- dashboard/modules/healthz/healthz_agent.py | 2 +- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/dashboard/agent.py b/dashboard/agent.py index f9654eb96589..9a528f59952a 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -85,6 +85,7 @@ def __init__( def _init_non_minimal(self): from ray._private.gcs_pubsub import GcsAioPublisher + from ray.dashboard.http_server_agent import HttpServerAgent self.aio_publisher = GcsAioPublisher(address=self.gcs_address) @@ -137,12 +138,12 @@ def _init_non_minimal(self): else: logger.info("Dashboard agent grpc address: %s:%s", grpc_ip, self.grpc_port) - async def _configure_http_server(self, modules): - from ray.dashboard.http_server_agent import HttpServerAgent - - http_server = HttpServerAgent(self.ip, self.listen_port) - await http_server.start(modules) - return http_server + # If the agent is not minimal it should start the http server + # to communicate with the dashboard in a head node. + # Http server is not started in the minimal version because + # it requires additional dependencies that are not + # included in the minimal ray package. + self.http_server = HttpServerAgent(self.ip, self.listen_port) def _load_modules(self): """Load dashboard agent modules.""" @@ -183,15 +184,9 @@ async def run(self): modules = self._load_modules() - # Setup http server if necessary. - if not self.minimal: - # If the agent is not minimal it should start the http server - # to communicate with the dashboard in a head node. - # Http server is not started in the minimal version because - # it requires additional dependencies that are not - # included in the minimal ray package. + if self.http_server: try: - self.http_server = await self._configure_http_server(modules) + await self.http_server.start(modules) except Exception: # TODO(SongGuyang): Catch the exception here because there is # port conflict issue which brought from static port. We should @@ -214,6 +209,7 @@ async def run(self): ) tasks = [m.run(self.server) for m in modules] + if sys.platform not in ["win32", "cygwin"]: def callback(msg): @@ -225,13 +221,18 @@ def callback(msg): self.log_dir, self.gcs_address, callback, loop ) tasks.append(check_parent_task) - await asyncio.gather(*tasks) if self.server: - await self.server.wait_for_termination() + tasks.append(self.server.wait_for_termination()) else: - while True: - await asyncio.sleep(3600) # waits forever + + async def wait_forever(self): + while True: + await asyncio.sleep(3600) + + tasks.append(wait_forever()) + + await asyncio.gather(*tasks) if self.http_server: await self.http_server.cleanup() diff --git a/dashboard/modules/healthz/healthz_agent.py b/dashboard/modules/healthz/healthz_agent.py index 1f61f9a2fddd..5ba8c4063b2c 100644 --- a/dashboard/modules/healthz/healthz_agent.py +++ b/dashboard/modules/healthz/healthz_agent.py @@ -50,4 +50,4 @@ async def run(self, server): @staticmethod def is_minimal_module(): - return True + return False