diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py index 651c4e12c698..b94683f381b8 100644 --- a/python/ray/serve/_private/constants.py +++ b/python/ray/serve/_private/constants.py @@ -29,7 +29,7 @@ DEFAULT_GRPC_PORT = 9000 #: Default Serve application name -SERVE_DEFAULT_APP_NAME = "" +SERVE_DEFAULT_APP_NAME = "default" #: Separator between app name and deployment name when we prepend #: the app name to each deployment name. This prepending is currently diff --git a/python/ray/serve/_private/deployment_function_node.py b/python/ray/serve/_private/deployment_function_node.py index be47328d80c1..c53f816e7211 100644 --- a/python/ray/serve/_private/deployment_function_node.py +++ b/python/ray/serve/_private/deployment_function_node.py @@ -1,4 +1,3 @@ -import inspect from typing import Any, Callable, Dict, List, Union from ray.dag.dag_node import DAGNode @@ -35,13 +34,6 @@ def __init__( ] deployment_shell = schema_to_deployment(deployment_schema) - # Prefer user specified name to override the generated one. - if ( - inspect.isfunction(func_body) - and deployment_shell.name != func_body.__name__ - ): - self._deployment_name = deployment_shell.name - # Set the route prefix, prefer the one user supplied, # otherwise set it to /deployment_name if ( diff --git a/python/ray/serve/_private/deployment_graph_build.py b/python/ray/serve/_private/deployment_graph_build.py index c75542d0e92a..d06c4a05442f 100644 --- a/python/ray/serve/_private/deployment_graph_build.py +++ b/python/ray/serve/_private/deployment_graph_build.py @@ -66,7 +66,8 @@ def build(ray_dag_root_node: DAGNode, name: str = None) -> List[Deployment]: should be executable via `ray_dag_root_node.execute(user_input)` and should have `InputNode` in it. name: Application name,. If provided, formatting all the deployment name to - {name}_{deployment_name} + {name}_{deployment_name}, if not provided, the deployment name won't be + updated. Returns: deployments: All deployments needed for an e2e runnable serve pipeline, @@ -273,6 +274,16 @@ def replace_with_handle(node): dag_node._body.__annotations__["return"] ) + # Set the deployment name if the user provides. + if "deployment_schema" in dag_node._bound_other_args_to_resolve: + schema = dag_node._bound_other_args_to_resolve["deployment_schema"] + if ( + inspect.isfunction(dag_node._body) + and schema.name != dag_node._body.__name__ + ): + deployment_name = schema.name + + # Update the deployment name if the application name provided. if name: deployment_name = name + DEPLOYMENT_NAME_PREFIX_SEPARATOR + deployment_name diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 82568a02f2fa..28e0e0175f88 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -548,7 +548,7 @@ def run( @PublicAPI(stability="alpha") -def build(target: Application, name: str = SERVE_DEFAULT_APP_NAME) -> BuiltApplication: +def build(target: Application, name: str = None) -> BuiltApplication: """Builds a Serve application into a static, built application. Resolves the provided Application object into a list of deployments. @@ -562,7 +562,8 @@ def build(target: Application, name: str = SERVE_DEFAULT_APP_NAME) -> BuiltAppli Args: target: The Serve application to run consisting of one or more deployments. - name: The name of the Serve application. + name: The name of the Serve application. When name is not provided, the + deployment name won't be updated. (SINGLE_APP use case.) Returns: The static built Serve application. diff --git a/python/ray/serve/schema.py b/python/ray/serve/schema.py index 4f1c7e1a7c2e..ee61b8c5f0e2 100644 --- a/python/ray/serve/schema.py +++ b/python/ray/serve/schema.py @@ -15,6 +15,7 @@ from ray.serve.config import DeploymentMode from ray.serve._private.utils import DEFAULT, dict_keys_snake_to_camel_case from ray.util.annotations import DeveloperAPI, PublicAPI +from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME def _route_prefix_format(cls, v): @@ -304,9 +305,7 @@ class ServeApplicationSchema(BaseModel, extra=Extra.forbid): """ name: str = Field( - # TODO(cindy): eventually we should set the default app name to a non-empty - # string and forbid empty app names. - default="", + default=SERVE_DEFAULT_APP_NAME, description=( "Application name, the name should be unique within the serve instance" ), diff --git a/python/ray/serve/tests/test_advanced.py b/python/ray/serve/tests/test_advanced.py index 495ba9a7cb2f..73c3d6c6f8b2 100644 --- a/python/ray/serve/tests/test_advanced.py +++ b/python/ray/serve/tests/test_advanced.py @@ -6,6 +6,7 @@ import ray from ray import serve from ray._private.test_utils import SignalActor +from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME def test_serve_forceful_shutdown(serve_instance): @@ -16,7 +17,7 @@ def sleeper(): handle = serve.run(sleeper.bind()) ref = handle.remote() - sleeper.delete() + serve.delete(SERVE_DEFAULT_APP_NAME) with pytest.raises(ray.exceptions.RayActorError): ray.get(ref) diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 5bc5e3da2d5b..597a6998723a 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -17,6 +17,10 @@ from ray.serve.drivers import DAGDriver from ray.serve.exceptions import RayServeException from ray.serve._private.api import call_app_builder_with_args_if_necessary +from ray.serve._private.constants import ( + SERVE_DEFAULT_APP_NAME, + DEPLOYMENT_NAME_PREFIX_SEPARATOR, +) @serve.deployment() @@ -423,27 +427,6 @@ def __call__(self, *args): assert ray.get(ingress_handle.remote()) == "got f" -def test_run_delete_old_deployments(serve_instance): - """Check that serve.run() can remove all old deployments""" - - @serve.deployment(name="f", route_prefix="/test1") - def f(): - return "got f" - - @serve.deployment(name="g", route_prefix="/test2") - def g(): - return "got g" - - ingress_handle = serve.run(f.bind()) - assert ray.get(ingress_handle.remote()) == "got f" - - ingress_handle = serve.run(g.bind()) - assert ray.get(ingress_handle.remote()) == "got g" - - assert "g" in serve.list_deployments() - assert "f" not in serve.list_deployments() - - class TestSetOptions: def test_set_options_basic(self): @serve.deployment( @@ -574,7 +557,10 @@ def g(): serve.run(g.bind()) deployment_info = ray.get(controller._all_running_replicas.remote()) - assert "g" in deployment_info + assert ( + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}g" + in deployment_info + ) @serve.deployment def f(): diff --git a/python/ray/serve/tests/test_autoscaling_policy.py b/python/ray/serve/tests/test_autoscaling_policy.py index 0bdcb4b7b75c..1805c84d718f 100644 --- a/python/ray/serve/tests/test_autoscaling_policy.py +++ b/python/ray/serve/tests/test_autoscaling_policy.py @@ -17,7 +17,11 @@ from ray.serve._private.common import DeploymentInfo from ray.serve._private.common import ReplicaState from ray.serve.config import AutoscalingConfig -from ray.serve._private.constants import CONTROL_LOOP_PERIOD_S +from ray.serve._private.constants import ( + CONTROL_LOOP_PERIOD_S, + SERVE_DEFAULT_APP_NAME, + DEPLOYMENT_NAME_PREFIX_SEPARATOR, +) from ray.serve.controller import ServeController from ray.serve.deployment import Deployment import ray.experimental.state.api as state_api @@ -108,28 +112,38 @@ def test_smoothing_factor(self): assert 5 <= desired_num_replicas <= 8 # 10 + 0.5 * (2.5 - 10) = 6.25 -def get_running_replicas(controller: ServeController, deployment: Deployment) -> List: +def get_running_replicas( + controller: ServeController, deployment: Deployment, app_name +) -> List: """Get the replicas currently running for given deployment""" + if app_name: + deployment_name = app_name + DEPLOYMENT_NAME_PREFIX_SEPARATOR + deployment.name + else: + deployment_name = deployment.name replicas = ray.get( - controller._dump_replica_states_for_testing.remote(deployment.name) + controller._dump_replica_states_for_testing.remote(deployment_name) ) running_replicas = replicas.get([ReplicaState.RUNNING]) return running_replicas def get_running_replica_tags( - controller: ServeController, deployment: Deployment + controller: ServeController, + deployment: Deployment, + app_name: str = SERVE_DEFAULT_APP_NAME, ) -> List: """Get the replica tags of running replicas for given deployment""" - running_replicas = get_running_replicas(controller, deployment) + running_replicas = get_running_replicas(controller, deployment, app_name) return [replica.replica_tag for replica in running_replicas] def get_num_running_replicas( - controller: ServeController, deployment: Deployment + controller: ServeController, + deployment: Deployment, + app_name: str = SERVE_DEFAULT_APP_NAME, ) -> int: """Get the amount of replicas currently running for given deployment""" - running_replicas = get_running_replicas(controller, deployment) + running_replicas = get_running_replicas(controller, deployment, app_name) return len(running_replicas) @@ -167,7 +181,11 @@ def test_assert_no_replicas_deprovisioned(): assert_no_replicas_deprovisioned(replica_tags_2, replica_tags_1) -def get_deployment_start_time(controller: ServeController, deployment: Deployment): +def get_deployment_start_time( + controller: ServeController, + deployment: Deployment, + app_name: str = SERVE_DEFAULT_APP_NAME, +): """Return start time for given deployment""" deployment_route_list = DeploymentRouteList.FromString( ray.get(controller.list_deployments.remote()) @@ -179,7 +197,11 @@ def get_deployment_start_time(controller: ServeController, deployment: Deploymen ) for deployment_route in deployment_route_list.deployment_routes } - deployment_info, _route_prefix = deployments[deployment.name] + if app_name: + deployment_name = app_name + DEPLOYMENT_NAME_PREFIX_SEPARATOR + deployment.name + else: + deployment_name = deployment.name + deployment_info, _route_prefix = deployments[deployment_name] return deployment_info.start_time_ms @@ -673,7 +695,6 @@ def __call__(self): controller = serve_instance._controller start_time = get_deployment_start_time(controller, A) - A.get_handle() [handle.remote() for _ in range(50)] wait_for_condition( @@ -830,19 +851,19 @@ def __call__(self): print("Deployed A.") controller = serve_instance._controller - start_time = get_deployment_start_time(controller, A) + start_time = get_deployment_start_time(controller, A, app_name=None) - assert get_num_running_replicas(controller, A) == 0 + assert get_num_running_replicas(controller, A, app_name=None) == 0 handle = A.get_handle() [handle.remote() for _ in range(1)] print("Issued one request.") time.sleep(2) - assert get_num_running_replicas(controller, A) == 1 + assert get_num_running_replicas(controller, A, app_name=None) == 1 print("Scale up to 1 replica.") - first_deployment_replicas = get_running_replica_tags(controller, A) + first_deployment_replicas = get_running_replica_tags(controller, A, app_name=None) A.options( autoscaling_config={ @@ -859,14 +880,16 @@ def __call__(self): ).deploy() print("Redeployed A with min_replicas set to 2.") - wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 2) + wait_for_condition( + lambda: get_num_running_replicas(controller, A, app_name=None) >= 2 + ) time.sleep(5) # Confirm that autoscaler doesn't scale above 2 even after waiting - assert get_num_running_replicas(controller, A) == 2 + assert get_num_running_replicas(controller, A, app_name=None) == 2 print("Autoscaled to 2 without issuing any new requests.") - second_deployment_replicas = get_running_replica_tags(controller, A) + second_deployment_replicas = get_running_replica_tags(controller, A, app_name=None) # Confirm that none of the original replicas were de-provisioned assert_no_replicas_deprovisioned( @@ -878,12 +901,14 @@ def __call__(self): print("Completed request.") # As the queue is drained, we should scale back down. - wait_for_condition(lambda: get_num_running_replicas(controller, A) <= 2) - assert get_num_running_replicas(controller, A) > 1 + wait_for_condition( + lambda: get_num_running_replicas(controller, A, app_name=None) <= 2 + ) + assert get_num_running_replicas(controller, A, app_name=None) > 1 print("Stayed at 2 replicas.") # Make sure start time did not change for the deployment - assert get_deployment_start_time(controller, A) == start_time + assert get_deployment_start_time(controller, A, app_name=None) == start_time @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") @@ -903,7 +928,15 @@ def f(): # f should start with initial_replicas (2) deployments actors = state_api.list_actors( - filters=[("class_name", "=", "ServeReplica:f"), ("state", "=", "ALIVE")] + filters=[ + ( + "class_name", + "=", + f"ServeReplica:{SERVE_DEFAULT_APP_NAME}" + f"{DEPLOYMENT_NAME_PREFIX_SEPARATOR}f", + ), + ("state", "=", "ALIVE"), + ] ) print(actors) assert len(actors) == 2 @@ -911,7 +944,15 @@ def f(): # f should scale down to min_replicas (1) deployments def check_one_replica(): actors = state_api.list_actors( - filters=[("class_name", "=", "ServeReplica:f"), ("state", "=", "ALIVE")] + filters=[ + ( + "class_name", + "=", + f"ServeReplica:{SERVE_DEFAULT_APP_NAME}" + f"{DEPLOYMENT_NAME_PREFIX_SEPARATOR}f", + ), + ("state", "=", "ALIVE"), + ] ) return len(actors) == 1 @@ -946,7 +987,12 @@ def scaler(): def check_two_replicas(): actors = state_api.list_actors( filters=[ - ("class_name", "=", "ServeReplica:scaler"), + ( + "class_name", + "=", + f"ServeReplica:{SERVE_DEFAULT_APP_NAME}" + f"{DEPLOYMENT_NAME_PREFIX_SEPARATOR}scaler", + ), ("state", "=", "ALIVE"), ] ) @@ -969,12 +1015,25 @@ def check_two_replicas(): def check_num_replicas(live: int, dead: int): live_actors = state_api.list_actors( filters=[ - ("class_name", "=", "ServeReplica:scaler"), + ( + "class_name", + "=", + f"ServeReplica:{SERVE_DEFAULT_APP_NAME}" + f"{DEPLOYMENT_NAME_PREFIX_SEPARATOR}scaler", + ), ("state", "=", "ALIVE"), ] ) dead_actors = state_api.list_actors( - filters=[("class_name", "=", "ServeReplica:scaler"), ("state", "=", "DEAD")] + filters=[ + ( + "class_name", + "=", + f"ServeReplica:{SERVE_DEFAULT_APP_NAME}" + f"{DEPLOYMENT_NAME_PREFIX_SEPARATOR}scaler", + ), + ("state", "=", "DEAD"), + ] ) return len(live_actors) == live and len(dead_actors) == dead @@ -1063,7 +1122,15 @@ def send_request(): def check_num_replicas(num: int): actors = state_api.list_actors( - filters=[("class_name", "=", "ServeReplica:g"), ("state", "=", "ALIVE")] + filters=[ + ( + "class_name", + "=", + f"ServeReplica:{SERVE_DEFAULT_APP_NAME}" + f"{DEPLOYMENT_NAME_PREFIX_SEPARATOR}g", + ), + ("state", "=", "ALIVE"), + ] ) return len(actors) == num diff --git a/python/ray/serve/tests/test_cli.py b/python/ray/serve/tests/test_cli.py index 5ec96d7d9d2b..d68a5608f096 100644 --- a/python/ray/serve/tests/test_cli.py +++ b/python/ray/serve/tests/test_cli.py @@ -23,6 +23,10 @@ from ray.tests.conftest import tmp_working_dir # noqa: F401, E501 from ray.dashboard.modules.serve.sdk import ServeSubmissionClient from ray.serve.scripts import convert_args_to_dict, remove_ansi_escape_sequences +from ray.serve._private.constants import ( + SERVE_DEFAULT_APP_NAME, + DEPLOYMENT_NAME_PREFIX_SEPARATOR, +) CONNECTION_ERROR_MSG = "connection error" @@ -107,11 +111,11 @@ def test_deploy(ray_start_stop): print("Deployments are reachable over HTTP.") deployment_names = [ - "DAGDriver", - "create_order", - "Router", - "Multiplier", - "Adder", + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}DAGDriver", + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}create_order", + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}Router", + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}Multiplier", + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}Adder", ] assert_deployments_live(deployment_names) print("All deployments are live.\n") @@ -135,7 +139,12 @@ def test_deploy(ray_start_stop): ) print("Deployments are reachable over HTTP.") - deployment_names = ["DAGDriver", "Router", "Add", "Subtract"] + deployment_names = [ + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}DAGDriver", + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}Router", + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}Add", + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}Subtract", + ] assert_deployments_live(deployment_names) print("All deployments are live.\n") @@ -479,11 +488,11 @@ def num_live_deployments(): serve_status = yaml.safe_load(status_response) expected_deployments = { - "DAGDriver", - "Multiplier", - "Adder", - "Router", - "create_order", + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}DAGDriver", + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}Multiplier", + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}Adder", + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}Router", + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}create_order", } for status in serve_status["deployment_statuses"]: expected_deployments.remove(status["name"]) diff --git a/python/ray/serve/tests/test_constructor_failure.py b/python/ray/serve/tests/test_constructor_failure.py index 56a592c1bc68..ffed1c5b868f 100644 --- a/python/ray/serve/tests/test_constructor_failure.py +++ b/python/ray/serve/tests/test_constructor_failure.py @@ -6,6 +6,14 @@ import ray from ray import serve +from ray.serve._private.constants import ( + SERVE_DEFAULT_APP_NAME, + DEPLOYMENT_NAME_PREFIX_SEPARATOR, +) + + +def get_deployment_name(name: str): + return f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}{name}" def test_deploy_with_consistent_constructor_failure(serve_instance): @@ -23,8 +31,9 @@ async def serve(self, request): # Assert no replicas are running in deployment deployment after failed # deploy call + deployment_name = get_deployment_name("ConstructorFailureDeploymentOneReplica") deployment_dict = ray.get(serve_instance._controller._all_running_replicas.remote()) - assert deployment_dict["ConstructorFailureDeploymentOneReplica"] == [] + assert deployment_dict[deployment_name] == [] # # Test failed to deploy with total of 2 replicas @serve.deployment(num_replicas=2) @@ -40,8 +49,9 @@ async def serve(self, request): # Assert no replicas are running in deployment deployment after failed # deploy call + deployment_name = get_deployment_name("ConstructorFailureDeploymentTwoReplicas") deployment_dict = ray.get(serve_instance._controller._all_running_replicas.remote()) - assert deployment_dict["ConstructorFailureDeploymentTwoReplicas"] == [] + assert deployment_dict[deployment_name] == [] def test_deploy_with_partial_constructor_failure(serve_instance): @@ -75,7 +85,8 @@ async def serve(self, request): # Assert 2 replicas are running in deployment deployment after partially # successful deploy call deployment_dict = ray.get(serve_instance._controller._all_running_replicas.remote()) - assert len(deployment_dict["PartialConstructorFailureDeployment"]) == 2 + deployment_name = get_deployment_name("PartialConstructorFailureDeployment") + assert len(deployment_dict[deployment_name]) == 2 def test_deploy_with_transient_constructor_failure(serve_instance): @@ -101,7 +112,8 @@ async def serve(self, request): # Assert 2 replicas are running in deployment deployment after partially # successful deploy call with transient error deployment_dict = ray.get(serve_instance._controller._all_running_replicas.remote()) - assert len(deployment_dict["TransientConstructorFailureDeployment"]) == 2 + deployment_name = get_deployment_name("TransientConstructorFailureDeployment") + assert len(deployment_dict[deployment_name]) == 2 if __name__ == "__main__": diff --git a/python/ray/serve/tests/test_controller.py b/python/ray/serve/tests/test_controller.py index 1e283cd3fea0..3f3a18077e9d 100644 --- a/python/ray/serve/tests/test_controller.py +++ b/python/ray/serve/tests/test_controller.py @@ -8,6 +8,14 @@ from ray.serve._private.common import DeploymentInfo from ray.serve.generated.serve_pb2 import DeploymentRoute from ray.serve.controller import _generate_deployment_config_versions +from ray.serve._private.constants import ( + SERVE_DEFAULT_APP_NAME, + DEPLOYMENT_NAME_PREFIX_SEPARATOR, +) + + +def get_deployment_name(name: str): + return f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}{name}" def test_redeploy_start_time(serve_instance): @@ -20,8 +28,9 @@ def test(_): return "1" serve.run(test.bind()) + deployment_name = get_deployment_name("test") deployment_route = DeploymentRoute.FromString( - ray.get(controller.get_deployment_info.remote("test")) + ray.get(controller.get_deployment_info.remote(deployment_name)) ) deployment_info_1 = DeploymentInfo.from_proto(deployment_route.deployment_info) start_time_ms_1 = deployment_info_1.start_time_ms @@ -34,7 +43,7 @@ def test(_): serve.run(test.bind()) deployment_route = DeploymentRoute.FromString( - ray.get(controller.get_deployment_info.remote("test")) + ray.get(controller.get_deployment_info.remote(deployment_name)) ) deployment_info_2 = DeploymentInfo.from_proto(deployment_route.deployment_info) start_time_ms_2 = deployment_info_2.start_time_ms diff --git a/python/ray/serve/tests/test_controller_recovery.py b/python/ray/serve/tests/test_controller_recovery.py index 13a4ce7a0d65..77d262c26ee2 100644 --- a/python/ray/serve/tests/test_controller_recovery.py +++ b/python/ray/serve/tests/test_controller_recovery.py @@ -33,7 +33,7 @@ def __init__(self): def __call__(self, *args): return "hii" - serve.run(TransientConstructorFailureDeployment.bind()) + serve.run(TransientConstructorFailureDeployment.bind(), name="app") for _ in range(10): response = request_with_retries( "/recover_start_from_replica_actor_names/", timeout=30 @@ -42,10 +42,10 @@ def __call__(self, *args): # Assert 2 replicas are running in deployment deployment after partially # successful deploy() call with transient error deployment_dict = ray.get(serve_instance._controller._all_running_replicas.remote()) - assert len(deployment_dict["recover_start_from_replica_actor_names"]) == 2 + assert len(deployment_dict["app_recover_start_from_replica_actor_names"]) == 2 replica_version_hash = None - for replica in deployment_dict["recover_start_from_replica_actor_names"]: + for replica in deployment_dict["app_recover_start_from_replica_actor_names"]: ref = replica.actor_handle.get_metadata.remote() _, version = ray.get(ref) if replica_version_hash is None: @@ -116,7 +116,7 @@ def test_recover_rolling_update_from_replica_actor_names(serve_instance): @ray.remote(num_cpus=0) def call(block=False): - handle = serve.get_deployment(name).get_handle() + handle = serve.get_deployment(f"app_{name}").get_handle() ret = ray.get(handle.handler.remote(block)) return ret.split("|")[0], ret.split("|")[1] @@ -167,7 +167,7 @@ def make_nonblocking_calls(expected, expect_blocking=False, num_returns=1): return responses, blocking - serve.run(V1.bind()) + serve.run(V1.bind(), name="app") responses1, _ = make_nonblocking_calls({"1": 2}, num_returns=2) pids1 = responses1["1"] @@ -182,9 +182,9 @@ def make_nonblocking_calls(expected, expect_blocking=False, num_returns=1): # Redeploy new version. Since there is one replica blocking, only one new # replica should be started up. V2 = V1.options(func_or_class=V2, version="2") - serve.run(V2.bind(), _blocking=False) + serve.run(V2.bind(), _blocking=False, name="app") with pytest.raises(TimeoutError): - client._wait_for_deployment_healthy(V2.name, timeout_s=0.1) + client._wait_for_deployment_healthy(f"app_{V2.name}", timeout_s=0.1) responses3, blocking3 = make_nonblocking_calls({"1": 1}, expect_blocking=True) ray.kill(serve.context._global_client._controller, no_restart=False) @@ -197,7 +197,7 @@ def make_nonblocking_calls(expected, expect_blocking=False, num_returns=1): # Now the goal and requests to the new version should complete. # We should have two running replicas of the new version. - client._wait_for_deployment_healthy(V2.name) + client._wait_for_deployment_healthy(f"app_{V2.name}") make_nonblocking_calls({"2": 2}, num_returns=2) @@ -222,7 +222,7 @@ async def __init__(self): def __call__(self, request): return f"1|{os.getpid()}" - serve.run(V1.bind(), _blocking=False) + serve.run(V1.bind(), _blocking=False, name="app") ray.get(pending_init_indicator.remote()) def get_actor_info(name: str): @@ -234,7 +234,7 @@ def get_actor_info(name: str): print(actor) return actor["name"], actor["pid"] - actor_tag, _ = get_actor_info(V1.name) + actor_tag, _ = get_actor_info(f"app_{V1.name}") _, controller1_pid = get_actor_info(SERVE_CONTROLLER_NAME) ray.kill(serve.context._global_client._controller, no_restart=False) # wait for controller is alive again @@ -243,9 +243,9 @@ def get_actor_info(name: str): # Let the actor proceed initialization ray.get(signal.send.remote()) - client._wait_for_deployment_healthy(V1.name) + client._wait_for_deployment_healthy(f"app_{V1.name}") # Make sure the actor before controller dead is staying alive. - assert actor_tag == get_actor_info(V1.name)[0] + assert actor_tag == get_actor_info(f"app_{V1.name}")[0] if __name__ == "__main__": diff --git a/python/ray/serve/tests/test_deploy.py b/python/ray/serve/tests/test_deploy.py index d01b597982fd..24b3dc001c41 100644 --- a/python/ray/serve/tests/test_deploy.py +++ b/python/ray/serve/tests/test_deploy.py @@ -12,6 +12,7 @@ from ray import serve from ray.serve.exceptions import RayServeException from ray.serve._private.utils import get_random_letters +from ray.serve.context import get_global_client @pytest.mark.parametrize("use_handle", [True, False]) @@ -210,7 +211,7 @@ def test_redeploy_single_replica(serve_instance, use_handle): @ray.remote def call(block=False): if use_handle: - handle = serve.get_deployment(name).get_handle() + handle = serve.get_deployment(f"app_{name}").get_handle() ret = ray.get(handle.handler.remote(block)) else: ret = requests.get( @@ -241,7 +242,7 @@ async def handler(self, *args): async def __call__(self, request): return await self.handler() - serve.run(V1.bind()) + serve.run(V1.bind(), name="app") ref1 = call.remote(block=False) val1, pid1 = ray.get(ref1) assert val1 == "1" @@ -253,9 +254,9 @@ async def __call__(self, request): # Redeploy new version. This should not go through until the old version # replica completely stops. V2 = V1.options(func_or_class=V2, version="2") - serve.run(V2.bind(), _blocking=False) + serve.run(V2.bind(), _blocking=False, name="app") with pytest.raises(TimeoutError): - client._wait_for_deployment_healthy(V2.name, timeout_s=0.1) + client._wait_for_deployment_healthy(f"app_{V2.name}", timeout_s=0.1) # It may take some time for the handle change to propagate and requests # to get sent to the new version. Repeatedly send requests until they @@ -283,7 +284,7 @@ async def __call__(self, request): assert pid2 == pid1 # Now the goal and request to the new version should complete. - client._wait_for_deployment_healthy(V2.name) + client._wait_for_deployment_healthy(f"app_{V2.name}") new_version_val, new_version_pid = ray.get(new_version_ref) assert new_version_val == "2" assert new_version_pid != pid2 @@ -301,7 +302,7 @@ def test_redeploy_multiple_replicas(serve_instance, use_handle): @ray.remote(num_cpus=0) def call(block=False): if use_handle: - handle = serve.get_deployment(name).get_handle() + handle = serve.get_deployment(f"app_{name}").get_handle() ret = ray.get(handle.handler.remote(block)) else: ret = requests.get( @@ -355,7 +356,7 @@ def make_nonblocking_calls(expected, expect_blocking=False): return responses, blocking - serve.run(V1.bind()) + serve.run(V1.bind(), name="app") responses1, _ = make_nonblocking_calls({"1": 2}) pids1 = responses1["1"] @@ -368,9 +369,9 @@ def make_nonblocking_calls(expected, expect_blocking=False): # Redeploy new version. Since there is one replica blocking, only one new # replica should be started up. V2 = V1.options(func_or_class=V2, version="2") - serve.run(V2.bind(), _blocking=False) + serve.run(V2.bind(), _blocking=False, name="app") with pytest.raises(TimeoutError): - client._wait_for_deployment_healthy(V2.name, timeout_s=0.1) + client._wait_for_deployment_healthy(f"app_{V2.name}", timeout_s=0.1) responses3, blocking3 = make_nonblocking_calls({"1": 1}, expect_blocking=True) # Signal the original call to exit. @@ -381,7 +382,7 @@ def make_nonblocking_calls(expected, expect_blocking=False): # Now the goal and requests to the new version should complete. # We should have two running replicas of the new version. - client._wait_for_deployment_healthy(V2.name) + client._wait_for_deployment_healthy(f"app_{V2.name}") make_nonblocking_calls({"2": 2}) @@ -511,7 +512,7 @@ def v1(*args): @ray.remote(num_cpus=0) def call(): if use_handle: - handle = v1.get_handle() + handle = get_global_client().get_handle(f"app_{name}", sync=True) ret = ray.get(handle.remote()) else: ret = requests.get(f"http://localhost:8000/{name}").text @@ -536,7 +537,7 @@ def make_calls(expected): return responses - serve.run(v1.bind()) + serve.run(v1.bind(), name="app") responses1 = make_calls({"1": 4}) pids1 = responses1["1"] @@ -544,7 +545,7 @@ def make_calls(expected): def v2(*args): return f"2|{os.getpid()}" - serve.run(v2.bind()) + serve.run(v2.bind(), name="app") responses2 = make_calls({"2": 2}) assert all(pid not in pids1 for pid in responses2["2"]) @@ -562,7 +563,7 @@ def v1(*args): @ray.remote(num_cpus=0) def call(): if use_handle: - handle = v1.get_handle() + handle = get_global_client().get_handle(f"app_{name}", sync=True) ret = ray.get(handle.remote()) else: ret = requests.get(f"http://localhost:8000/{name}").text @@ -587,7 +588,7 @@ def make_calls(expected): return responses - serve.run(v1.bind()) + serve.run(v1.bind(), name="app") responses1 = make_calls({"1": 2}) pids1 = responses1["1"] @@ -595,7 +596,7 @@ def make_calls(expected): def v2(*args): return f"2|{os.getpid()}" - serve.run(v2.bind()) + serve.run(v2.bind(), name="app") responses2 = make_calls({"2": 4}) assert all(pid not in pids1 for pid in responses2["2"]) @@ -606,8 +607,7 @@ class A: def b(self, *args): return "hello" - serve.run(A.bind()) - handle = A.get_handle() + handle = serve.run(A.bind(), name="app") # Legacy code path assert ray.get(handle.options(method_name="b").remote()) == "hello" diff --git a/python/ray/serve/tests/test_deploy_2.py b/python/ray/serve/tests/test_deploy_2.py index 3d0a8156be3a..61449bb618a8 100644 --- a/python/ray/serve/tests/test_deploy_2.py +++ b/python/ray/serve/tests/test_deploy_2.py @@ -16,6 +16,7 @@ class TestGetDeployment: + # Test V1 API get_deployment() def get_deployment(self, name, use_list_api): if use_list_api: return serve.list_deployments()[name] @@ -33,8 +34,8 @@ def d(*args): with pytest.raises(KeyError): self.get_deployment(name, use_list_api) - handle = serve.run(d.bind()) - val1, pid1 = ray.get(handle.remote()) + d.deploy() + val1, pid1 = ray.get(d.get_handle().remote()) assert val1 == "1" del d @@ -52,7 +53,7 @@ def test_get_after_delete(self, serve_instance, use_list_api): def d(*args): return "1", os.getpid() - serve.run(d.bind()) + d.deploy() del d d2 = self.get_deployment(name, use_list_api) @@ -70,15 +71,15 @@ def test_deploy_new_version(self, serve_instance, use_list_api): def d(*args): return "1", os.getpid() - handle = serve.run(d.bind()) - val1, pid1 = ray.get(handle.remote()) + d.deploy() + val1, pid1 = ray.get(d.get_handle().remote()) assert val1 == "1" del d d2 = self.get_deployment(name, use_list_api) - handle = serve.run(d2.options(version="2").bind()) - val2, pid2 = ray.get(handle.remote()) + d2.options(version="2").deploy() + val2, pid2 = ray.get(d2.get_handle().remote()) assert val2 == "1" assert pid2 != pid1 @@ -90,15 +91,15 @@ def test_deploy_empty_version(self, serve_instance, use_list_api): def d(*args): return "1", os.getpid() - handle = serve.run(d.bind()) - val1, pid1 = ray.get(handle.remote()) + d.deploy() + val1, pid1 = ray.get(d.get_handle().remote()) assert val1 == "1" del d d2 = self.get_deployment(name, use_list_api) - handle = serve.run(d2.bind()) - val2, pid2 = ray.get(handle.remote()) + d2.deploy() + val2, pid2 = ray.get(d2.get_handle().remote()) assert val2 == "1" assert pid2 != pid1 @@ -144,12 +145,12 @@ def check_num_replicas(num): handle = self.get_deployment(name, use_list_api).get_handle() assert len(set(ray.get([handle.remote() for _ in range(50)]))) == num - serve.run(d.bind()) + d.deploy() check_num_replicas(1) del d d2 = self.get_deployment(name, use_list_api) - serve.run(d2.options(num_replicas=2).bind()) + d2.options(num_replicas=2).deploy() check_num_replicas(2) diff --git a/python/ray/serve/tests/test_deployment_graph.py b/python/ray/serve/tests/test_deployment_graph.py index e9fa7db68acb..f6633ca1353a 100644 --- a/python/ray/serve/tests/test_deployment_graph.py +++ b/python/ray/serve/tests/test_deployment_graph.py @@ -484,12 +484,13 @@ def get(self): tracker = CallTracker.bind() with InputNode() as inp: - dag = DAGDriver.bind(tracker.predict.bind(inp)) + dag = DAGDriver.bind( + {"/get": tracker.get.bind(), "/predict": tracker.predict.bind(inp)} + ) handle = serve.run(dag) - ray.get(handle.predict.remote(1)) + ray.get(handle.predict_with_route.remote("/predict", 1)) - call_tracker = CallTracker.get_handle() - assert ray.get(call_tracker.get.remote()) == ["predict"] + assert ray.get(handle.predict_with_route.remote("/get", 1)) == ["predict"] def test_sharing_call_for_broadcast(serve_instance): diff --git a/python/ray/serve/tests/test_deployment_graph_autoscaling.py b/python/ray/serve/tests/test_deployment_graph_autoscaling.py index 662f25104bdc..6b0256f0b0c4 100644 --- a/python/ray/serve/tests/test_deployment_graph_autoscaling.py +++ b/python/ray/serve/tests/test_deployment_graph_autoscaling.py @@ -9,12 +9,19 @@ from ray.dag.input_node import InputNode from ray.serve._private.common import ReplicaState from ray._private.test_utils import SignalActor, wait_for_condition +from ray.serve._private.constants import ( + SERVE_DEFAULT_APP_NAME, + DEPLOYMENT_NAME_PREFIX_SEPARATOR, +) # Magic number to use for speed up scale from 0 replica serve_constants.HANDLE_METRIC_PUSH_INTERVAL_S = 1 def get_num_running_replicas(controller, deployment_name): + deployment_name = ( + SERVE_DEFAULT_APP_NAME + DEPLOYMENT_NAME_PREFIX_SEPARATOR + deployment_name + ) replicas = ray.get( controller._dump_replica_states_for_testing.remote(deployment_name) ) diff --git a/python/ray/serve/tests/test_failure.py b/python/ray/serve/tests/test_failure.py index 6f35c5dd4351..98b6282553d2 100644 --- a/python/ray/serve/tests/test_failure.py +++ b/python/ray/serve/tests/test_failure.py @@ -7,6 +7,10 @@ import ray from ray import serve from ray._private.test_utils import wait_for_condition +from ray.serve._private.constants import ( + SERVE_DEFAULT_APP_NAME, + DEPLOYMENT_NAME_PREFIX_SEPARATOR, +) def request_with_retries(endpoint, timeout=30): @@ -107,10 +111,13 @@ def check_new(): def _get_worker_handles(deployment): + deployment_name = ( + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}{deployment}" + ) controller = serve.context._global_client._controller deployment_dict = ray.get(controller._all_running_replicas.remote()) - return [replica.actor_handle for replica in deployment_dict[deployment]] + return [replica.actor_handle for replica in deployment_dict[deployment_name]] # Test that a worker dying unexpectedly causes it to restart and continue diff --git a/python/ray/serve/tests/test_gcs_failure.py b/python/ray/serve/tests/test_gcs_failure.py index 9efe0bb6cce2..c553a9684e2c 100644 --- a/python/ray/serve/tests/test_gcs_failure.py +++ b/python/ray/serve/tests/test_gcs_failure.py @@ -10,6 +10,11 @@ from ray._private.test_utils import wait_for_condition from ray.serve._private.storage.kv_store import KVStoreError, RayInternalKVStore from ray.tests.conftest import external_redis # noqa: F401 +from ray.serve._private.constants import ( + SERVE_DEFAULT_APP_NAME, + DEPLOYMENT_NAME_PREFIX_SEPARATOR, +) +from ray.serve.context import get_global_client @pytest.fixture(scope="function") @@ -58,7 +63,12 @@ def d(*args): def call(): if use_handle: - ret = ray.get(d.get_handle().remote()) + deployment_name = ( + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}d" + ) + ret = ray.get( + get_global_client().get_handle(deployment_name, sync=True).remote() + ) else: ret = requests.get("http://localhost:8000/d").text return ret diff --git a/python/ray/serve/tests/test_get_deployment.py b/python/ray/serve/tests/test_get_deployment.py index bc5f03db8f1f..0406b6695251 100644 --- a/python/ray/serve/tests/test_get_deployment.py +++ b/python/ray/serve/tests/test_get_deployment.py @@ -16,8 +16,8 @@ def d(*args): with pytest.raises(KeyError): serve.get_deployment(name=name) - handle = serve.run(d.bind()) - val1, pid1 = ray.get(handle.remote()) + d.deploy() + val1, pid1 = ray.get(d.get_handle().remote()) assert val1 == "1" del d diff --git a/python/ray/serve/tests/test_grpc.py b/python/ray/serve/tests/test_grpc.py index b5ff870cf57e..69f00886a683 100644 --- a/python/ray/serve/tests/test_grpc.py +++ b/python/ray/serve/tests/test_grpc.py @@ -10,6 +10,11 @@ from ray._private.test_utils import wait_for_condition from ray.serve.exceptions import RayServeException +from ray.serve._private.constants import ( + SERVE_DEFAULT_APP_NAME, + DEPLOYMENT_NAME_PREFIX_SEPARATOR, +) + from unittest.mock import patch @@ -84,7 +89,10 @@ def __call__(self, input): replicas = ray.get( serve.context._global_client._controller._all_running_replicas.remote() ) - assert len(replicas["DefaultgRPCDriver"]) == 1 + deployment_name = ( + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}DefaultgRPCDriver" + ) + assert len(replicas[deployment_name]) == 1 worker_node = cluster.add_node(num_cpus=2) @@ -92,7 +100,7 @@ def __call__(self, input): lambda: len( ray.get( serve.context._global_client._controller._all_running_replicas.remote() - )["DefaultgRPCDriver"] + )[deployment_name] ) == 2 ) @@ -104,7 +112,7 @@ def __call__(self, input): lambda: len( ray.get( serve.context._global_client._controller._all_running_replicas.remote() - )["DefaultgRPCDriver"] + )[deployment_name] ) == 1 ) diff --git a/python/ray/serve/tests/test_handle.py b/python/ray/serve/tests/test_handle.py index 8c62814d9f02..344b418ab11f 100644 --- a/python/ray/serve/tests/test_handle.py +++ b/python/ray/serve/tests/test_handle.py @@ -7,6 +7,11 @@ import ray from ray import serve from ray.serve.exceptions import RayServeException +from ray.serve._private.constants import ( + SERVE_DEFAULT_APP_NAME, + DEPLOYMENT_NAME_PREFIX_SEPARATOR, +) +from ray.serve.context import get_global_client @pytest.mark.asyncio @@ -80,7 +85,10 @@ def f(): handle = serve.run(f.bind()) def thread_get_handle(deploy): - handle = deploy.get_handle(sync=True) + deployment_name = ( + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}{deploy._name}" + ) + handle = get_global_client().get_handle(deployment_name, sync=True) return handle with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: diff --git a/python/ray/serve/tests/test_healthcheck.py b/python/ray/serve/tests/test_healthcheck.py index 03c6dbf5fe5c..d0a1fca074dc 100644 --- a/python/ray/serve/tests/test_healthcheck.py +++ b/python/ray/serve/tests/test_healthcheck.py @@ -6,6 +6,10 @@ from ray import serve from ray.serve._private.common import DeploymentStatus from ray.serve._private.constants import REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD +from ray.serve._private.constants import ( + SERVE_DEFAULT_APP_NAME, + DEPLOYMENT_NAME_PREFIX_SEPARATOR, +) class Counter: @@ -219,7 +223,8 @@ def __call__(self, *args): app_status = serve_instance.get_serve_status() assert ( - app_status.deployment_statuses[0].name == "AlwaysUnhealthy" + app_status.deployment_statuses[0].name + == f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}AlwaysUnhealthy" and app_status.deployment_statuses[0].status == DeploymentStatus.UNHEALTHY ) @@ -255,7 +260,8 @@ def __call__(self, *args): def check_status(expected_status: DeploymentStatus): app_status = serve_instance.get_serve_status() return ( - app_status.deployment_statuses[0].name == "WillBeUnhealthy" + app_status.deployment_statuses[0].name == f"{SERVE_DEFAULT_APP_NAME}" + f"{DEPLOYMENT_NAME_PREFIX_SEPARATOR}WillBeUnhealthy" and app_status.deployment_statuses[0].status == expected_status ) diff --git a/python/ray/serve/tests/test_metrics.py b/python/ray/serve/tests/test_metrics.py index 63867945956d..41084f76feba 100644 --- a/python/ray/serve/tests/test_metrics.py +++ b/python/ray/serve/tests/test_metrics.py @@ -134,7 +134,7 @@ async def __call__(self): # Trigger RayActorError os._exit(0) - serve.run(A.bind()) + serve.run(A.bind(), name="app") requests.get("http://127.0.0.1:8000/A/") requests.get("http://127.0.0.1:8000/A/") try: @@ -162,8 +162,8 @@ def verify_error_count(do_assert=False): elif "serve_num_deployment_http_error_requests" in metrics: # deployment A should have error count 2 if do_assert: - assert 'deployment="A"' in metrics and "2.0" in metrics - if 'deployment="A"' not in metrics or "2.0" not in metrics: + assert 'deployment="app_A"' in metrics and "2.0" in metrics + if 'deployment="app_A"' not in metrics or "2.0" not in metrics: return False return True @@ -181,7 +181,7 @@ def test_http_metrics_fields(serve_start_shutdown): def f(*args): return 1 / 0 - serve.run(f.bind()) + serve.run(f.bind(), name="app") # Should generate 404 responses broken_url = "http://127.0.0.1:8000/fake_route" @@ -212,7 +212,7 @@ def f(*args): "serve_num_deployment_http_error_requests" ) assert len(num_deployment_errors) == 1 - assert num_deployment_errors[0]["deployment"] == "f" + assert num_deployment_errors[0]["deployment"] == "app_f" assert num_deployment_errors[0]["error_code"] == "500" assert num_deployment_errors[0]["method"] == "GET" print("serve_num_deployment_http_error_requests working as expected.") @@ -348,7 +348,7 @@ async def app1(self): async def app2(self): return await (await self.handle2.remote()) - serve.run(G.bind(g1.bind(), g2.bind())) + serve.run(G.bind(g1.bind(), g2.bind()), name="app") resp = requests.get("http://127.0.0.1:8000/api") assert resp.text == '"ok1"' resp = requests.get("http://127.0.0.1:8000/api2") @@ -368,9 +368,9 @@ async def app2(self): requests_metrics = self._generate_metrics_summary( get_metric_dictionaries("serve_deployment_request_counter") ) - assert requests_metrics["G"] == {"/api", "/api2"} - assert requests_metrics["g1"] == {"/api"} - assert requests_metrics["g2"] == {"/api2"} + assert requests_metrics["app_G"] == {"/api", "/api2"} + assert requests_metrics["app_g1"] == {"/api"} + assert requests_metrics["app_g2"] == {"/api2"} def test_customer_metrics_with_context(self, serve_start_shutdown): @serve.deployment @@ -562,11 +562,11 @@ def test_actor_summary(serve_instance): def f(): pass - serve.run(f.bind()) + serve.run(f.bind(), name="app") actors = state_api.list_actors(filters=[("state", "=", "ALIVE")]) class_names = {actor["class_name"] for actor in actors} assert class_names.issuperset( - {"ServeController", "HTTPProxyActor", "ServeReplica:f"} + {"ServeController", "HTTPProxyActor", "ServeReplica:app_f"} ) diff --git a/python/ray/serve/tests/test_persistence.py b/python/ray/serve/tests/test_persistence.py index 3ba7d592a58d..c0167fc02067 100644 --- a/python/ray/serve/tests/test_persistence.py +++ b/python/ray/serve/tests/test_persistence.py @@ -14,13 +14,13 @@ def test_new_driver(serve_instance): def driver(): return "OK!" -serve.run(driver.bind()) +serve.run(driver.bind(), name="app") """.format( ray._private.worker._global_node.address ) run_string_as_driver(script) - handle = serve.get_deployment("driver").get_handle() + handle = serve.get_deployment("app_driver").get_handle() assert ray.get(handle.remote()) == "OK!" diff --git a/python/ray/serve/tests/test_regression.py b/python/ray/serve/tests/test_regression.py index 23be1e9d1988..9f636fd58340 100644 --- a/python/ray/serve/tests/test_regression.py +++ b/python/ray/serve/tests/test_regression.py @@ -153,13 +153,13 @@ def test_handle_cache_out_of_scope(serve_instance): def f(): return "hi" - handle = serve.run(f.bind()) + handle = serve.run(f.bind(), name="app") handle_cache = get_global_client().handle_cache assert len(handle_cache) == initial_num_cached + 1 def sender_where_handle_goes_out_of_scope(): - f = serve.get_deployment("f").get_handle() + f = get_global_client().get_handle("app_f", missing_ok=True, sync=True) assert f is handle assert ray.get(f.remote()) == "hi" diff --git a/python/ray/serve/tests/test_runtime_env.py b/python/ray/serve/tests/test_runtime_env.py index 14c8c5e3552c..e6fe295fae4a 100644 --- a/python/ray/serve/tests/test_runtime_env.py +++ b/python/ray/serve/tests/test_runtime_env.py @@ -73,7 +73,7 @@ class Test: def __call__(self, *args): return open("hello").read() -handle = serve.run(Test.bind()) +handle = serve.run(Test.bind(), name="app") assert ray.get(handle.remote()) == "world" """ @@ -87,8 +87,8 @@ def __call__(self, *args): ray.init(address="auto", namespace="serve", job_config=job_config) -Test = serve.get_deployment("Test") -handle = serve.run(Test.bind()) +Test = serve.get_deployment("app_Test") +handle = serve.run(Test.bind(), name="app") assert ray.get(handle.remote()) == "world" Test.delete() """ diff --git a/python/ray/serve/tests/test_standalone.py b/python/ray/serve/tests/test_standalone.py index 41004e4abda0..4d2f10616d66 100644 --- a/python/ray/serve/tests/test_standalone.py +++ b/python/ray/serve/tests/test_standalone.py @@ -186,12 +186,12 @@ def test_single_app_shutdown_actors(ray_shutdown): def f(): pass - serve.run(f.bind()) + serve.run(f.bind(), name="app") actor_names = { "ServeController", "HTTPProxyActor", - "ServeReplica:f", + "ServeReplica:app_f", } def check_alive(): @@ -710,7 +710,7 @@ def check(): return False serve.start(detached=True) - serve.run(hello.bind()) + serve.run(hello.bind(), name="app") check() webui_url = ray_start_with_dashboard["webui_url"] @@ -731,7 +731,7 @@ def verify_snapshot(): snapshot = get_deployment_snapshot() assert len(snapshot) == 1 hello_deployment = list(snapshot.values())[0] - assert hello_deployment["name"] == "hello" + assert hello_deployment["name"] == "app_hello" assert hello_deployment["status"] == "RUNNING" diff --git a/python/ray/serve/tests/test_standalone2.py b/python/ray/serve/tests/test_standalone2.py index b4b483d53ad3..fb6dabd5d4eb 100644 --- a/python/ray/serve/tests/test_standalone2.py +++ b/python/ray/serve/tests/test_standalone2.py @@ -23,7 +23,11 @@ from ray.serve.exceptions import RayServeException from ray.serve._private.client import ServeControllerClient from ray.serve._private.common import ApplicationStatus, DeploymentStatus -from ray.serve._private.constants import SERVE_NAMESPACE +from ray.serve._private.constants import ( + SERVE_NAMESPACE, + SERVE_DEFAULT_APP_NAME, + DEPLOYMENT_NAME_PREFIX_SEPARATOR, +) from ray.serve.context import get_global_client from ray.serve.schema import ( ServeApplicationSchema, @@ -133,9 +137,9 @@ def hello(*args, **kwargs): return "world" ray.init(num_gpus=3, namespace="serve") - serve.run(hello.bind()) + handle = serve.run(hello.bind()) - assert ray.get(hello.get_handle().remote()) == "world" + assert ray.get(handle.remote()) == "world" @pytest.mark.parametrize("detached", [True, False]) @@ -252,7 +256,10 @@ def f(*args): client = get_global_client() status_info_1 = client.get_serve_status() assert status_info_1.app_status.status == "RUNNING" - assert status_info_1.deployment_statuses[0].name == "f" + assert ( + status_info_1.deployment_statuses[0].name + == f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}f" + ) assert status_info_1.deployment_statuses[0].status in {"UPDATING", "HEALTHY"} serve.shutdown() @@ -366,14 +373,14 @@ def test_controller_recover_and_delete(shutdown_ray): def f(): pass - f.deploy() + serve.run(f.bind()) actors = list_actors( address=ray_context.address_info["address"], filters=[("state", "=", "ALIVE")] ) - # Try to delete the deployments and kill the controller right after - client.delete_deployments(["f"], blocking=False) + # Try to delete the application and kill the controller right after + serve.delete(SERVE_DEFAULT_APP_NAME, _blocking=False) ray.kill(client._controller, no_restart=False) # All replicas should be removed already or after the controller revives @@ -400,7 +407,12 @@ def f(): # The deployment should be deleted, meaning its state should not be stored # in the DeploymentStateManager. This can be checked by attempting to # retrieve the deployment's status through the controller. - assert client.get_serve_status().get_deployment_status("f") is None + assert ( + client.get_serve_status().get_deployment_status( + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}f" + ) + is None + ) serve.shutdown() ray.shutdown() @@ -1067,9 +1079,11 @@ def test_controller_recover_and_deploy(self, client: ServeControllerClient): deployment_timestamp = client.get_serve_status().app_status.deployment_timestamp # Delete all deployments, but don't update config - client.delete_deployments( - ["Router", "Multiplier", "Adder", "create_order", "DAGDriver"] - ) + deployments = [ + f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}{name}" + for name in ["Router", "Multiplier", "Adder", "create_order", "DAGDriver"] + ] + client.delete_deployments(deployments) ray.kill(client._controller, no_restart=False) @@ -1117,11 +1131,12 @@ def test_deploy_config_update( """ def deployment_running(): + name = f"{SERVE_DEFAULT_APP_NAME}{DEPLOYMENT_NAME_PREFIX_SEPARATOR}f" serve_status = client.get_serve_status() return ( - serve_status.get_deployment_status("f") is not None + serve_status.get_deployment_status(name) is not None and serve_status.app_status.status == ApplicationStatus.RUNNING - and serve_status.get_deployment_status("f").status + and serve_status.get_deployment_status(name).status == DeploymentStatus.HEALTHY )