Skip to content

Commit

Permalink
[serve] Update version if import_path or runtime_env in config is cha…
Browse files Browse the repository at this point in the history
…nged (#27498) (#27561)
  • Loading branch information
zcin authored Aug 5, 2022
1 parent 6b0bd42 commit 19e5599
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 30 deletions.
37 changes: 26 additions & 11 deletions python/ray/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,14 @@ def deploy_app(

# Compare new config options with old ones and set versions of new deployments
config_checkpoint = self.kv_store.get(CONFIG_CHECKPOINT_KEY)

if config_checkpoint is not None:
_, last_config_dict, last_version_dict = pickle.loads(config_checkpoint)
updated_version_dict = _generate_new_version_config(
updated_version_dict = _generate_deployment_config_versions(
config_dict, last_config_dict, last_version_dict
)
else:
updated_version_dict = _generate_new_version_config(config_dict, {}, {})
updated_version_dict = _generate_deployment_config_versions(config_dict)

self.kv_store.put(
CONFIG_CHECKPOINT_KEY,
Expand Down Expand Up @@ -572,21 +573,25 @@ def get_app_config(self) -> Dict:
return config


def _generate_new_version_config(
new_config: Dict, last_deployed_config: Dict, last_deployed_versions: Dict
def _generate_deployment_config_versions(
new_config: Dict,
last_deployed_config: Dict = None,
last_deployed_versions: Dict = None,
) -> Dict[str, str]:
"""
This function determines whether each deployment's version
should be changed based on the updated options.
When a deployment's options change, its version should generally change,
so old replicas are torn down. The only options which can be changed
without tearing down replicas (i.e. changing the version) are:
This function determines whether each deployment's version should be changed based
on the newly deployed config.
When ``import_path`` or ``runtime_env`` is changed, the versions for all deployments
should be changed, so old replicas are torn down. When the options for a deployment
in ``deployments`` change, its version should generally change. The only deployment
options that can be changed without tearing down replicas (i.e. changing the
version) are:
* num_replicas
* user_config
* autoscaling_config
An option is considered changed when:
A deployment option is considered changed when:
* it was not specified in last_deployed_config and is specified in new_config
* it was specified in last_deployed_config and is not specified in new_config
* it is specified in both last_deployed_config and new_config but the specified
Expand All @@ -604,6 +609,16 @@ def _generate_new_version_config(
Dictionary of {deployment_name: str -> version: str} containing updated
versions for deployments listed in the new config
"""
# If import_path or runtime_env is changed, it is considered a code change
if last_deployed_config is None:
last_deployed_config = {}
if last_deployed_versions is None:
last_deployed_versions = {}

if last_deployed_config.get("import_path") != new_config.get(
"import_path"
) or last_deployed_config.get("runtime_env") != new_config.get("runtime_env"):
last_deployed_config, last_deployed_versions = {}, {}

new_deployments = {d["name"]: d for d in new_config.get("deployments", [])}
old_deployments = {
Expand Down
11 changes: 11 additions & 0 deletions python/ray/serve/tests/test_config_files/pid.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from ray import serve
from ray.serve.deployment_graph import RayServeDAGHandle
import os


Expand All @@ -14,4 +15,14 @@ async def __call__(self):
return os.getpid()


@serve.deployment
class BasicDriver:
def __init__(self, dag: RayServeDAGHandle):
self.dag = dag

async def __call__(self):
return await self.dag.remote()


node = f.bind()
bnode = BasicDriver.bind(node)
49 changes: 45 additions & 4 deletions python/ray/serve/tests/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ray import serve
from ray.serve._private.common import DeploymentInfo
from ray.serve.generated.serve_pb2 import DeploymentRoute
from ray.serve.controller import _generate_new_version_config
from ray.serve.controller import _generate_deployment_config_versions


def test_redeploy_start_time(serve_instance):
Expand Down Expand Up @@ -52,10 +52,13 @@ def test(_):
("ray_actor_options", False),
],
)
def test_generate_new_version_config(
def test_config_versions_deployments_update(
last_config_had_option: bool, option_to_update: str, config_update: bool
):
"""Check that controller._generate_new_version_config() has correct behavior."""
"""
Check that controller._generate_deployment_config_versions() has correct behavior
when the config options in the ``deployments`` field is updated.
"""

options = {
"num_replicas": {"old": 1, "new": 2},
Expand Down Expand Up @@ -87,13 +90,51 @@ def test_generate_new_version_config(
new_config["deployments"][0][option_to_update] = options[option_to_update]["new"]

versions = {"f": "v1"}
new_versions = _generate_new_version_config(new_config, old_config, versions)
new_versions = _generate_deployment_config_versions(
new_config, old_config, versions
)
assert (
new_versions.get("f") is not None
and (new_versions.get("f") == versions.get("f")) == config_update
)


@pytest.mark.parametrize("field_to_update", ["import_path", "runtime_env", "both"])
def test_config_versions_non_deployments_update(field_to_update: str):
"""
Check that controller._generate_deployment_config_versions() has correct behavior
when the the ``import_path`` and ``runtime_env`` fields are updated.
"""

old_config = {
"import_path": "ray.serve.tests.test_config_files.pid.node",
"deployments": [
{
"name": "f",
"num_replicas": 1,
"ray_actor_options": {"num_cpus": 0.1},
}
],
}

new_config = copy.deepcopy(old_config)
if field_to_update == "import_path":
new_config["import_path"] = "ray.serve.tests.test_config_files.pid.bnode"
elif field_to_update == "runtime_env":
new_config["runtime_env"] = {"env_vars": {"test_var": "test_val"}}
elif field_to_update == "both":
new_config["import_path"] = "ray.serve.tests.test_config_files.pid.bnode"
new_config["runtime_env"] = {"env_vars": {"test_var": "test_val"}}

versions = {"f": "v1"}
new_versions = _generate_deployment_config_versions(
new_config, old_config, versions
)
assert new_versions.get("f") is not None and (
new_versions.get("f") != versions.get("f")
)


if __name__ == "__main__":
import sys

Expand Down
43 changes: 28 additions & 15 deletions python/ray/serve/tests/test_standalone2.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,16 +563,22 @@ def test_controller_recover_and_deploy(self, client: ServeControllerClient):
assert client.get_serve_status().app_status.deployment_timestamp == 0

@pytest.mark.parametrize(
"option_to_update,config_update",
"field_to_update,option_to_update,config_update",
[
("num_replicas", True),
("autoscaling_config", True),
("user_config", True),
("ray_actor_options", False),
("import_path", "", False),
("runtime_env", "", False),
("deployments", "num_replicas", True),
("deployments", "autoscaling_config", True),
("deployments", "user_config", True),
("deployments", "ray_actor_options", False),
],
)
def test_deploy_config_update(
self, client: ServeControllerClient, option_to_update: str, config_update: bool
self,
client: ServeControllerClient,
field_to_update: str,
option_to_update: str,
config_update: bool,
):
"""
Check that replicas stay alive when lightweight config updates are made and
Expand Down Expand Up @@ -604,15 +610,22 @@ def deployment_running():
wait_for_condition(deployment_running, timeout=15)
pid1 = requests.get("http://localhost:8000/f").text

updated_options = {
"num_replicas": 2,
"autoscaling_config": {"max_replicas": 2},
"user_config": {"name": "bob"},
"ray_actor_options": {"num_cpus": 0.2},
}
config_template["deployments"][0][option_to_update] = updated_options[
option_to_update
]
if field_to_update == "import_path":
config_template[
"import_path"
] = "ray.serve.tests.test_config_files.pid.bnode"
elif field_to_update == "runtime_env":
config_template["runtime_env"] = {"env_vars": {"test_var": "test_val"}}
elif field_to_update == "deployments":
updated_options = {
"num_replicas": 2,
"autoscaling_config": {"max_replicas": 2},
"user_config": {"name": "bob"},
"ray_actor_options": {"num_cpus": 0.2},
}
config_template["deployments"][0][option_to_update] = updated_options[
option_to_update
]

client.deploy_app(ServeApplicationSchema.parse_obj(config_template))
wait_for_condition(deployment_running, timeout=15)
Expand Down

0 comments on commit 19e5599

Please sign in to comment.