Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dashboard agent] Catch agent port conflict #23024

Merged
merged 13 commits into from
Mar 15, 2022
85 changes: 48 additions & 37 deletions dashboard/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,30 @@ def __init__(
self.ppid = int(os.environ["RAY_RAYLET_PID"])
assert self.ppid > 0
logger.info("Parent pid is %s", self.ppid)
self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0),))
grpc_ip = "127.0.0.1" if self.ip == "127.0.0.1" else "0.0.0.0"
self.grpc_port = ray._private.tls_utils.add_port_to_grpc_server(
self.server, f"{grpc_ip}:{self.dashboard_agent_port}"
)
logger.info("Dashboard agent grpc address: %s:%s", grpc_ip, self.grpc_port)

# Setup raylet channel
options = (("grpc.enable_http_proxy", 0),)
self.aiogrpc_raylet_channel = ray._private.utils.init_grpc_channel(
f"{self.ip}:{self.node_manager_port}", options, asynchronous=True
)

# Setup grpc server
self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0),))
grpc_ip = "127.0.0.1" if self.ip == "127.0.0.1" else "0.0.0.0"
try:
self.grpc_port = ray._private.tls_utils.add_port_to_grpc_server(
self.server, f"{grpc_ip}:{self.dashboard_agent_port}"
)
except Exception:
logger.exception(
"Failed to add port to grpc server. Agent will stay alive but "
"disable the grpc service."
)
self.server = None
self.grpc_port = None
else:
logger.info("Dashboard agent grpc address: %s:%s", grpc_ip, self.grpc_port)

# If the agent is started as non-minimal version, http server should
# be configured to communicate with the dashboard in a head node.
self.http_server = None
Expand Down Expand Up @@ -147,7 +160,8 @@ async def _check_parent():
check_parent_task = create_task(_check_parent())

# Start a grpc asyncio server.
await self.server.start()
if self.server:
await self.server.start()

self.gcs_client = GcsClient(address=self.gcs_address)
modules = self._load_modules()
Expand All @@ -159,7 +173,13 @@ async def _check_parent():
# Http server is not started in the minimal version because
# it requires additional dependencies that are not
# included in the minimal ray package.
self.http_server = await self._configure_http_server(modules)
try:
self.http_server = await self._configure_http_server(modules)
except Exception:
logger.exception(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add TODO here to remove this in the future? (and do better port resolution)

"Failed to start http server. Agent will stay alive but "
"disable the http service."
)

# Write the dashboard agent port to kv.
# TODO: Use async version if performance is an issue
Expand Down Expand Up @@ -361,36 +381,27 @@ async def _check_parent():
except Exception as e:
# All these env vars should be available because
# they are provided by the parent raylet.
restart_count = os.environ["RESTART_COUNT"]
max_restart_count = os.environ["MAX_RESTART_COUNT"]
raylet_pid = os.environ["RAY_RAYLET_PID"]
node_ip = args.node_ip_address
if restart_count >= max_restart_count:
# Agent is failed to be started many times.
# Push an error to all drivers, so that users can know the
# impact of the issue.
gcs_publisher = GcsPublisher(args.gcs_address)
traceback_str = ray._private.utils.format_error_message(
traceback.format_exc()
)
message = (
f"(ip={node_ip}) "
f"The agent on node {platform.uname()[1]} failed to "
f"be restarted {max_restart_count} "
"times. There are 3 possible problems if you see this error."
"\n 1. The dashboard might not display correct "
"information on this node."
"\n 2. Metrics on this node won't be reported."
"\n 3. runtime_env APIs won't work."
"\nCheck out the `dashboard_agent.log` to see the "
"detailed failure messages."
)
ray._private.utils.publish_error_to_driver(
ray_constants.DASHBOARD_AGENT_DIED_ERROR,
message,
redis_client=None,
gcs_publisher=gcs_publisher,
)
logger.error(message)
# Agent is failed to be started many times.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems wrong?

# Push an error to all drivers, so that users can know the
# impact of the issue.
redis_client = None
gcs_publisher = GcsPublisher(address=args.gcs_address)

traceback_str = ray._private.utils.format_error_message(traceback.format_exc())
message = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need it? In this case, raylet will just fail right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if we open fate sharing by default, we don't need this. I can remove this.

f"(ip={node_ip}) "
f"The agent on node {platform.uname()[1]} failed to "
"be started. Check out the `dashboard_agent.log` to see the"
"detailed failure messages."
)
ray._private.utils.publish_error_to_driver(
ray_constants.DASHBOARD_AGENT_DIED_ERROR,
message,
redis_client=None,
gcs_publisher=gcs_publisher,
)
logger.error(message)
logger.exception(e)
exit(1)
3 changes: 2 additions & 1 deletion dashboard/modules/reporter/reporter_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,8 @@ async def _perform_iteration(self, publisher):
await asyncio.sleep(reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000)

