diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py
index 607322c5b9ac..122c27a965b4 100644
--- a/python/ray/serve/_private/replica.py
+++ b/python/ray/serve/_private/replica.py
@@ -16,7 +16,7 @@
 import ray
 from ray import cloudpickle
 from ray._private.utils import get_or_create_event_loop
-from ray.actor import ActorClass
+from ray.actor import ActorClass, ActorHandle
 from ray.remote_function import RemoteFunction
 from ray.serve import metrics
 from ray.serve._private.common import (
@@ -321,6 +321,9 @@ def _configure_logger_and_profilers(
             component_id=self._component_id,
         )
 
+    def push_proxy_handle(self, handle: ActorHandle):
+        pass
+
     def get_num_ongoing_requests(self) -> int:
         """Fetch the number of ongoing requests at this replica (queue length).
 
diff --git a/python/ray/serve/_private/replica_scheduler/common.py b/python/ray/serve/_private/replica_scheduler/common.py
index 6152d7ea1dd6..68d32cd8f318 100644
--- a/python/ray/serve/_private/replica_scheduler/common.py
+++ b/python/ray/serve/_private/replica_scheduler/common.py
@@ -8,6 +8,7 @@
 
 import ray
 from ray import ObjectRef, ObjectRefGenerator
+from ray.actor import ActorHandle
 from ray.serve._private.common import (
     ReplicaID,
     ReplicaQueueLengthInfo,
@@ -58,6 +59,10 @@ def max_ongoing_requests(self) -> int:
         """Max concurrent requests that can be sent to this replica."""
         pass
 
+    def push_proxy_handle(self, handle: ActorHandle):
+        """When called on the proxy, push the proxy's self handle to the replica."""
+        pass
+
     async def get_queue_len(self, *, deadline_s: float) -> int:
         """Returns current queue len for the replica.
 
@@ -120,6 +125,9 @@ def max_ongoing_requests(self) -> int:
     def is_cross_language(self) -> bool:
         return self._replica_info.is_cross_language
 
+    def push_proxy_handle(self, handle: ActorHandle):
+        self._actor_handle.push_proxy_handle.remote(handle)
+
     async def get_queue_len(self, *, deadline_s: float) -> int:
         # NOTE(edoakes): the `get_num_ongoing_requests` method name is shared by
         # the Python and Java replica implementations. If you change it, you need to
diff --git a/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py b/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py
index e509421963bb..85716e6976f3 100644
--- a/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py
+++ b/python/ray/serve/_private/replica_scheduler/pow_2_scheduler.py
@@ -17,8 +17,10 @@
     Tuple,
 )
 
+from ray.actor import ActorHandle
 from ray.exceptions import ActorDiedError, ActorUnavailableError
 from ray.serve._private.common import (
+    DeploymentHandleSource,
     DeploymentID,
     ReplicaID,
     RequestMetadata,
@@ -89,19 +91,23 @@ def __init__(
         self,
         event_loop: asyncio.AbstractEventLoop,
         deployment_id: DeploymentID,
+        handle_source: DeploymentHandleSource,
         prefer_local_node_routing: bool = False,
         prefer_local_az_routing: bool = False,
         self_node_id: Optional[str] = None,
         self_actor_id: Optional[str] = None,
+        self_actor_handle: Optional[ActorHandle] = None,
         self_availability_zone: Optional[str] = None,
         use_replica_queue_len_cache: bool = False,
         get_curr_time_s: Optional[Callable[[], float]] = None,
     ):
         self._loop = event_loop
         self._deployment_id = deployment_id
+        self._handle_source = handle_source
         self._prefer_local_node_routing = prefer_local_node_routing
         self._prefer_local_az_routing = prefer_local_az_routing
         self._self_node_id = self_node_id
+        self._self_actor_handle = self_actor_handle
         self._self_availability_zone = self_availability_zone
         self._use_replica_queue_len_cache = use_replica_queue_len_cache
 
@@ -240,7 +246,17 @@ def update_replicas(self, replicas: List[ReplicaWrapper]):
         new_replica_id_set = set()
         new_colocated_replica_ids = defaultdict(set)
         new_multiplexed_model_id_to_replica_ids = defaultdict(set)
+
         for r in replicas:
+            # If on the proxy, the replica needs to call back into the proxy with
+            # `receive_asgi_messages`, which can be blocked when the GCS is down.
+            # To prevent that from happening, push the proxy handle eagerly.
+            if (
+                self._handle_source == DeploymentHandleSource.PROXY
+                and r.replica_id not in self._replicas
+            ):
+                r.push_proxy_handle(self._self_actor_handle)
+
             new_replicas[r.replica_id] = r
             new_replica_id_set.add(r.replica_id)
             if self._self_node_id is not None and r.node_id == self._self_node_id:
@@ -263,6 +279,10 @@ def update_replicas(self, replicas: List[ReplicaWrapper]):
                 extra={"log_to_stderr": False},
             )
 
+        # Get the list of new replicas.
+        new_ids = new_replica_id_set - self._replica_id_set
+        replicas_to_ping = [new_replicas.get(id) for id in new_ids]
+
         self._replicas = new_replicas
         self._replica_id_set = new_replica_id_set
         self._colocated_replica_ids = new_colocated_replica_ids
@@ -272,6 +292,8 @@ def update_replicas(self, replicas: List[ReplicaWrapper]):
         self._replica_queue_len_cache.remove_inactive_replicas(
             active_replica_ids=new_replica_id_set
         )
+        # Populate the cache for new replicas.
+        self._loop.create_task(self._probe_queue_lens(replicas_to_ping, 0))
 
         self._replicas_updated_event.set()
         self.maybe_start_scheduling_tasks()
diff --git a/python/ray/serve/_private/router.py b/python/ray/serve/_private/router.py
index d5ef72fcabdb..02663f7d5891 100644
--- a/python/ray/serve/_private/router.py
+++ b/python/ray/serve/_private/router.py
@@ -358,10 +358,14 @@ def __init__(
             replica_scheduler = PowerOfTwoChoicesReplicaScheduler(
                 self._event_loop,
                 deployment_id,
+                handle_source,
                 _prefer_local_node_routing,
                 RAY_SERVE_PROXY_PREFER_LOCAL_AZ_ROUTING,
                 self_node_id,
                 self_actor_id,
+                ray.get_runtime_context().current_actor
+                if ray.get_runtime_context().get_actor_id()
+                else None,
                 self_availability_zone,
                 use_replica_queue_len_cache=enable_queue_len_cache,
             )
diff --git a/python/ray/serve/_private/test_utils.py b/python/ray/serve/_private/test_utils.py
index 9949eab9e4b1..de99804184c5 100644
--- a/python/ray/serve/_private/test_utils.py
+++ b/python/ray/serve/_private/test_utils.py
@@ -1,5 +1,6 @@
 import asyncio
 import datetime
+import os
 import threading
 import time
 from copy import copy, deepcopy
@@ -197,6 +198,15 @@ def __init__(
         self._soft_target_node_id = _soft_target_node_id
 
 
+@serve.deployment
+class GetPID:
+    def __call__(self):
+        return os.getpid()
+
+
+get_pid_entrypoint = GetPID.bind()
+
+
 def check_ray_stopped():
     try:
         requests.get("http://localhost:52365/api/ray/version")
diff --git a/python/ray/serve/tests/test_gcs_failure.py b/python/ray/serve/tests/test_gcs_failure.py
index f7e24443994e..26149af7bcb0 100644
--- a/python/ray/serve/tests/test_gcs_failure.py
+++ b/python/ray/serve/tests/test_gcs_failure.py
@@ -8,9 +8,13 @@
 import ray
 from ray import serve
 from ray._private.test_utils import wait_for_condition
+from ray.serve._private.common import DeploymentID, ReplicaState
 from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME
 from ray.serve._private.storage.kv_store import KVStoreError, RayInternalKVStore
+from ray.serve._private.test_utils import check_apps_running, check_replica_counts
 from ray.serve.context import _get_global_client
+from ray.serve.handle import DeploymentHandle
+from ray.serve.schema import ServeDeploySchema
 from ray.tests.conftest import external_redis  # noqa: F401
 
@@ -27,6 +31,8 @@ def serve_ha(external_redis, monkeypatch):  # noqa: F811
     serve.start()
     yield (address_info, _get_global_client())
     ray.shutdown()
+    # Clear the cache and the global Serve client.
+    serve.shutdown()
 
 
 @pytest.mark.skipif(
@@ -105,6 +111,149 @@ def call():
         assert pid == call()
 
 
+def router_populated_with_replicas(handle: DeploymentHandle, threshold: int = 1):
+    replicas = handle._router._replica_scheduler._replica_id_set
+    assert len(replicas) >= threshold
+    return True
+
+
+@pytest.mark.parametrize("use_proxy", [True, False])
+def test_new_router_on_gcs_failure(serve_ha, use_proxy: bool):
+    """Test that a new router can send requests to replicas when GCS is down.
+
+    Specifically, if a proxy was just brought up or a deployment handle
+    was just created, and the GCS goes down BEFORE the router is able to
+    send its first request, new incoming requests should successfully get
+    sent to replicas during GCS downtime.
+    """
+
+    @serve.deployment
+    class Dummy:
+        def __call__(self):
+            return os.getpid()
+
+    h = serve.run(Dummy.options(num_replicas=2).bind())
+    # TODO(zcin): We want to test the behavior for when the router
+    # didn't get a chance to send even a single request yet. However, on
+    # the very first request we record telemetry for whether the
+    # deployment handle API was used, which will hang when the GCS is
+    # down. As a workaround for now, avoid recording telemetry so we
+    # can properly test router behavior when the GCS is down. We should
+    # look into adding a timeout on the KV cache operation. For now, the
+    # proxy doesn't run into this because we don't record telemetry on the proxy.
+    h._recorded_telemetry = True
+    # Eagerly create the router so it receives the replica set instead of
+    # waiting for the first request.
+    h._get_or_create_router()
+
+    wait_for_condition(router_populated_with_replicas, handle=h)
+
+    # Kill the GCS server before a single request is sent.
+    ray.worker._global_node.kill_gcs_server()
+
+    returned_pids = set()
+    if use_proxy:
+        for _ in range(10):
+            returned_pids.add(
+                int(requests.get("http://localhost:8000", timeout=0.1).text)
+            )
+    else:
+        for _ in range(10):
+            returned_pids.add(int(h.remote().result(timeout_s=0.1)))
+
+    print("Returned pids:", returned_pids)
+    assert len(returned_pids) == 2
+
+
+def test_handle_router_updated_replicas_then_gcs_failure(serve_ha):
+    """Test that the router's replica set is updated from 1 to 2 replicas, with the
+    first replica staying the same. Verify that if the GCS goes down before the
+    router gets a chance to send a request to the second replica, requests can be
+    handled during GCS failure.
+
+    This test uses a plain handle to send requests.
+    """
+
+    _, client = serve_ha
+
+    config = {
+        "name": "default",
+        "import_path": "ray.serve._private.test_utils:get_pid_entrypoint",
+        "route_prefix": "/",
+        "deployments": [{"name": "GetPID", "num_replicas": 1}],
+    }
+    client.deploy_apps(ServeDeploySchema(**{"applications": [config]}))
+    wait_for_condition(check_apps_running, apps=["default"])
+
+    h = serve.get_app_handle("default")
+    print(h.remote().result())
+
+    config["deployments"][0]["num_replicas"] = 2
+    client.deploy_apps(ServeDeploySchema(**{"applications": [config]}))
+
+    wait_for_condition(router_populated_with_replicas, handle=h, threshold=2)
+
+    # Kill the GCS server before the router gets to send a request to the
+    # second replica.
+    ray.worker._global_node.kill_gcs_server()
+
+    returned_pids = set()
+    for _ in range(10):
+        returned_pids.add(int(h.remote().result(timeout_s=0.1)))
+
+    print("Returned pids:", returned_pids)
+    assert len(returned_pids) == 2
+
+
+def test_proxy_router_updated_replicas_then_gcs_failure(serve_ha):
+    """Test that the router's replica set is updated from 1 to 2 replicas, with the
+    first replica staying the same.
+    Verify that if the GCS goes down before the router gets a chance to send a
+    request to the second replica, requests can be handled during GCS failure.
+
+    This test sends HTTP requests to the proxy.
+    """
+    _, client = serve_ha
+
+    config = {
+        "name": "default",
+        "import_path": "ray.serve._private.test_utils:get_pid_entrypoint",
+        "route_prefix": "/",
+        "deployments": [{"name": "GetPID", "num_replicas": 1}],
+    }
+    client.deploy_apps(ServeDeploySchema(**{"applications": [config]}))
+    wait_for_condition(check_apps_running, apps=["default"])
+
+    r = requests.post("http://localhost:8000")
+    assert r.status_code == 200, r.text
+    print(r.text)
+
+    config["deployments"][0]["num_replicas"] = 2
+    client.deploy_apps(ServeDeploySchema(**{"applications": [config]}))
+
+    # There is no way to directly check whether the proxy has received the updated
+    # replicas, so just check the status. After the controller updates the status
+    # with the new replicas, the proxy should instantly receive updates via long poll.
+    wait_for_condition(
+        check_replica_counts,
+        controller=client._controller,
+        deployment_id=DeploymentID("GetPID", "default"),
+        total=2,
+        by_state=[(ReplicaState.RUNNING, 2, None)],
+    )
+
+    # Kill the GCS server before the router gets to send a request to the
+    # second replica.
+    ray.worker._global_node.kill_gcs_server()
+
+    returned_pids = set()
+    for _ in range(10):
+        r = requests.post("http://localhost:8000")
+        assert r.status_code == 200
+        returned_pids.add(int(r.text))
+
+    print("Returned pids:", returned_pids)
+    assert len(returned_pids) == 2
+
+
 if __name__ == "__main__":
     # When GCS is down, right now some core worker members are not cleared
     # properly in ray.shutdown. Given that this is not hi-pri issue,
diff --git a/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py b/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py
index 0fab2057af0a..d68fe8442ed5 100644
--- a/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py
+++ b/python/ray/serve/tests/unit/test_pow_2_replica_scheduler.py
@@ -12,8 +12,14 @@
 import ray
 from ray._private.test_utils import async_wait_for_condition
 from ray._private.utils import get_or_create_event_loop
+from ray.actor import ActorHandle
 from ray.exceptions import ActorDiedError, ActorUnavailableError
-from ray.serve._private.common import DeploymentID, ReplicaID, RequestMetadata
+from ray.serve._private.common import (
+    DeploymentHandleSource,
+    DeploymentID,
+    ReplicaID,
+    RequestMetadata,
+)
 from ray.serve._private.constants import RAY_SERVE_QUEUE_LENGTH_CACHE_TIMEOUT_S
 from ray.serve._private.replica_scheduler import (
     PendingRequest,
@@ -88,6 +94,9 @@ def set_queue_len_response(
         self._exception = exception
         self._has_queue_len_response.set()
 
+    def push_proxy_handle(self, handle: ActorHandle):
+        pass
+
     async def get_queue_len(self, *, deadline_s: float) -> int:
         self.num_get_queue_len_calls += 1
         self.queue_len_deadline_history.append(deadline_s)
@@ -126,10 +135,14 @@ async def construct_scheduler(loop: asyncio.AbstractEventLoop):
     return PowerOfTwoChoicesReplicaScheduler(
         loop,
         DeploymentID(name="TEST_DEPLOYMENT"),
+        handle_source=request.param.get(
+            "handle_source", DeploymentHandleSource.REPLICA
+        ),
         prefer_local_node_routing=request.param.get("prefer_local_node", False),
         prefer_local_az_routing=request.param.get("prefer_local_az", False),
         self_node_id=SCHEDULER_NODE_ID,
         self_actor_id="fake-actor-id",
+        self_actor_handle=None,
         self_availability_zone=request.param.get("az", None),
         use_replica_queue_len_cache=request.param.get(
             "use_replica_queue_len_cache", False
@@ -481,11 +494,17 @@ async def test_tasks_scheduled_fifo(pow_2_scheduler):
     # Only a single request will be accepted at a time due to
     # `reset_after_response=True`.
     r1 = FakeReplicaWrapper("r1", reset_after_response=True)
+    r1.set_queue_len_response(0)
     s.update_replicas([r1])
 
-    for i in range(len(tasks)):
+    # We need to wait until the initial ping from the scheduler to the replica
+    # finishes, which then resets the events in the testing structure
+    # so that the test can proceed.
+    await async_wait_for_condition(lambda: not r1._has_queue_len_response.is_set())
+
+    for _ in range(len(tasks)):
         r1.set_queue_len_response(0)
-        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
+        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
 
         # If the order was not FIFO, the fulfilled assignment may not be the front of
         # the list.
@@ -524,8 +543,14 @@ async def test_retried_tasks_scheduled_fifo(pow_2_scheduler):
     # Only a single request will be accepted at a time due to
     # `reset_after_response=True`.
     r1 = FakeReplicaWrapper("r1", reset_after_response=True)
+    r1.set_queue_len_response(0)
     s.update_replicas([r1])
 
+    # We need to wait until the initial ping from the scheduler to the replica
+    # finishes, which then resets the events in the testing structure
+    # so that the test can proceed.
+    await async_wait_for_condition(lambda: not r1._has_queue_len_response.is_set())
+
     # Check that the tasks are scheduled in the order they were created (not the
     # order they were retried).
     for expected_idx in range(len(pending_requests)):
@@ -1261,9 +1286,19 @@ async def test_tasks_scheduled_fifo_among_model_ids(self, pow_2_scheduler):
         r2.set_queue_len_response(0)
         s.update_replicas([r1, r2])
 
+        # We need to wait until the initial ping from the scheduler to the replica
+        # finishes, which then resets the events in the testing structure
+        # so that the test can proceed.
+        await async_wait_for_condition(
+            lambda: not r1._has_queue_len_response.is_set(), retry_interval_ms=10
+        )
+        await async_wait_for_condition(
+            lambda: not r2._has_queue_len_response.is_set(), retry_interval_ms=10
+        )
+
         # In each iteration, allow one replica w/ each model ID to be scheduled.
         # The tasks for each model ID should be scheduled in FIFO order.
-        for i in range(10):
+        for _ in range(10):
             r1.set_queue_len_response(0)
             r2.set_queue_len_response(0)
@@ -1325,11 +1360,14 @@ async def test_queue_len_response_deadline_backoff(pow_2_scheduler):
     done, _ = await asyncio.wait([task], timeout=0.2)
     assert len(done) == 0
 
+    # Verify the first ping.
+    assert r1.queue_len_deadline_history[0] == 0.001
+
     # Verify that the deadline never exceeds the max and deadline_n+1 is equal to
     # the max or 2*deadline_n.
     for i, j in zip(
-        range(0, len(r1.queue_len_deadline_history) - 1),
-        range(1, len(r1.queue_len_deadline_history)),
+        range(1, len(r1.queue_len_deadline_history) - 1),
+        range(2, len(r1.queue_len_deadline_history)),
     ):
         deadline_i = r1.queue_len_deadline_history[i]
         deadline_j = r1.queue_len_deadline_history[j]
@@ -1476,7 +1514,9 @@ async def test_queue_len_cache_active_probing(pow_2_scheduler):
     done, _ = await asyncio.wait([task], timeout=0.1)
    assert len(done) == 1
     assert (await task) == r1
-    assert len(r1.queue_len_deadline_history) == 0
+    # 0 probes from scheduling requests
+    # + 1 probe from when the replica set was updated with replica r1
+    assert len(r1.queue_len_deadline_history) - 1 == 0
 
     # Now time out the entry in the cache -- replica should be probed.
     TIMER.advance(staleness_timeout_s + 1)
 
@@ -1486,7 +1526,9 @@ async def test_queue_len_cache_active_probing(pow_2_scheduler):
     done, _ = await asyncio.wait([task], timeout=0.1)
     assert len(done) == 1
     assert (await task) == r1
-    assert len(r1.queue_len_deadline_history) == 1
+    # 1 probe from scheduling requests
+    # + 1 probe from when the replica set was updated with replica r1
+    assert len(r1.queue_len_deadline_history) - 1 == 1
 
 
 @pytest.mark.asyncio
@@ -1513,7 +1555,9 @@ async def test_queue_len_cache_replica_at_capacity_is_probed(pow_2_scheduler):
     task = loop.create_task(s.choose_replica_for_request(fake_pending_request()))
     done, _ = await asyncio.wait([task], timeout=0.1)
     assert len(done) == 0
-    assert len(r1.queue_len_deadline_history) == 1
+    # 1 probe from scheduling requests
+    # + 1 probe from when the replica set was updated with replica r1
+    assert len(r1.queue_len_deadline_history) - 1 == 1
 
     # Now let the replica respond and accept the request, it should be scheduled.
     r1.set_queue_len_response(DEFAULT_MAX_ONGOING_REQUESTS - 1)
@@ -1548,14 +1592,18 @@ async def test_queue_len_cache_background_probing(pow_2_scheduler):
     done, _ = await asyncio.wait([task], timeout=0.1)
     assert len(done) == 1
     assert (await task) == r1
-    assert len(r1.queue_len_deadline_history) == 0
+    # 0 probes from scheduling requests
+    # + 1 probe from when the replica set was updated with replica r1
+    assert len(r1.queue_len_deadline_history) - 1 == 0
 
     r2.set_queue_len_response(3)
 
     def r2_was_probed():
         # Check that r2 was probed and the response was added to the cache.
+        # 1 probe from scheduling requests
+        # + 1 probe from when the replica set was updated with replica r1
         assert (
-            len(r2.queue_len_deadline_history) == 1
+            len(r2.queue_len_deadline_history) - 1 == 1
             and s._replica_queue_len_cache.get(r2.replica_id) == 3
         )
         return True
@@ -1596,8 +1644,10 @@ async def test_queue_len_cache_entries_added_correctly(pow_2_scheduler):
         else:
             assert replica in {r1, r2}
 
-        assert len(r1.queue_len_deadline_history) == i + 1
-        assert len(r2.queue_len_deadline_history) == i + 1
+        # i+1 probes from scheduling requests
+        # + 1 probe from when the replica set was updated with replica r1
+        assert len(r1.queue_len_deadline_history) - 1 == i + 1
+        assert len(r2.queue_len_deadline_history) - 1 == i + 1
         assert s._replica_queue_len_cache.get(r1.replica_id) == r1_queue_len
         assert s._replica_queue_len_cache.get(r2.replica_id) == r2_queue_len
         TIMER.advance(staleness_timeout_s + 1)
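
A minimal sketch (not part of the diff) of how the new constructor arguments are expected to be wired up, mirroring the `construct_scheduler` test fixture and the `router.py` change above; the placeholder values (`"fake-node-id"`, `self_actor_handle=None`) and the direct module import are illustrative assumptions only:

from ray._private.utils import get_or_create_event_loop
from ray.serve._private.common import DeploymentHandleSource, DeploymentID
from ray.serve._private.replica_scheduler.pow_2_scheduler import (
    PowerOfTwoChoicesReplicaScheduler,
)

# Construct a scheduler the way a proxy-side router would: pass the handle
# source and (on a real proxy) the proxy's own actor handle so that
# `update_replicas` can push it to newly added replicas eagerly.
scheduler = PowerOfTwoChoicesReplicaScheduler(
    get_or_create_event_loop(),
    DeploymentID(name="TEST_DEPLOYMENT"),
    handle_source=DeploymentHandleSource.PROXY,
    prefer_local_node_routing=False,
    prefer_local_az_routing=False,
    self_node_id="fake-node-id",
    self_actor_id="fake-actor-id",
    # On a real proxy this would be `ray.get_runtime_context().current_actor`.
    self_actor_handle=None,
    self_availability_zone=None,
    use_replica_queue_len_cache=False,
)

# With `handle_source=DeploymentHandleSource.PROXY`, `update_replicas(...)` now
# pushes the proxy handle to each newly added replica and kicks off an initial
# queue-length probe to populate the cache before the first request arrives.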