Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Shutdown http proxy state #35395

Merged
merged 3 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions python/ray/serve/_private/http_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def __init__(
self._status = HTTPProxyStatus.STARTING
self._health_check_obj_ref = None
self._last_health_check_time: float = 0
self._shutting_down = False

self._actor_details = HTTPProxyDetails(
node_id=node_id,
Expand Down Expand Up @@ -76,6 +77,9 @@ def update_actor_details(self, **kwargs) -> None:
self._actor_details = HTTPProxyDetails(**details_kwargs)

def update(self):
if self._shutting_down:
return

if self._status == HTTPProxyStatus.STARTING:
try:
finished, _ = ray.wait([self._ready_obj_ref], timeout=0)
Expand All @@ -97,6 +101,7 @@ def update(self):
if finished:
try:
ray.get(finished[0])
print("got object ref!")
self.set_status(HTTPProxyStatus.HEALTHY)
except Exception as e:
logger.warning(
Expand All @@ -121,6 +126,10 @@ def update(self):
self._health_check_obj_ref = self._actor_handle.check_health.remote()
self._last_health_check_time = time.time()

def shutdown(self):
self._shutting_down = True
ray.kill(self.actor_handle, no_restart=True)


class HTTPState:
"""Manages all state for HTTP proxies in the system.
Expand Down Expand Up @@ -157,8 +166,8 @@ def __init__(
self._start_proxies_if_needed()

def shutdown(self) -> None:
for proxy in self.get_http_proxy_handles().values():
ray.kill(proxy, no_restart=True)
for proxy_state in self._proxy_states.values():
proxy_state.shutdown()

def get_config(self):
return self._config
Expand Down
39 changes: 39 additions & 0 deletions python/ray/serve/tests/test_http_state.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
from unittest.mock import patch
import asyncio

import pytest

Expand Down Expand Up @@ -132,6 +133,44 @@ def check_proxy(status):
ray.shutdown()


def test_http_proxy_shutdown():
from ray.experimental.state.api import list_actors
ray.init()

@ray.remote(num_cpus=0)
class MockHTTPProxyActor:
async def ready(self):
return json.dumps(["mock_worker_id", "mock_log_file_path"])

async def check_health(self):
await asyncio.sleep(100)

proxy = MockHTTPProxyActor.options(lifetime="detached").remote()
state = HTTPProxyState(proxy, "alice", "mock_node_id", "mock_node_ip")
assert state.status == HTTPProxyStatus.STARTING

def check_proxy(status):
state.update()
return state.status == status

# Proxy actor is ready, so status should transition STARTING -> HEALTHY
wait_for_condition(check_proxy, status=HTTPProxyStatus.HEALTHY, timeout=2)

# Confirm that a new health check has been started
state.update()
assert state._health_check_obj_ref

# Shutdown the http proxy state. Wait for the http proxy actor to be killed
state.shutdown()
wait_for_condition(lambda: len(list_actors(filters=[("state", "=", "ALIVE")])) == 0)

# Make sure that the state doesn't try to check on the status of the dead actor
state.update()
assert state.status == HTTPProxyStatus.HEALTHY

ray.shutdown()


if __name__ == "__main__":
import sys

Expand Down