From 19e55994654d1ebd220712e910e692a5c485c6ba Mon Sep 17 00:00:00 2001 From: zcin Date: Fri, 5 Aug 2022 15:30:24 -0700 Subject: [PATCH] [serve] Update version if import_path or runtime_env in config is changed (#27498) (#27561) --- python/ray/serve/controller.py | 37 +++++++++----- .../ray/serve/tests/test_config_files/pid.py | 11 +++++ python/ray/serve/tests/test_controller.py | 49 +++++++++++++++++-- python/ray/serve/tests/test_standalone2.py | 43 ++++++++++------ 4 files changed, 110 insertions(+), 30 deletions(-) diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index adf8e098d5af..96745544e067 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -412,13 +412,14 @@ def deploy_app( # Compare new config options with old ones and set versions of new deployments config_checkpoint = self.kv_store.get(CONFIG_CHECKPOINT_KEY) + if config_checkpoint is not None: _, last_config_dict, last_version_dict = pickle.loads(config_checkpoint) - updated_version_dict = _generate_new_version_config( + updated_version_dict = _generate_deployment_config_versions( config_dict, last_config_dict, last_version_dict ) else: - updated_version_dict = _generate_new_version_config(config_dict, {}, {}) + updated_version_dict = _generate_deployment_config_versions(config_dict) self.kv_store.put( CONFIG_CHECKPOINT_KEY, @@ -572,21 +573,25 @@ def get_app_config(self) -> Dict: return config -def _generate_new_version_config( - new_config: Dict, last_deployed_config: Dict, last_deployed_versions: Dict +def _generate_deployment_config_versions( + new_config: Dict, + last_deployed_config: Dict = None, + last_deployed_versions: Dict = None, ) -> Dict[str, str]: """ - This function determines whether each deployment's version - should be changed based on the updated options. - - When a deployment's options change, its version should generally change, - so old replicas are torn down. 
The only options which can be changed - without tearing down replicas (i.e. changing the version) are: + This function determines whether each deployment's version should be changed based + on the newly deployed config. + + When ``import_path`` or ``runtime_env`` is changed, the versions for all deployments + should be changed, so old replicas are torn down. When the options for a deployment + in ``deployments`` change, its version should generally change. The only deployment + options that can be changed without tearing down replicas (i.e. changing the + version) are: * num_replicas * user_config * autoscaling_config - An option is considered changed when: + A deployment option is considered changed when: * it was not specified in last_deployed_config and is specified in new_config * it was specified in last_deployed_config and is not specified in new_config * it is specified in both last_deployed_config and new_config but the specified @@ -604,6 +609,16 @@ def _generate_new_version_config( Dictionary of {deployment_name: str -> version: str} containing updated versions for deployments listed in the new config """ + # If import_path or runtime_env is changed, it is considered a code change + if last_deployed_config is None: + last_deployed_config = {} + if last_deployed_versions is None: + last_deployed_versions = {} + + if last_deployed_config.get("import_path") != new_config.get( + "import_path" + ) or last_deployed_config.get("runtime_env") != new_config.get("runtime_env"): + last_deployed_config, last_deployed_versions = {}, {} new_deployments = {d["name"]: d for d in new_config.get("deployments", [])} old_deployments = { diff --git a/python/ray/serve/tests/test_config_files/pid.py b/python/ray/serve/tests/test_config_files/pid.py index b9e86f3f4ba7..9b30c9de510d 100644 --- a/python/ray/serve/tests/test_config_files/pid.py +++ b/python/ray/serve/tests/test_config_files/pid.py @@ -1,4 +1,5 @@ from ray import serve +from ray.serve.deployment_graph import 
RayServeDAGHandle import os @@ -14,4 +15,14 @@ async def __call__(self): return os.getpid() +@serve.deployment +class BasicDriver: + def __init__(self, dag: RayServeDAGHandle): + self.dag = dag + + async def __call__(self): + return await self.dag.remote() + + node = f.bind() +bnode = BasicDriver.bind(node) diff --git a/python/ray/serve/tests/test_controller.py b/python/ray/serve/tests/test_controller.py index a25c304a4917..1e283cd3fea0 100644 --- a/python/ray/serve/tests/test_controller.py +++ b/python/ray/serve/tests/test_controller.py @@ -7,7 +7,7 @@ from ray import serve from ray.serve._private.common import DeploymentInfo from ray.serve.generated.serve_pb2 import DeploymentRoute -from ray.serve.controller import _generate_new_version_config +from ray.serve.controller import _generate_deployment_config_versions def test_redeploy_start_time(serve_instance): @@ -52,10 +52,13 @@ def test(_): ("ray_actor_options", False), ], ) -def test_generate_new_version_config( +def test_config_versions_deployments_update( last_config_had_option: bool, option_to_update: str, config_update: bool ): - """Check that controller._generate_new_version_config() has correct behavior.""" + """ + Check that controller._generate_deployment_config_versions() has correct behavior + when the config options in the ``deployments`` field are updated. 
+ """ options = { "num_replicas": {"old": 1, "new": 2}, @@ -87,13 +90,51 @@ def test_generate_new_version_config( new_config["deployments"][0][option_to_update] = options[option_to_update]["new"] versions = {"f": "v1"} - new_versions = _generate_new_version_config(new_config, old_config, versions) + new_versions = _generate_deployment_config_versions( + new_config, old_config, versions + ) assert ( new_versions.get("f") is not None and (new_versions.get("f") == versions.get("f")) == config_update ) +@pytest.mark.parametrize("field_to_update", ["import_path", "runtime_env", "both"]) +def test_config_versions_non_deployments_update(field_to_update: str): + """ + Check that controller._generate_deployment_config_versions() has correct behavior + when the ``import_path`` and ``runtime_env`` fields are updated. + """ + + old_config = { + "import_path": "ray.serve.tests.test_config_files.pid.node", + "deployments": [ + { + "name": "f", + "num_replicas": 1, + "ray_actor_options": {"num_cpus": 0.1}, + } + ], + } + + new_config = copy.deepcopy(old_config) + if field_to_update == "import_path": + new_config["import_path"] = "ray.serve.tests.test_config_files.pid.bnode" + elif field_to_update == "runtime_env": + new_config["runtime_env"] = {"env_vars": {"test_var": "test_val"}} + elif field_to_update == "both": + new_config["import_path"] = "ray.serve.tests.test_config_files.pid.bnode" + new_config["runtime_env"] = {"env_vars": {"test_var": "test_val"}} + + versions = {"f": "v1"} + new_versions = _generate_deployment_config_versions( + new_config, old_config, versions + ) + assert new_versions.get("f") is not None and ( + new_versions.get("f") != versions.get("f") + ) + + if __name__ == "__main__": import sys diff --git a/python/ray/serve/tests/test_standalone2.py b/python/ray/serve/tests/test_standalone2.py index f7fe5bd962ba..110ddf51e65e 100644 --- a/python/ray/serve/tests/test_standalone2.py +++ b/python/ray/serve/tests/test_standalone2.py @@ -563,16 +563,22 @@ def 
test_controller_recover_and_deploy(self, client: ServeControllerClient): assert client.get_serve_status().app_status.deployment_timestamp == 0 @pytest.mark.parametrize( - "option_to_update,config_update", + "field_to_update,option_to_update,config_update", [ - ("num_replicas", True), - ("autoscaling_config", True), - ("user_config", True), - ("ray_actor_options", False), + ("import_path", "", False), + ("runtime_env", "", False), + ("deployments", "num_replicas", True), + ("deployments", "autoscaling_config", True), + ("deployments", "user_config", True), + ("deployments", "ray_actor_options", False), ], ) def test_deploy_config_update( - self, client: ServeControllerClient, option_to_update: str, config_update: bool + self, + client: ServeControllerClient, + field_to_update: str, + option_to_update: str, + config_update: bool, ): """ Check that replicas stay alive when lightweight config updates are made and @@ -604,15 +610,22 @@ def deployment_running(): wait_for_condition(deployment_running, timeout=15) pid1 = requests.get("http://localhost:8000/f").text - updated_options = { - "num_replicas": 2, - "autoscaling_config": {"max_replicas": 2}, - "user_config": {"name": "bob"}, - "ray_actor_options": {"num_cpus": 0.2}, - } - config_template["deployments"][0][option_to_update] = updated_options[ - option_to_update - ] + if field_to_update == "import_path": + config_template[ + "import_path" + ] = "ray.serve.tests.test_config_files.pid.bnode" + elif field_to_update == "runtime_env": + config_template["runtime_env"] = {"env_vars": {"test_var": "test_val"}} + elif field_to_update == "deployments": + updated_options = { + "num_replicas": 2, + "autoscaling_config": {"max_replicas": 2}, + "user_config": {"name": "bob"}, + "ray_actor_options": {"num_cpus": 0.2}, + } + config_template["deployments"][0][option_to_update] = updated_options[ + option_to_update + ] client.deploy_app(ServeApplicationSchema.parse_obj(config_template)) wait_for_condition(deployment_running, 
timeout=15)