diff --git a/python/ray/serve/_private/application_state.py b/python/ray/serve/_private/application_state.py index a7a11cc7431c..dbd14b381623 100644 --- a/python/ray/serve/_private/application_state.py +++ b/python/ray/serve/_private/application_state.py @@ -61,6 +61,14 @@ class BuildAppStatus(Enum): FAILED = 4 +class APIType(Enum): + """Tracks the type of API that an application originates from.""" + + UNKNOWN = 0 + IMPERATIVE = 1 + DECLARATIVE = 2 + + @dataclass class BuildAppTaskInfo: """Stores info on the current in-progress build app task. @@ -107,6 +115,7 @@ class ApplicationTargetState: target_capacity: Optional[float] target_capacity_direction: Optional[TargetCapacityDirection] deleting: bool + api_type: APIType class ApplicationState: @@ -153,6 +162,7 @@ def __init__( target_capacity=None, target_capacity_direction=None, deleting=False, + api_type=APIType.UNKNOWN, ) self._save_checkpoint_func = save_checkpoint_func @@ -194,6 +204,10 @@ def target_deployments(self) -> List[str]: def ingress_deployment(self) -> Optional[str]: return self._ingress_deployment_name + @property + def api_type(self) -> APIType: + return self._target_state.api_type + def recover_target_state_from_checkpoint( self, checkpoint_data: ApplicationTargetState ): @@ -202,16 +216,19 @@ def recover_target_state_from_checkpoint( ) self._set_target_state( checkpoint_data.deployment_infos, - checkpoint_data.code_version, - checkpoint_data.config, - checkpoint_data.target_capacity, - checkpoint_data.target_capacity_direction, - checkpoint_data.deleting, + api_type=checkpoint_data.api_type, + code_version=checkpoint_data.code_version, + target_config=checkpoint_data.config, + target_capacity=checkpoint_data.target_capacity, + target_capacity_direction=checkpoint_data.target_capacity_direction, + deleting=checkpoint_data.deleting, ) def _set_target_state( self, deployment_infos: Optional[Dict[str, DeploymentInfo]], + *, + api_type: APIType, code_version: str, target_config: Optional[ServeApplicationSchema], target_capacity: Optional[float] = None, @@ -227,7 +244,6 @@ def _set_target_state( When a request to delete the application has been received, this should be ({}, True) """ - if deleting: self._update_status(ApplicationStatus.DELETING) else: @@ -247,6 +263,7 @@ def _set_target_state( target_capacity, target_capacity_direction, deleting, + api_type=api_type, ) # Checkpoint ahead, so that if the controller crashes before we @@ -261,14 +278,30 @@ def _set_target_state_deleting(self): Wipes the target deployment infos, code version, and config. """ - self._set_target_state(dict(), None, None, None, None, True) + self._set_target_state( + deployment_infos=dict(), + api_type=self._target_state.api_type, + code_version=None, + target_config=None, + deleting=True, + ) def _clear_target_state_and_store_config( - self, target_config: Optional[ServeApplicationSchema] + self, + target_config: Optional[ServeApplicationSchema], ): - """Clears the target state and stores the config.""" + """Clears the target state and stores the config. - self._set_target_state(None, None, target_config, None, None, False) + NOTE: this currently assumes that this method is *only* called when managing + apps deployed with the declarative API. + """ + self._set_target_state( + deployment_infos=None, + api_type=APIType.DECLARATIVE, + code_version=None, + target_config=target_config, + deleting=False, + ) def _delete_deployment(self, name): id = DeploymentID(name=name, app_name=self._name) @@ -279,7 +312,7 @@ def delete(self): """Delete the application""" if self._status != ApplicationStatus.DELETING: logger.info( - f"Deleting application '{self._name}'", + f"Deleting app '{self._name}'.", extra={"log_to_stderr": False}, ) self._set_target_state_deleting() @@ -327,11 +360,11 @@ def apply_deployment_info( else: self._endpoint_state.delete_endpoint(deployment_id) - def deploy(self, deployment_infos: Dict[str, DeploymentInfo]): - """Deploy application from list of deployment infos. + def deploy_app(self, deployment_infos: Dict[str, DeploymentInfo]): + """(Re-)deploy the application from list of deployment infos. - This function should only be called if the app is being deployed - through serve.run instead of from a config. + This function should only be called to deploy an app from an + imperative API (i.e., `serve.run` or Java API). Raises: RayServeException if there is more than one route prefix or docs path. @@ -342,25 +375,29 @@ def deploy(self, deployment_infos: Dict[str, DeploymentInfo]): self._set_target_state( deployment_infos=deployment_infos, + api_type=APIType.IMPERATIVE, code_version=None, target_config=None, target_capacity=None, target_capacity_direction=None, ) - def deploy_config( + def apply_app_config( self, config: ServeApplicationSchema, target_capacity: Optional[float], target_capacity_direction: Optional[TargetCapacityDirection], deployment_time: int, ) -> None: - """Deploys an application config. + """Apply the config to the application. If the code version matches that of the current live deployments then it only applies the updated config to the deployment state manager. If the code version doesn't match, this will re-build the application. + + This function should only be called to (re-)deploy an app from + the declarative API (i.e., through the REST API). """ self._deployment_timestamp = deployment_time @@ -377,6 +414,7 @@ def deploy_config( self._set_target_state( # Code version doesn't change. code_version=self._target_state.code_version, + api_type=APIType.DECLARATIVE, # Everything else must reflect the new config. deployment_infos=overrided_infos, target_config=config, @@ -415,7 +453,7 @@ def deploy_config( ServeUsageTag.APP_CONTAINER_RUNTIME_ENV_USED.record("1") # Kick off new build app task - logger.info(f"Building application '{self._name}'.") + logger.info(f"Importing and building app '{self._name}'.") build_app_obj_ref = build_serve_application.options( runtime_env=config.runtime_env, enable_task_events=RAY_SERVE_ENABLE_TASK_EVENTS, @@ -517,7 +555,7 @@ def _reconcile_build_app_task(self) -> Tuple[Tuple, BuildAppStatus, str]: try: args, err = ray.get(self._build_app_task_info.obj_ref) if err is None: - logger.info(f"Built application '{self._name}' successfully.") + logger.info(f"Imported and built app '{self._name}' successfully.") else: return ( None, @@ -658,6 +696,7 @@ def update(self) -> bool: self._set_target_state( deployment_infos=infos, code_version=self._build_app_task_info.code_version, + api_type=self._target_state.api_type, target_config=self._build_app_task_info.config, target_capacity=self._build_app_task_info.target_capacity, target_capacity_direction=( @@ -762,14 +801,14 @@ def _recover_from_checkpoint(self): app_state.recover_target_state_from_checkpoint(checkpoint_data) self._application_states[app_name] = app_state - def delete_application(self, name: str) -> None: + def delete_app(self, name: str) -> None: """Delete application by name""" if name not in self._application_states: return self._application_states[name].delete() - def apply_deployment_args(self, name: str, deployment_args: List[Dict]) -> None: - """Apply list of deployment arguments to application target state. + def deploy_app(self, name: str, deployment_args: List[Dict]) -> None: + """Deploy the specified app to the list of deployment arguments. This function should only be called if the app is being deployed through serve.run instead of from a config. @@ -816,32 +855,52 @@ def apply_deployment_args(self, name: str, deployment_args: List[Dict]) -> None: ) for params in deployment_args } - self._application_states[name].deploy(deployment_infos) + self._application_states[name].deploy_app(deployment_infos) - def deploy_config( + def apply_app_configs( self, - name: str, - app_config: ServeApplicationSchema, + app_configs: List[ServeApplicationSchema], + *, deployment_time: float = 0, target_capacity: Optional[float] = None, target_capacity_direction: Optional[TargetCapacityDirection] = None, - ) -> None: - """Deploy application from config.""" + ): + """Declaratively apply the list of application configs. - if name not in self._application_states: - self._application_states[name] = ApplicationState( - name, - self._deployment_state_manager, - endpoint_state=self._endpoint_state, - save_checkpoint_func=self._save_checkpoint_func, + The applications will be reconciled to match the target state of the config. + + Any applications previously deployed declaratively that are *not* present in + the list will be deleted. + """ + for app_config in app_configs: + if app_config.name not in self._application_states: + logger.info(f"Deploying new app '{app_config.name}'.") + self._application_states[app_config.name] = ApplicationState( + app_config.name, + self._deployment_state_manager, + endpoint_state=self._endpoint_state, + save_checkpoint_func=self._save_checkpoint_func, + ) + + self._application_states[app_config.name].apply_app_config( + app_config, + target_capacity, + target_capacity_direction, + deployment_time=deployment_time, ) + + # Delete all apps that were previously deployed via the declarative API + # but are not in the config being applied. + existing_apps = { + name + for name, app_state in self._application_states.items() + if app_state.api_type == APIType.DECLARATIVE + } + apps_in_config = {app_config.name for app_config in app_configs} + for app_to_delete in existing_apps - apps_in_config: + self.delete_app(app_to_delete) + ServeUsageTag.NUM_APPS.record(str(len(self._application_states))) - self._application_states[name].deploy_config( - app_config, - target_capacity, - target_capacity_direction, - deployment_time=deployment_time, - ) def get_deployments(self, app_name: str) -> List[str]: """Return all deployment names by app name""" diff --git a/python/ray/serve/_private/client.py b/python/ray/serve/_private/client.py index c596bc86aad1..a5c6df1c5718 100644 --- a/python/ray/serve/_private/client.py +++ b/python/ray/serve/_private/client.py @@ -303,7 +303,7 @@ def deploy_apps( because a single-app config was deployed after deploying a multi-app config, or vice versa. """ - ray.get(self._controller.deploy_config.remote(config)) + ray.get(self._controller.apply_config.remote(config)) if _blocking: timeout_s = 60 diff --git a/python/ray/serve/_private/controller.py b/python/ray/serve/_private/controller.py index ef07339bee18..34dbd8fd3fbd 100644 --- a/python/ray/serve/_private/controller.py +++ b/python/ray/serve/_private/controller.py @@ -497,7 +497,7 @@ def _recover_state_from_checkpoint(self): logger.info( "Recovered config from checkpoint.", extra={"log_to_stderr": False} ) - self.deploy_config(serve_config, deployment_time=deployment_time) + self.apply_config(serve_config, deployment_time=deployment_time) def _read_config_checkpoint( self, @@ -726,11 +726,9 @@ def deploy_application(self, name: str, deployment_args_list: List[bytes]) -> No else None, } ) - self.application_state_manager.apply_deployment_args( - name, deployment_args_deserialized - ) + self.application_state_manager.deploy_app(name, deployment_args_deserialized) - def deploy_config( + def apply_config( self, config: ServeDeploySchema, deployment_time: float = 0.0, @@ -779,14 +777,6 @@ def deploy_config( app_config_dict = app_config.dict(exclude_unset=True) new_config_checkpoint[app_config.name] = app_config_dict - self.application_state_manager.deploy_config( - app_config.name, - app_config, - deployment_time=deployment_time, - target_capacity=self._target_capacity, - target_capacity_direction=self._target_capacity_direction, - ) - self.kv_store.put( CONFIG_CHECKPOINT_KEY, pickle.dumps( @@ -799,12 +789,15 @@ def deploy_config( ), ) - # Delete live applications not listed in the config. - existing_applications = set( - self.application_state_manager._application_states.keys() + # Declaratively apply the new set of applications. + # This will delete any applications no longer in the config that were + # previously deployed via the REST API. + self.application_state_manager.apply_app_configs( + config.applications, + deployment_time=deployment_time, + target_capacity=self._target_capacity, + target_capacity_direction=self._target_capacity_direction, ) - new_applications = {app_config.name for app_config in config.applications} - self.delete_apps(existing_applications.difference(new_applications)) def get_deployment_info(self, name: str, app_name: str = "") -> bytes: """Get the current information about a deployment. @@ -983,7 +976,7 @@ def delete_apps(self, names: Iterable[str]): During deletion, the application status is DELETING """ for name in names: - self.application_state_manager.delete_application(name) + self.application_state_manager.delete_app(name) def record_multiplexed_replica_info(self, info: MultiplexedReplicaInfo): """Record multiplexed model ids for a replica of deployment diff --git a/python/ray/serve/tests/test_controller.py b/python/ray/serve/tests/test_controller.py index 08c1860007e2..2588fedd5f19 100644 --- a/python/ray/serve/tests/test_controller.py +++ b/python/ray/serve/tests/test_controller.py @@ -66,7 +66,7 @@ def test_deploy_app_custom_exception(serve_instance): ] } - ray.get(controller.deploy_config.remote(config=ServeDeploySchema.parse_obj(config))) + ray.get(controller.apply_config.remote(config=ServeDeploySchema.parse_obj(config))) def check_custom_exception() -> bool: status = serve.status().applications["broken_app"] diff --git a/python/ray/serve/tests/test_deploy_app.py b/python/ray/serve/tests/test_deploy_app.py index d9f61dcac8fc..66b683c1edb8 100644 --- a/python/ray/serve/tests/test_deploy_app.py +++ b/python/ray/serve/tests/test_deploy_app.py @@ -30,7 +30,11 @@ check_num_replicas_lte, ) from ray.serve.context import _get_global_client -from ray.serve.schema import ServeDeploySchema, ServeInstanceDetails +from ray.serve.schema import ( + ServeApplicationSchema, + ServeDeploySchema, + ServeInstanceDetails, +) from ray.serve.tests.common.remote_uris import ( TEST_DAG_PINNED_URI, TEST_RUNTIME_ENV_PINNED_URI, @@ -1210,6 +1214,167 @@ def check_deploy_failed(message): wait_for_condition(check_application_running) +def test_deploy_does_not_affect_dynamic_apps(client: ServeControllerClient): + """ + Deploy a set of apps via the declarative API (REST API) and then a dynamic + app via the imperative API (`serve.run`). + + Check that applying a new config via the declarative API does not affect + the app deployed using the imperative API. + """ + + config = ServeDeploySchema( + applications=[ + ServeApplicationSchema( + name="declarative-app-1", + route_prefix="/app-1", + import_path="ray.serve.tests.test_config_files.world.DagNode", + ), + ], + ) + client.deploy_apps(config) + + def check_application_running( + name: str, route_prefix: str, *, msg: str = "wonderful world" + ): + status = serve.status().applications[name] + assert status.status == "RUNNING" + assert requests.post(f"http://localhost:8000{route_prefix}/").text == msg + return True + + wait_for_condition( + check_application_running, name="declarative-app-1", route_prefix="/app-1" + ) + + # Now `serve.run` a dynamic app. + @serve.deployment + class D: + def __call__(self, *args) -> str: + return "Hello!" + + serve.run(D.bind(), name="dynamic-app", route_prefix="/dynamic") + wait_for_condition( + check_application_running, + name="dynamic-app", + route_prefix="/dynamic", + msg="Hello!", + ) + + # Add a new app via declarative API. + # Existing declarative app and dynamic app should not be affected. + config.applications.append( + ServeApplicationSchema( + name="declarative-app-2", + route_prefix="/app-2", + import_path="ray.serve.tests.test_config_files.world.DagNode", + ), + ) + client.deploy_apps(config) + + wait_for_condition( + check_application_running, name="declarative-app-2", route_prefix="/app-2" + ) + wait_for_condition( + check_application_running, name="declarative-app-1", route_prefix="/app-1" + ) + wait_for_condition( + check_application_running, + name="dynamic-app", + route_prefix="/dynamic", + msg="Hello!", + ) + + # Delete one of the apps via declarative API. + # Other declarative app and dynamic app should not be affected. + config.applications.pop(0) + client.deploy_apps(config) + + wait_for_condition( + check_application_running, name="declarative-app-2", route_prefix="/app-2" + ) + wait_for_condition( + check_application_running, + name="dynamic-app", + route_prefix="/dynamic", + msg="Hello!", + ) + + wait_for_condition(lambda: "declarative-app-1" not in serve.status().applications) + + # Now overwrite the declarative app with a dynamic app with the same name. + # On subsequent declarative apply, that app should not be affected. + serve.run(D.bind(), name="declarative-app-2", route_prefix="/app-2") + wait_for_condition( + check_application_running, + name="declarative-app-2", + route_prefix="/app-2", + msg="Hello!", + ) + + config.applications = [ + ServeApplicationSchema( + name="declarative-app-1", + route_prefix="/app-1", + import_path="ray.serve.tests.test_config_files.world.DagNode", + ), + ] + client.deploy_apps(config) + + wait_for_condition( + check_application_running, + name="declarative-app-1", + route_prefix="/app-1", + ) + wait_for_condition( + check_application_running, + name="dynamic-app", + route_prefix="/dynamic", + msg="Hello!", + ) + wait_for_condition( + check_application_running, + name="declarative-app-2", + route_prefix="/app-2", + msg="Hello!", + ) + + # Verify that the controller does not delete the dynamic apps on recovery. + ray.kill(client._controller, no_restart=False) + wait_for_condition( + check_application_running, + name="dynamic-app", + route_prefix="/dynamic", + msg="Hello!", + ) + wait_for_condition( + check_application_running, + name="declarative-app-2", + route_prefix="/app-2", + msg="Hello!", + ) + + # Now overwrite the dynamic app with a declarative one and check that it gets + # deleted upon another apply that doesn't include it. + config.applications = [ + ServeApplicationSchema( + name="declarative-app-2", + route_prefix="/app-2", + import_path="ray.serve.tests.test_config_files.world.DagNode", + ), + ] + client.deploy_apps(config) + wait_for_condition( + check_application_running, + name="declarative-app-2", + route_prefix="/app-2", + ) + + config.applications = [] + client.deploy_apps(config) + + wait_for_condition(lambda: "declarative-app-2" not in serve.status().applications) + + def test_change_route_prefix(client: ServeControllerClient): # Deploy application with route prefix /old app_config = { diff --git a/python/ray/serve/tests/test_standalone_3.py b/python/ray/serve/tests/test_standalone_3.py index e67074481eae..aadfb3a91fdf 100644 --- a/python/ray/serve/tests/test_standalone_3.py +++ b/python/ray/serve/tests/test_standalone_3.py @@ -669,7 +669,7 @@ def __call__(self): with open(os.path.join(get_serve_logs_dir(), filename), "r") as f: all_serve_logs += f.read() assert all_serve_logs.count("Controller shutdown started") == 1 - assert all_serve_logs.count("Deleting application 'default'") == 1 + assert all_serve_logs.count("Deleting app 'default'") == 1 if __name__ == "__main__": diff --git a/python/ray/serve/tests/unit/test_application_state.py b/python/ray/serve/tests/unit/test_application_state.py index 196e47e7bc6d..279bbb833256 100644 --- a/python/ray/serve/tests/unit/test_application_state.py +++ b/python/ray/serve/tests/unit/test_application_state.py @@ -6,6 +6,7 @@ from ray.exceptions import RayTaskError from ray.serve._private.application_state import ( + APIType, ApplicationState, ApplicationStateManager, override_deployment_info, @@ -363,7 +364,7 @@ def test_deploy_and_delete_app(mocked_application_state): # DEPLOY application with deployments {d1, d2} d1_id = DeploymentID(name="d1", app_name="test_app") d2_id = DeploymentID(name="d2", app_name="test_app") - app_state.deploy( + app_state.deploy_app( { "d1": deployment_info("d1", "/hi", "/documentation"), "d2": deployment_info("d2"), @@ -421,7 +422,7 @@ def test_app_deploy_failed_and_redeploy(mocked_application_state): app_state, deployment_state_manager = mocked_application_state d1_id = DeploymentID(name="d1", app_name="test_app") d2_id = DeploymentID(name="d2", app_name="test_app") - app_state.deploy({"d1": deployment_info("d1")}) + app_state.deploy_app({"d1": deployment_info("d1")}) assert app_state.status == ApplicationStatus.DEPLOYING # Before status of deployment changes, app should still be DEPLOYING @@ -440,7 +441,7 @@ def test_app_deploy_failed_and_redeploy(mocked_application_state): assert app_state.status == ApplicationStatus.DEPLOY_FAILED assert app_state._status_msg == deploy_failed_msg - app_state.deploy({"d1": deployment_info("d1"), "d2": deployment_info("d2")}) + app_state.deploy_app({"d1": deployment_info("d1"), "d2": deployment_info("d2")}) assert app_state.status == ApplicationStatus.DEPLOYING assert app_state._status_msg != deploy_failed_msg @@ -472,7 +473,7 @@ def test_app_deploy_failed_and_recover(mocked_application_state): """ app_state, deployment_state_manager = mocked_application_state deployment_id = DeploymentID(name="d1", app_name="test_app") - app_state.deploy({"d1": deployment_info("d1")}) + app_state.deploy_app({"d1": deployment_info("d1")}) assert app_state.status == ApplicationStatus.DEPLOYING # Before status of deployment changes, app should still be DEPLOYING @@ -504,7 +505,7 @@ def test_app_unhealthy(mocked_application_state): id_a, id_b = DeploymentID(name="a", app_name="test_app"), DeploymentID( name="b", app_name="test_app" ) - app_state.deploy({"a": deployment_info("a"), "b": deployment_info("b")}) + app_state.deploy_app({"a": deployment_info("a"), "b": deployment_info("b")}) assert app_state.status == ApplicationStatus.DEPLOYING app_state.update() assert app_state.status == ApplicationStatus.DEPLOYING @@ -532,7 +533,7 @@ def test_app_unhealthy(mocked_application_state): @patch("ray.serve._private.application_state.build_serve_application", Mock()) @patch("ray.get", Mock(return_value=([deployment_params("a", "/old", "/docs")], None))) @patch("ray.serve._private.application_state.check_obj_ref_ready_nowait") -def test_deploy_through_config_succeed(check_obj_ref_ready_nowait): +def test_apply_app_configs_succeed(check_obj_ref_ready_nowait): """Test deploying through config successfully. Deploy obj ref finishes successfully, so status should transition to running. """ @@ -544,8 +545,10 @@ def test_deploy_through_config_succeed(check_obj_ref_ready_nowait): ) # Deploy config - app_config = ServeApplicationSchema(import_path="fa.ke", route_prefix="/new") - app_state_manager.deploy_config(name="test_app", app_config=app_config) + app_config = ServeApplicationSchema( + name="test_app", import_path="fa.ke", route_prefix="/new" + ) + app_state_manager.apply_app_configs([app_config]) app_state = app_state_manager._application_states["test_app"] assert app_state.status == ApplicationStatus.DEPLOYING @@ -578,7 +581,7 @@ def test_deploy_through_config_succeed(check_obj_ref_ready_nowait): @patch("ray.serve._private.application_state.build_serve_application", Mock()) @patch("ray.get", Mock(side_effect=RayTaskError(None, "intentionally failed", None))) @patch("ray.serve._private.application_state.check_obj_ref_ready_nowait") -def test_deploy_through_config_fail(check_obj_ref_ready_nowait): +def test_apply_app_configs_fail(check_obj_ref_ready_nowait): """Test fail to deploy through config. Deploy obj ref errors out, so status should transition to deploy failed. """ @@ -587,8 +590,12 @@ def test_deploy_through_config_fail(check_obj_ref_ready_nowait): app_state_manager = ApplicationStateManager( deployment_state_manager, MockEndpointState(), kv_store ) + # Deploy config - app_state_manager.deploy_config(name="test_app", app_config=Mock()) + app_config = ServeApplicationSchema( + name="test_app", import_path="fa.ke", route_prefix="/new" + ) + app_state_manager.apply_app_configs([app_config]) app_state = app_state_manager._application_states["test_app"] assert app_state.status == ApplicationStatus.DEPLOYING @@ -600,20 +607,83 @@ def test_deploy_through_config_fail(check_obj_ref_ready_nowait): app_state.update() assert app_state.status == ApplicationStatus.DEPLOYING - # Object ref is ready, and the task has called apply_deployment_args + # Object ref is ready, and the task has called deploy_app check_obj_ref_ready_nowait.return_value = True app_state.update() assert app_state.status == ApplicationStatus.DEPLOY_FAILED assert "failed" in app_state._status_msg or "error" in app_state._status_msg +@patch( + "ray.serve._private.application_state.get_app_code_version", + Mock(return_value="123"), +) +@patch("ray.serve._private.application_state.build_serve_application", Mock()) +@patch("ray.get", Mock(return_value=([deployment_params("a", "/old", "/docs")], None))) +@patch("ray.serve._private.application_state.check_obj_ref_ready_nowait") +def test_apply_app_configs_deletes_existing(check_obj_ref_ready_nowait): + """Test that apply_app_configs deletes existing apps that aren't in the new list. + + This should *not* apply to apps that were deployed via `deploy_app` (which is + an imperative API). + """ + kv_store = MockKVStore() + deployment_state_manager = MockDeploymentStateManager(kv_store) + app_state_manager = ApplicationStateManager( + deployment_state_manager, MockEndpointState(), kv_store + ) + + # Deploy an app via `deploy_app` - should not be affected. + a_id = DeploymentID(name="a", app_name="imperative_app") + app_state_manager.deploy_app("imperative_app", [deployment_params("a", "/hi")]) + imperative_app_state = app_state_manager._application_states["imperative_app"] + assert imperative_app_state.api_type == APIType.IMPERATIVE + assert imperative_app_state.status == ApplicationStatus.DEPLOYING + + imperative_app_state.update() + deployment_state_manager.set_deployment_healthy(a_id) + imperative_app_state.update() + assert imperative_app_state.status == ApplicationStatus.RUNNING + + # Now deploy an initial version of the config with app 1 and app 2. + app1_config = ServeApplicationSchema( + name="app1", import_path="fa.ke", route_prefix="/1" + ) + app2_config = ServeApplicationSchema( + name="app2", import_path="fa.ke", route_prefix="/2" + ) + app_state_manager.apply_app_configs([app1_config, app2_config]) + app1_state = app_state_manager._application_states["app1"] + assert app1_state.api_type == APIType.DECLARATIVE + app2_state = app_state_manager._application_states["app2"] + assert app2_state.api_type == APIType.DECLARATIVE + app1_state.update() + app2_state.update() + assert app1_state.status == ApplicationStatus.DEPLOYING + assert app2_state.status == ApplicationStatus.DEPLOYING + + # Now redeploy a new config that removes app 1 and adds app 3. + app3_config = ServeApplicationSchema( + name="app3", import_path="fa.ke", route_prefix="/3" + ) + app_state_manager.apply_app_configs([app3_config, app2_config]) + app3_state = app_state_manager._application_states["app3"] + assert app3_state.api_type == APIType.DECLARATIVE + app1_state.update() + app2_state.update() + app3_state.update() + assert app1_state.status == ApplicationStatus.DELETING + assert app2_state.status == ApplicationStatus.DEPLOYING + assert app3_state.status == ApplicationStatus.DEPLOYING + + def test_redeploy_same_app(mocked_application_state): """Test redeploying same application with updated deployments.""" app_state, deployment_state_manager = mocked_application_state a_id = DeploymentID(name="a", app_name="test_app") b_id = DeploymentID(name="b", app_name="test_app") c_id = DeploymentID(name="c", app_name="test_app") - app_state.deploy({"a": deployment_info("a"), "b": deployment_info("b")}) + app_state.deploy_app({"a": deployment_info("a"), "b": deployment_info("b")}) assert app_state.status == ApplicationStatus.DEPLOYING # Update @@ -630,7 +700,7 @@ def test_redeploy_same_app(mocked_application_state): assert app_state.status == ApplicationStatus.RUNNING # Deploy the same app with different deployments - app_state.deploy({"b": deployment_info("b"), "c": deployment_info("c")}) + app_state.deploy_app({"b": deployment_info("b"), "c": deployment_info("c")}) assert app_state.status == ApplicationStatus.DEPLOYING # Target state should be updated immediately assert "a" not in app_state.target_deployments @@ -654,9 +724,9 @@ def test_deploy_with_route_prefix_conflict(mocked_application_state_manager): """Test that an application with a route prefix conflict fails to deploy""" app_state_manager, _, _ = mocked_application_state_manager - app_state_manager.apply_deployment_args("app1", [deployment_params("a", "/hi")]) + app_state_manager.deploy_app("app1", [deployment_params("a", "/hi")]) with pytest.raises(RayServeException): - app_state_manager.apply_deployment_args("app2", [deployment_params("b", "/hi")]) + app_state_manager.deploy_app("app2", [deployment_params("b", "/hi")]) def test_deploy_with_renamed_app(mocked_application_state_manager): @@ -670,7 +740,7 @@ def test_deploy_with_renamed_app(mocked_application_state_manager): ) # deploy app1 - app_state_manager.apply_deployment_args("app1", [deployment_params("a", "/url1")]) + app_state_manager.deploy_app("app1", [deployment_params("a", "/url1")]) app_state = app_state_manager._application_states["app1"] assert app_state_manager.get_app_status("app1") == ApplicationStatus.DEPLOYING @@ -685,12 +755,12 @@ def test_deploy_with_renamed_app(mocked_application_state_manager): assert app_state_manager.get_app_status("app1") == ApplicationStatus.RUNNING # delete app1 - app_state_manager.delete_application("app1") + app_state_manager.delete_app("app1") assert app_state_manager.get_app_status("app1") == ApplicationStatus.DELETING app_state_manager.update() # deploy app2 - app_state_manager.apply_deployment_args("app2", [deployment_params("b", "/url1")]) + app_state_manager.deploy_app("app2", [deployment_params("b", "/url1")]) assert app_state_manager.get_app_status("app2") == ApplicationStatus.DEPLOYING app_state_manager.update() @@ -719,7 +789,7 @@ def test_application_state_recovery(mocked_application_state_manager): # DEPLOY application with deployments {d1, d2} params = deployment_params("d1") - app_state_manager.apply_deployment_args(app_name, [params]) + app_state_manager.deploy_app(app_name, [params]) app_state = app_state_manager._application_states[app_name] assert app_state.status == ApplicationStatus.DEPLOYING @@ -768,7 +838,7 @@ def test_recover_during_update(mocked_application_state_manager): # DEPLOY application with deployment "d1" params = deployment_params("d1") - app_state_manager.apply_deployment_args(app_name, [params]) + app_state_manager.deploy_app(app_name, [params]) app_state = app_state_manager._application_states[app_name] assert app_state.status == ApplicationStatus.DEPLOYING @@ -781,7 +851,7 @@ def test_recover_during_update(mocked_application_state_manager): # Deploy new version of "d1" (this auto generates new random version) params2 = deployment_params("d1") - app_state_manager.apply_deployment_args(app_name, [params2]) + app_state_manager.deploy_app(app_name, [params2]) assert app_state.status == ApplicationStatus.DEPLOYING # Before application state manager could propagate new version to @@ -832,7 +902,7 @@ def test_is_ready_for_shutdown(mocked_application_state_manager): # DEPLOY application with deployment "d1" params = deployment_params(deployment_name) - app_state_manager.apply_deployment_args(app_name, [params]) + app_state_manager.deploy_app(app_name, [params]) app_state = app_state_manager._application_states[app_name] assert app_state.status == ApplicationStatus.DEPLOYING