From d8287b60d8496ab491e8f7ba330aad6e33a365ac Mon Sep 17 00:00:00 2001
From: Ruiyang Wang
Date: Tue, 30 Apr 2024 15:03:10 +0800
Subject: [PATCH 1/2] Offload reporter_head CPU heavy JSON parsing to a thread.

Signed-off-by: Ruiyang Wang
---
 dashboard/modules/reporter/reporter_head.py | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/dashboard/modules/reporter/reporter_head.py b/dashboard/modules/reporter/reporter_head.py
index 0222bffc836d..37b09f3cee3a 100644
--- a/dashboard/modules/reporter/reporter_head.py
+++ b/dashboard/modules/reporter/reporter_head.py
@@ -2,12 +2,11 @@
 import logging
 import asyncio
 import aiohttp.web
+from concurrent.futures import ThreadPoolExecutor
 
 from typing import Optional, Tuple, List
 
-import ray
-import ray._private.services
-import ray._private.utils
+from ray._private.utils import get_or_create_event_loop, init_grpc_channel
 import ray.dashboard.optional_utils as dashboard_optional_utils
 from ray.dashboard.consts import GCS_RPC_TIMEOUT_SECONDS
 import ray.dashboard.utils as dashboard_utils
@@ -69,6 +68,9 @@ def __init__(self, dashboard_head):
         )
         self._gcs_aio_client = dashboard_head.gcs_aio_client
         self._state_api = None
+        self.thread_pool_executor = ThreadPoolExecutor(
+            max_workers=1, thread_name_prefix="reporter_head_worker"
+        )
 
     async def _update_stubs(self, change):
         if change.old:
@@ -79,7 +81,7 @@ async def _update_stubs(self, change):
         node_id, ports = change.new
         ip = DataSource.node_id_to_ip[node_id]
         options = GLOBAL_GRPC_OPTIONS
-        channel = ray._private.utils.init_grpc_channel(
+        channel = init_grpc_channel(
             f"{ip}:{ports[1]}", options=options, asynchronous=True
         )
         stub = reporter_pb2_grpc.ReporterServiceStub(channel)
@@ -637,9 +639,14 @@ async def run(self, server):
                 key, data = await subscriber.poll()
                 if key is None:
                     continue
-                data = json.loads(data)
+                # The JSON Parsing can be CPU heavy. Offload to another thread to avoid
+                # blocking the event loop.
+                loop = get_or_create_event_loop()
+                parsed = await loop.run_in_executor(
+                    self.thread_pool_executor, json.loads, data
+                )
                 node_id = key.split(":")[-1]
-                DataSource.node_physical_stats[node_id] = data
+                DataSource.node_physical_stats[node_id] = parsed
             except Exception:
                 logger.exception(
                     "Error receiving node physical stats from reporter agent."

From 917c3607db0be31b21a7422a18e50be4af71a19e Mon Sep 17 00:00:00 2001
From: Ruiyang Wang
Date: Wed, 1 May 2024 06:25:17 +0800
Subject: [PATCH 2/2] rename

Signed-off-by: Ruiyang Wang
---
 dashboard/modules/reporter/reporter_head.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dashboard/modules/reporter/reporter_head.py b/dashboard/modules/reporter/reporter_head.py
index 37b09f3cee3a..10bfdb37a109 100644
--- a/dashboard/modules/reporter/reporter_head.py
+++ b/dashboard/modules/reporter/reporter_head.py
@@ -642,11 +642,11 @@ async def run(self, server):
                 # The JSON Parsing can be CPU heavy. Offload to another thread to avoid
                 # blocking the event loop.
                 loop = get_or_create_event_loop()
-                parsed = await loop.run_in_executor(
+                parsed_data = await loop.run_in_executor(
                     self.thread_pool_executor, json.loads, data
                 )
                 node_id = key.split(":")[-1]
-                DataSource.node_physical_stats[node_id] = parsed
+                DataSource.node_physical_stats[node_id] = parsed_data
             except Exception:
                 logger.exception(
                     "Error receiving node physical stats from reporter agent."
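
For reference, below is a minimal standalone sketch (not part of the patch) of the pattern the change adopts: handing CPU-heavy json.loads off to a ThreadPoolExecutor via loop.run_in_executor so the asyncio event loop stays responsive. The fake_poll helper, the executor name, and the sample payload are hypothetical stand-ins for the GCS subscriber used in reporter_head.py; the patch itself uses ray._private.utils.get_or_create_event_loop rather than asyncio.get_running_loop.

# Standalone sketch of offloading JSON parsing to a worker thread.
# fake_poll and the payload are illustrative stand-ins, not Ray APIs.
import asyncio
import json
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="json_parser")


async def fake_poll() -> tuple:
    # Stand-in for `subscriber.poll()`: returns a key and a JSON payload.
    await asyncio.sleep(0)
    return "RAY_REPORTER:node-1", json.dumps({"cpu": 12.5, "mem": [1024, 2048]})


async def consume_stats() -> None:
    loop = asyncio.get_running_loop()
    key, data = await fake_poll()
    # json.loads runs on the executor thread; other coroutines keep running
    # on the event loop while the parse is in flight.
    parsed_data = await loop.run_in_executor(executor, json.loads, data)
    node_id = key.split(":")[-1]
    print(node_id, parsed_data)


if __name__ == "__main__":
    asyncio.run(consume_stats())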