Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dashboard agent] Catch agent port conflict #23024

Merged
merged 13 commits into from
Mar 15, 2022
82 changes: 36 additions & 46 deletions dashboard/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import logging
import logging.handlers
import os
import platform
import sys
import json
import traceback

try:
from grpc import aio as aiogrpc
Expand All @@ -20,7 +18,6 @@
import ray.ray_constants as ray_constants
import ray._private.services
import ray._private.utils
from ray._private.gcs_pubsub import GcsPublisher
from ray._private.gcs_utils import GcsClient
from ray.core.generated import agent_manager_pb2
from ray.core.generated import agent_manager_pb2_grpc
Expand Down Expand Up @@ -84,17 +81,33 @@ def __init__(
self.ppid = int(os.environ["RAY_RAYLET_PID"])
assert self.ppid > 0
logger.info("Parent pid is %s", self.ppid)
self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0),))
grpc_ip = "127.0.0.1" if self.ip == "127.0.0.1" else "0.0.0.0"
self.grpc_port = ray._private.tls_utils.add_port_to_grpc_server(
self.server, f"{grpc_ip}:{self.dashboard_agent_port}"
)
logger.info("Dashboard agent grpc address: %s:%s", grpc_ip, self.grpc_port)

# Setup raylet channel
options = (("grpc.enable_http_proxy", 0),)
self.aiogrpc_raylet_channel = ray._private.utils.init_grpc_channel(
f"{self.ip}:{self.node_manager_port}", options, asynchronous=True
)

# Setup grpc server
self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0),))
grpc_ip = "127.0.0.1" if self.ip == "127.0.0.1" else "0.0.0.0"
try:
self.grpc_port = ray._private.tls_utils.add_port_to_grpc_server(
self.server, f"{grpc_ip}:{self.dashboard_agent_port}"
)
except Exception:
# TODO(SongGuyang): Catch the exception here because the statically
# assigned port may conflict with one that is already in use. We should
# remove this once we have a better port resolution strategy.
logger.exception(
"Failed to add port to grpc server. Agent will stay alive but "
"disable the grpc service."
)
self.server = None
self.grpc_port = None
else:
logger.info("Dashboard agent grpc address: %s:%s", grpc_ip, self.grpc_port)

# If the agent is started as non-minimal version, http server should
# be configured to communicate with the dashboard in a head node.
self.http_server = None
Expand Down Expand Up @@ -147,7 +160,8 @@ async def _check_parent():
check_parent_task = create_task(_check_parent())

# Start a grpc asyncio server.
await self.server.start()
if self.server:
await self.server.start()

self.gcs_client = GcsClient(address=self.gcs_address)
modules = self._load_modules()
Expand All @@ -159,7 +173,16 @@ async def _check_parent():
# Http server is not started in the minimal version because
# it requires additional dependencies that are not
# included in the minimal ray package.
self.http_server = await self._configure_http_server(modules)
try:
self.http_server = await self._configure_http_server(modules)
except Exception:
# TODO(SongGuyang): Catch the exception here because the statically
# assigned port may conflict with one that is already in use. We should
# remove this once we have a better port resolution strategy.
logger.exception(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a TODO here to remove this in the future (once we have better port resolution)?

"Failed to start http server. Agent will stay alive but "
"disable the http service."
)

# Write the dashboard agent port to kv.
# TODO: Use async version if performance is an issue
Expand Down Expand Up @@ -358,39 +381,6 @@ async def _check_parent():

loop = asyncio.get_event_loop()
loop.run_until_complete(agent.run())
except Exception as e:
# All these env vars should be available because
# they are provided by the parent raylet.
restart_count = os.environ["RESTART_COUNT"]
max_restart_count = os.environ["MAX_RESTART_COUNT"]
raylet_pid = os.environ["RAY_RAYLET_PID"]
node_ip = args.node_ip_address
if restart_count >= max_restart_count:
# Agent is failed to be started many times.
# Push an error to all drivers, so that users can know the
# impact of the issue.
gcs_publisher = GcsPublisher(args.gcs_address)
traceback_str = ray._private.utils.format_error_message(
traceback.format_exc()
)
message = (
f"(ip={node_ip}) "
f"The agent on node {platform.uname()[1]} failed to "
f"be restarted {max_restart_count} "
"times. There are 3 possible problems if you see this error."
"\n 1. The dashboard might not display correct "
"information on this node."
"\n 2. Metrics on this node won't be reported."
"\n 3. runtime_env APIs won't work."
"\nCheck out the `dashboard_agent.log` to see the "
"detailed failure messages."
)
ray._private.utils.publish_error_to_driver(
ray_constants.DASHBOARD_AGENT_DIED_ERROR,
message,
redis_client=None,
gcs_publisher=gcs_publisher,
)
logger.error(message)
logger.exception(e)
except Exception:
logger.exception("Agent is working abnormally. It will exit immediately.")
exit(1)
3 changes: 2 additions & 1 deletion dashboard/modules/reporter/reporter_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,8 @@ async def _perform_iteration(self, publisher):
await asyncio.sleep(reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000)

async def run(self, server):
reporter_pb2_grpc.add_ReporterServiceServicer_to_server(self, server)
if server:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way to just not run this function instead of adding this logic? Seems a bit ugly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find a better way. `run` is the common interface of every dashboard agent module, and a module can do other things in this function besides registering with the gRPC server. So if we don't run this function, we could lose some code paths or features.

reporter_pb2_grpc.add_ReporterServiceServicer_to_server(self, server)