async def run(self, server):
reporter_pb2_grpc.add_ReporterServiceServicer_to_server(self, server)
if server:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any way to just not run this function instead of adding this logic? Seems a bit ugly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't find better way. The run is a common interface of dashboard agent module. We can do anything else in this function, besides starting grpc server. So, if we don't run this function, we could lose some code paths or features.

reporter_pb2_grpc.add_ReporterServiceServicer_to_server(self, server)

gcs_addr = self._dashboard_agent.gcs_address
assert gcs_addr is not None
Expand Down
5 changes: 4 additions & 1 deletion dashboard/modules/runtime_env/runtime_env_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,10 @@ async def DeleteURIs(self, request, context):
)

async def run(self, server):
runtime_env_agent_pb2_grpc.add_RuntimeEnvServiceServicer_to_server(self, server)
if server:
runtime_env_agent_pb2_grpc.add_RuntimeEnvServiceServicer_to_server(
self, server
)

@staticmethod
def is_minimal_module():
Expand Down
34 changes: 0 additions & 34 deletions dashboard/tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,34 +113,6 @@ def test_basic(ray_start_with_dashboard):
raylet_proc_info = all_processes[ray_constants.PROCESS_TYPE_RAYLET][0]
raylet_proc = psutil.Process(raylet_proc_info.process.pid)

# Test for bad imports, the agent should be restarted.
logger.info("Test for bad imports.")
agent_proc = search_agent(raylet_proc.children())
prepare_test_files()
agent_pids = set()
try:
assert agent_proc is not None
agent_proc.kill()
agent_proc.wait()
# The agent will be restarted for imports failure.
for _ in range(300):
agent_proc = search_agent(raylet_proc.children())
if agent_proc:
agent_pids.add(agent_proc.pid)
# The agent should be restarted,
# so we can break if the len(agent_pid) > 1
if len(agent_pids) > 1:
break
time.sleep(0.1)
finally:
cleanup_test_files()
assert len(agent_pids) > 1, agent_pids

agent_proc = search_agent(raylet_proc.children())
if agent_proc:
agent_proc.kill()
agent_proc.wait()

logger.info("Test agent register is OK.")
wait_for_condition(lambda: search_agent(raylet_proc.children()))
assert dashboard_proc.status() in [psutil.STATUS_RUNNING, psutil.STATUS_SLEEPING]
Expand All @@ -149,11 +121,6 @@ def test_basic(ray_start_with_dashboard):

check_agent_register(raylet_proc, agent_pid)

# The agent should be dead if raylet exits.
raylet_proc.kill()
raylet_proc.wait()
agent_proc.wait(5)

# Check kv keys are set.
logger.info("Check kv keys are set.")
dashboard_address = ray.experimental.internal_kv._internal_kv_get(
Expand All @@ -177,7 +144,6 @@ def test_raylet_and_agent_share_fate(shutdown_only):

system_config = {
"raylet_shares_fate_with_agent": True,
"agent_max_restart_count": 0,
}
ray.init(include_dashboard=True, _system_config=system_config)

Expand Down
17 changes: 14 additions & 3 deletions python/ray/_private/metrics_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,26 @@ def __init__(self, metrics_export_address, metrics_export_port):
self._lock = threading.Lock()

# Configure exporter. (We currently only support prometheus).
self.view_manager.register_exporter(
prometheus_exporter.new_stats_exporter(
try:
stats_exporter = prometheus_exporter.new_stats_exporter(
prometheus_exporter.Options(
namespace="ray",
port=metrics_export_port,
address=metrics_export_address,
)
)
)
except Exception:
logger.exception(
"Failed to start prometheus stats exporter. Agent will stay "
"alive but disable the stats."
)
self.view_manager = None
else:
self.view_manager.register_exporter(stats_exporter)

def record_reporter_stats(self, records: List[Record]):
if not self.view_manager:
return
with self._lock:
for record in records:
gauge = record.gauge
Expand All @@ -111,6 +120,8 @@ def _record_gauge(self, gauge: Gauge, value: float, tags: dict):

def record_metric_points_from_protobuf(self, metrics: List[Metric]):
"""Record metrics from Opencensus Protobuf"""
if not self.view_manager:
return
with self._lock:
self._record_metrics(metrics)

Expand Down
13 changes: 13 additions & 0 deletions python/ray/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,3 +628,16 @@ def set_runtime_env_retry_times(request):
yield runtime_env_retry_times
finally:
del os.environ["RUNTIME_ENV_RETRY_TIMES"]


@pytest.fixture
def listen_port(request):
port = getattr(request, "param", 0)
try:
sock = socket.socket()
if hasattr(socket, "SO_REUSEPORT"):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 0)
sock.bind(("127.0.0.1", port))
yield port
finally:
sock.close()
Loading