Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Update version if import_path or runtime_env in config is changed #27498

Merged
merged 4 commits into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 26 additions & 11 deletions python/ray/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,14 @@ def deploy_app(

# Compare new config options with old ones and set versions of new deployments
config_checkpoint = self.kv_store.get(CONFIG_CHECKPOINT_KEY)

if config_checkpoint is not None:
_, last_config_dict, last_version_dict = pickle.loads(config_checkpoint)
updated_version_dict = _generate_new_version_config(
updated_version_dict = _generate_deployment_config_versions(
config_dict, last_config_dict, last_version_dict
)
else:
updated_version_dict = _generate_new_version_config(config_dict, {}, {})
updated_version_dict = _generate_deployment_config_versions(config_dict)

self.kv_store.put(
CONFIG_CHECKPOINT_KEY,
Expand Down Expand Up @@ -572,21 +573,25 @@ def get_app_config(self) -> Dict:
return config


def _generate_new_version_config(
new_config: Dict, last_deployed_config: Dict, last_deployed_versions: Dict
def _generate_deployment_config_versions(
new_config: Dict,
last_deployed_config: Dict = None,
last_deployed_versions: Dict = None,
) -> Dict[str, str]:
"""
This function determines whether each deployment's version
should be changed based on the updated options.

When a deployment's options change, its version should generally change,
so old replicas are torn down. The only options which can be changed
without tearing down replicas (i.e. changing the version) are:
This function determines whether each deployment's version should be changed based
on the newly deployed config.

When ``import_path`` or ``runtime_env`` is changed, the versions for all deployments
should be changed, so old replicas are torn down. When the options for a deployment
in ``deployments`` change, its version should generally change. The only deployment
options that can be changed without tearing down replicas (i.e. changing the
version) are:
* num_replicas
* user_config
* autoscaling_config

An option is considered changed when:
A deployment option is considered changed when:
* it was not specified in last_deployed_config and is specified in new_config
* it was specified in last_deployed_config and is not specified in new_config
* it is specified in both last_deployed_config and new_config but the specified
Expand All @@ -604,6 +609,16 @@ def _generate_new_version_config(
Dictionary of {deployment_name: str -> version: str} containing updated
versions for deployments listed in the new config
"""
# If import_path or runtime_env is changed, it is considered a code change
if last_deployed_config is None:
last_deployed_config = {}
if last_deployed_versions is None:
last_deployed_versions = {}

if last_deployed_config.get("import_path") != new_config.get(
"import_path"
) or last_deployed_config.get("runtime_env") != new_config.get("runtime_env"):
last_deployed_config, last_deployed_versions = {}, {}

new_deployments = {d["name"]: d for d in new_config.get("deployments", [])}
old_deployments = {
Expand Down
11 changes: 11 additions & 0 deletions python/ray/serve/tests/test_config_files/pid.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from ray import serve
from ray.serve.deployment_graph import RayServeDAGHandle
import os


Expand All @@ -14,4 +15,14 @@ async def __call__(self):
return os.getpid()


@serve.deployment
class BasicDriver:
    """Serve deployment that forwards every call to a wrapped DAG handle."""

    def __init__(self, dag: RayServeDAGHandle):
        # Keep the handle so each incoming call can be forwarded to it.
        self.dag = dag

    async def __call__(self):
        """Call ``remote()`` on the stored handle and return the awaited result."""
        response = await self.dag.remote()
        return response


# Bound node for ``f`` (``f`` is defined earlier in this file, outside this hunk).
node = f.bind()
# ``BasicDriver`` bound with ``node`` supplied as its ``dag`` constructor argument.
bnode = BasicDriver.bind(node)
49 changes: 45 additions & 4 deletions python/ray/serve/tests/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ray import serve
from ray.serve._private.common import DeploymentInfo
from ray.serve.generated.serve_pb2 import DeploymentRoute
from ray.serve.controller import _generate_new_version_config
from ray.serve.controller import _generate_deployment_config_versions


def test_redeploy_start_time(serve_instance):
Expand Down Expand Up @@ -52,10 +52,13 @@ def test(_):
("ray_actor_options", False),
],
)
def test_generate_new_version_config(
def test_config_versions_deployments_update(
last_config_had_option: bool, option_to_update: str, config_update: bool
):
"""Check that controller._generate_new_version_config() has correct behavior."""
"""
Check that controller._generate_deployment_config_versions() has correct behavior
when the config options in the ``deployments`` field are updated.
"""

options = {
"num_replicas": {"old": 1, "new": 2},
Expand Down Expand Up @@ -87,13 +90,51 @@ def test_generate_new_version_config(
new_config["deployments"][0][option_to_update] = options[option_to_update]["new"]

versions = {"f": "v1"}
new_versions = _generate_new_version_config(new_config, old_config, versions)
new_versions = _generate_deployment_config_versions(
new_config, old_config, versions
)
assert (
new_versions.get("f") is not None
and (new_versions.get("f") == versions.get("f")) == config_update
)


@pytest.mark.parametrize("field_to_update", ["import_path", "runtime_env", "both"])
def test_config_versions_non_deployments_update(field_to_update: str):
    """
    Check that controller._generate_deployment_config_versions() gives deployment
    ``f`` a version different from its previous one when the ``import_path``
    and/or ``runtime_env`` fields are updated.
    """

    previous_versions = {"f": "v1"}
    old_config = {
        "import_path": "ray.serve.tests.test_config_files.pid.node",
        "deployments": [
            {
                "name": "f",
                "num_replicas": 1,
                "ray_actor_options": {"num_cpus": 0.1},
            }
        ],
    }

    # "both" applies the two top-level overrides together.
    new_config = copy.deepcopy(old_config)
    if field_to_update in ("import_path", "both"):
        new_config["import_path"] = "ray.serve.tests.test_config_files.pid.bnode"
    if field_to_update in ("runtime_env", "both"):
        new_config["runtime_env"] = {"env_vars": {"test_var": "test_val"}}

    new_versions = _generate_deployment_config_versions(
        new_config, old_config, previous_versions
    )

    # The deployment must still be listed, and its version must have changed.
    updated_version = new_versions.get("f")
    assert updated_version is not None and (
        updated_version != previous_versions.get("f")
    )


if __name__ == "__main__":
import sys

Expand Down
43 changes: 28 additions & 15 deletions python/ray/serve/tests/test_standalone2.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,16 +563,22 @@ def test_controller_recover_and_deploy(self, client: ServeControllerClient):
assert client.get_serve_status().app_status.deployment_timestamp == 0

@pytest.mark.parametrize(
"option_to_update,config_update",
"field_to_update,option_to_update,config_update",
[
("num_replicas", True),
("autoscaling_config", True),
("user_config", True),
("ray_actor_options", False),
("import_path", "", False),
("runtime_env", "", False),
("deployments", "num_replicas", True),
("deployments", "autoscaling_config", True),
("deployments", "user_config", True),
("deployments", "ray_actor_options", False),
],
)
def test_deploy_config_update(
self, client: ServeControllerClient, option_to_update: str, config_update: bool
self,
client: ServeControllerClient,
field_to_update: str,
option_to_update: str,
config_update: bool,
):
"""
Check that replicas stay alive when lightweight config updates are made and
Expand Down Expand Up @@ -604,15 +610,22 @@ def deployment_running():
wait_for_condition(deployment_running, timeout=15)
pid1 = requests.get("http://localhost:8000/f").text

updated_options = {
"num_replicas": 2,
"autoscaling_config": {"max_replicas": 2},
"user_config": {"name": "bob"},
"ray_actor_options": {"num_cpus": 0.2},
}
config_template["deployments"][0][option_to_update] = updated_options[
option_to_update
]
if field_to_update == "import_path":
config_template[
"import_path"
] = "ray.serve.tests.test_config_files.pid.bnode"
elif field_to_update == "runtime_env":
config_template["runtime_env"] = {"env_vars": {"test_var": "test_val"}}
elif field_to_update == "deployments":
updated_options = {
"num_replicas": 2,
"autoscaling_config": {"max_replicas": 2},
"user_config": {"name": "bob"},
"ray_actor_options": {"num_cpus": 0.2},
}
config_template["deployments"][0][option_to_update] = updated_options[
option_to_update
]

client.deploy_app(ServeApplicationSchema.parse_obj(config_template))
wait_for_condition(deployment_running, timeout=15)
Expand Down