gcs_addr = self._dashboard_agent.gcs_address
assert gcs_addr is not None
Expand Down
5 changes: 4 additions & 1 deletion dashboard/modules/runtime_env/runtime_env_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,10 @@ async def DeleteURIs(self, request, context):
)

async def run(self, server):
runtime_env_agent_pb2_grpc.add_RuntimeEnvServiceServicer_to_server(self, server)
if server:
runtime_env_agent_pb2_grpc.add_RuntimeEnvServiceServicer_to_server(
self, server
)

@staticmethod
def is_minimal_module():
Expand Down
41 changes: 2 additions & 39 deletions dashboard/tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,34 +113,6 @@ def test_basic(ray_start_with_dashboard):
raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
raylet_proc = psutil.Process(raylet_proc_info.process.pid)

# Test for bad imports, the agent should be restarted.
logger.info("Test for bad imports.")
agent_proc = search_agent(raylet_proc.children())
prepare_test_files()
agent_pids = set()
try:
assert agent_proc is not None
agent_proc.kill()
agent_proc.wait()
# The agent will be restarted for imports failure.
for _ in range(300):
agent_proc = search_agent(raylet_proc.children())
if agent_proc:
agent_pids.add(agent_proc.pid)
# The agent should be restarted,
# so we can break if the len(agent_pid) > 1
if len(agent_pids) > 1:
break
time.sleep(0.1)
finally:
cleanup_test_files()
assert len(agent_pids) > 1, agent_pids

agent_proc = search_agent(raylet_proc.children())
if agent_proc:
agent_proc.kill()
agent_proc.wait()

logger.info("Test agent register is OK.")
wait_for_condition(lambda: search_agent(raylet_proc.children()))
assert dashboard_proc.status() in [psutil.STATUS_RUNNING, psutil.STATUS_SLEEPING]
Expand All @@ -149,11 +121,6 @@ def test_basic(ray_start_with_dashboard):

check_agent_register(raylet_proc, agent_pid)

# The agent should be dead if raylet exits.
raylet_proc.kill()
raylet_proc.wait()
agent_proc.wait(5)

# Check kv keys are set.
logger.info("Check kv keys are set.")
dashboard_address = ray.experimental.internal_kv._internal_kv_get(
Expand All @@ -175,11 +142,7 @@ def test_basic(ray_start_with_dashboard):
def test_raylet_and_agent_share_fate(shutdown_only):
"""Test raylet and agent share fate."""

system_config = {
"raylet_shares_fate_with_agent": True,
"agent_max_restart_count": 0,
}
ray.init(include_dashboard=True, _system_config=system_config)
ray.init(include_dashboard=True)

all_processes = ray.worker._global_node.all_processes
raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
Expand All @@ -198,7 +161,7 @@ def test_raylet_and_agent_share_fate(shutdown_only):

ray.shutdown()

ray.init(include_dashboard=True, _system_config=system_config)
ray.init(include_dashboard=True)
all_processes = ray.worker._global_node.all_processes
raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
raylet_proc = psutil.Process(raylet_proc_info.process.pid)
Expand Down
20 changes: 17 additions & 3 deletions python/ray/_private/metrics_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,29 @@ def __init__(self, metrics_export_address, metrics_export_port):
self._lock = threading.Lock()

# Configure exporter. (We currently only support prometheus).
self.view_manager.register_exporter(
prometheus_exporter.new_stats_exporter(
try:
stats_exporter = prometheus_exporter.new_stats_exporter(
prometheus_exporter.Options(
namespace="ray",
port=metrics_export_port,
address=metrics_export_address,
)
)
)
except Exception:
# TODO(SongGuyang): Catch the exception here because the statically
# assigned port may conflict with one that is already in use. We should
# remove this once we have a better port resolution strategy.
logger.exception(
"Failed to start prometheus stats exporter. Agent will stay "
"alive but disable the stats."
)
self.view_manager = None
else:
self.view_manager.register_exporter(stats_exporter)

def record_reporter_stats(self, records: List[Record]):
if not self.view_manager:
return
with self._lock:
for record in records:
gauge = record.gauge
Expand All @@ -111,6 +123,8 @@ def _record_gauge(self, gauge: Gauge, value: float, tags: dict):

def record_metric_points_from_protobuf(self, metrics: List[Metric]):
"""Record metrics from Opencensus Protobuf"""
if not self.view_manager:
return
with self._lock:
self._record_metrics(metrics)

Expand Down
13 changes: 13 additions & 0 deletions python/ray/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,3 +628,16 @@ def set_runtime_env_retry_times(request):
yield runtime_env_retry_times
finally:
del os.environ["RUNTIME_ENV_RETRY_TIMES"]


@pytest.fixture
def listen_port(request):
    """Hold a TCP port on 127.0.0.1 for the duration of a test.

    The port number is read from ``request.param`` (e.g. supplied through
    indirect parametrization) and falls back to 0 when the request has no
    ``param`` attribute. The socket is bound but never put into listening
    mode, and it is always closed when the fixture is torn down.

    Yields:
        The requested port number. Note that this is the value that was
        asked for, not the OS-assigned port when the request was 0.
    """
    port = getattr(request, "param", 0)
    # Create the socket before entering ``try`` so that a failure here
    # propagates as-is instead of being masked by a NameError on ``sock``
    # in the ``finally`` clause.
    sock = socket.socket()
    try:
        if hasattr(socket, "SO_REUSEPORT"):
            # Explicitly turn SO_REUSEPORT off; presumably so that another
            # process binding the same port hits a conflict -- TODO confirm.
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 0)
        sock.bind(("127.0.0.1", port))
        yield port
    finally:
        sock.close()
Loading