From 248202e0239886f3d2dafa1071834d6c040e9e9f Mon Sep 17 00:00:00 2001 From: Sihan Wang Date: Mon, 18 Jul 2022 15:14:37 -0700 Subject: [PATCH 1/3] [Serve] Promote graceful shutdown and health check --- python/ray/serve/api.py | 24 ++++----- python/ray/serve/deployment.py | 40 +++++++-------- python/ray/serve/schema.py | 2 - python/ray/serve/tests/test_api.py | 2 +- python/ray/serve/tests/test_cluster.py | 2 +- python/ray/serve/tests/test_healthcheck.py | 8 +-- python/ray/serve/tests/test_regression.py | 6 +-- python/ray/serve/tests/test_schema.py | 60 ---------------------- 8 files changed, 41 insertions(+), 103 deletions(-) diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 44770a7b379b..0a39b1beff2c 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -343,10 +343,10 @@ def deployment( user_config: Optional[Any] = None, max_concurrent_queries: Optional[int] = None, autoscaling_config: Optional[Union[Dict, AutoscalingConfig]] = None, - _graceful_shutdown_wait_loop_s: Optional[float] = None, - _graceful_shutdown_timeout_s: Optional[float] = None, - _health_check_period_s: Optional[float] = None, - _health_check_timeout_s: Optional[float] = None, + graceful_shutdown_wait_loop_s: Optional[float] = None, + graceful_shutdown_timeout_s: Optional[float] = None, + health_check_period_s: Optional[float] = None, + health_check_timeout_s: Optional[float] = None, ) -> Callable[[Callable], Deployment]: pass @@ -364,10 +364,10 @@ def deployment( user_config: Optional[Any] = None, max_concurrent_queries: Optional[int] = None, autoscaling_config: Optional[Union[Dict, AutoscalingConfig]] = None, - _graceful_shutdown_wait_loop_s: Optional[float] = None, - _graceful_shutdown_timeout_s: Optional[float] = None, - _health_check_period_s: Optional[float] = None, - _health_check_timeout_s: Optional[float] = None, + graceful_shutdown_wait_loop_s: Optional[float] = None, + graceful_shutdown_timeout_s: Optional[float] = None, + health_check_period_s: Optional[float] = None, + health_check_timeout_s: Optional[float] = None, ) -> Callable[[Callable], Deployment]: """Define a Serve deployment. @@ -439,10 +439,10 @@ def deployment( user_config=user_config, max_concurrent_queries=max_concurrent_queries, autoscaling_config=autoscaling_config, - graceful_shutdown_wait_loop_s=_graceful_shutdown_wait_loop_s, - graceful_shutdown_timeout_s=_graceful_shutdown_timeout_s, - health_check_period_s=_health_check_period_s, - health_check_timeout_s=_health_check_timeout_s, + graceful_shutdown_wait_loop_s=graceful_shutdown_wait_loop_s, + graceful_shutdown_timeout_s=graceful_shutdown_timeout_s, + health_check_period_s=health_check_period_s, + health_check_timeout_s=health_check_timeout_s, ) def decorator(_func_or_class): diff --git a/python/ray/serve/deployment.py b/python/ray/serve/deployment.py index 0c54a8eaada9..ae4f3ac35d36 100644 --- a/python/ray/serve/deployment.py +++ b/python/ray/serve/deployment.py @@ -275,10 +275,10 @@ def options( user_config: Optional[Any] = None, max_concurrent_queries: Optional[int] = None, autoscaling_config: Optional[Union[Dict, AutoscalingConfig]] = None, - _graceful_shutdown_wait_loop_s: Optional[float] = None, - _graceful_shutdown_timeout_s: Optional[float] = None, - _health_check_period_s: Optional[float] = None, - _health_check_timeout_s: Optional[float] = None, + graceful_shutdown_wait_loop_s: Optional[float] = None, + graceful_shutdown_timeout_s: Optional[float] = None, + health_check_period_s: Optional[float] = None, + health_check_timeout_s: Optional[float] = None, ) -> "Deployment": """Return a copy of this deployment with updated options. @@ -328,17 +328,17 @@ def options( if autoscaling_config is not None: new_config.autoscaling_config = autoscaling_config - if _graceful_shutdown_wait_loop_s is not None: - new_config.graceful_shutdown_wait_loop_s = _graceful_shutdown_wait_loop_s + if graceful_shutdown_wait_loop_s is not None: + new_config.graceful_shutdown_wait_loop_s = graceful_shutdown_wait_loop_s - if _graceful_shutdown_timeout_s is not None: - new_config.graceful_shutdown_timeout_s = _graceful_shutdown_timeout_s + if graceful_shutdown_timeout_s is not None: + new_config.graceful_shutdown_timeout_s = graceful_shutdown_timeout_s - if _health_check_period_s is not None: - new_config.health_check_period_s = _health_check_period_s + if health_check_period_s is not None: + new_config.health_check_period_s = health_check_period_s - if _health_check_timeout_s is not None: - new_config.health_check_timeout_s = _health_check_timeout_s + if health_check_timeout_s is not None: + new_config.health_check_timeout_s = health_check_timeout_s return Deployment( func_or_class, @@ -366,10 +366,10 @@ def set_options( user_config: Optional[Any] = None, max_concurrent_queries: Optional[int] = None, autoscaling_config: Optional[Union[Dict, AutoscalingConfig]] = None, - _graceful_shutdown_wait_loop_s: Optional[float] = None, - _graceful_shutdown_timeout_s: Optional[float] = None, - _health_check_period_s: Optional[float] = None, - _health_check_timeout_s: Optional[float] = None, + graceful_shutdown_wait_loop_s: Optional[float] = None, + graceful_shutdown_timeout_s: Optional[float] = None, + health_check_period_s: Optional[float] = None, + health_check_timeout_s: Optional[float] = None, ) -> None: """Overwrite this deployment's options. Mutates the deployment. @@ -389,10 +389,10 @@ def set_options( user_config=user_config, max_concurrent_queries=max_concurrent_queries, autoscaling_config=autoscaling_config, - _graceful_shutdown_wait_loop_s=_graceful_shutdown_wait_loop_s, - _graceful_shutdown_timeout_s=_graceful_shutdown_timeout_s, - _health_check_period_s=_health_check_period_s, - _health_check_timeout_s=_health_check_timeout_s, + graceful_shutdown_wait_loop_s=graceful_shutdown_wait_loop_s, + graceful_shutdown_timeout_s=graceful_shutdown_timeout_s, + health_check_period_s=health_check_period_s, + health_check_timeout_s=health_check_timeout_s, ) self._func_or_class = validated._func_or_class diff --git a/python/ray/serve/schema.py b/python/ray/serve/schema.py index eaa3b275ed20..6cf52bc35090 100644 --- a/python/ray/serve/schema.py +++ b/python/ray/serve/schema.py @@ -160,7 +160,6 @@ class DeploymentSchema( "replicas. Uses a default if null." ), gt=0, - alias="_health_check_period_s", ) health_check_timeout_s: float = Field( default=None, @@ -170,7 +169,6 @@ class DeploymentSchema( "unhealthy. Uses a default if null." ), gt=0, - alias="_health_check_timeout_s", ) ray_actor_options: RayActorOptionsSchema = Field( default=None, description="Options set for each replica actor." diff --git a/python/ray/serve/tests/test_api.py b/python/ray/serve/tests/test_api.py index 80b274f0629f..1dfb90592602 100644 --- a/python/ray/serve/tests/test_api.py +++ b/python/ray/serve/tests/test_api.py @@ -430,7 +430,7 @@ def test_set_options_basic(self): num_replicas=4, max_concurrent_queries=3, ray_actor_options={"num_cpus": 2}, - _health_check_timeout_s=17, + health_check_timeout_s=17, ) def f(): pass diff --git a/python/ray/serve/tests/test_cluster.py b/python/ray/serve/tests/test_cluster.py index e26fc7627283..033b45236316 100644 --- a/python/ray/serve/tests/test_cluster.py +++ b/python/ray/serve/tests/test_cluster.py @@ -30,7 +30,7 @@ def test_scale_up(ray_cluster): # By default, Serve controller and proxy actors use 0 CPUs, # so initially there should only be room for 1 replica. - @serve.deployment(version="1", num_replicas=1, _health_check_period_s=1) + @serve.deployment(version="1", num_replicas=1, health_check_period_s=1) def D(*args): return os.getpid() diff --git a/python/ray/serve/tests/test_healthcheck.py b/python/ray/serve/tests/test_healthcheck.py index b5762e5fd930..ff308d6e30b1 100644 --- a/python/ray/serve/tests/test_healthcheck.py +++ b/python/ray/serve/tests/test_healthcheck.py @@ -22,7 +22,7 @@ def reset(self): self._count = 0 -@serve.deployment(_health_check_period_s=1, _health_check_timeout_s=1) +@serve.deployment(health_check_period_s=1, health_check_timeout_s=1) class Patient: def __init__(self): self.healthy = True @@ -126,7 +126,7 @@ def check_health(self): def set_should_fail(self): self.should_fail = True - @serve.deployment(_health_check_period_s=1) + @serve.deployment(health_check_period_s=1) class Child(Parent): def __call__(self, *args): return ray.get_runtime_context().current_actor @@ -143,7 +143,7 @@ def test_nonconsecutive_failures(serve_instance): counter = ray.remote(Counter).remote() # Test that a health check failing every other call isn't marked unhealthy. - @serve.deployment(_health_check_period_s=0.1) + @serve.deployment(health_check_period_s=0.1) class FlakyHealthCheck: def check_health(self): curr_count = ray.get(counter.inc.remote()) @@ -166,7 +166,7 @@ def test_consecutive_failures(serve_instance): counter = ray.remote(Counter).remote() - @serve.deployment(_health_check_period_s=1) + @serve.deployment(health_check_period_s=1) class ChronicallyUnhealthy: def __init__(self): self._actor_id = ray.get_runtime_context().current_actor._actor_id diff --git a/python/ray/serve/tests/test_regression.py b/python/ray/serve/tests/test_regression.py index 38caacea0ba3..d1294bed5e54 100644 --- a/python/ray/serve/tests/test_regression.py +++ b/python/ray/serve/tests/test_regression.py @@ -244,9 +244,9 @@ def test_healthcheck_timeout(serve_instance): signal = SignalActor.remote() @serve.deployment( - _health_check_timeout_s=2, - _health_check_period_s=1, - _graceful_shutdown_timeout_s=0, + health_check_timeout_s=2, + health_check_period_s=1, + graceful_shutdown_timeout_s=0, ) class A: def check_health(self): diff --git a/python/ray/serve/tests/test_schema.py b/python/ray/serve/tests/test_schema.py index b34e7532b6f5..239d3b92e346 100644 --- a/python/ray/serve/tests/test_schema.py +++ b/python/ray/serve/tests/test_schema.py @@ -478,66 +478,6 @@ def test_serve_application_invalid_import_path(self, path): with pytest.raises(ValidationError): ServeApplicationSchema.parse_obj(serve_application_schema) - def test_serve_application_aliasing(self): - """Check aliasing behavior for schemas.""" - - # Check that private options can optionally include underscore - app_dict = { - "import_path": "module.graph", - "runtime_env": {}, - "deployments": [ - { - "name": "d1", - "max_concurrent_queries": 3, - "autoscaling_config": {}, - "_graceful_shutdown_wait_loop_s": 30, - "graceful_shutdown_timeout_s": 10, - "_health_check_period_s": 5, - "health_check_timeout_s": 7, - }, - { - "name": "d2", - "max_concurrent_queries": 6, - "autoscaling_config": {}, - "graceful_shutdown_wait_loop_s": 50, - "_graceful_shutdown_timeout_s": 15, - "health_check_period_s": 53, - "_health_check_timeout_s": 73, - }, - ], - } - - schema = ServeApplicationSchema.parse_obj(app_dict) - - # Check that schema dictionary can include private options with an - # underscore (using the aliases) - - private_options = { - "_graceful_shutdown_wait_loop_s", - "_graceful_shutdown_timeout_s", - "_health_check_period_s", - "_health_check_timeout_s", - } - - for deployment in schema.dict(by_alias=True)["deployments"]: - for option in private_options: - # Option with leading underscore - assert option in deployment - - # Option without leading underscore - assert option[1:] not in deployment - - # Check that schema dictionary can include private options without an - # underscore (using the field names) - - for deployment in schema.dict()["deployments"]: - for option in private_options: - # Option without leading underscore - assert option[1:] in deployment - - # Option with leading underscore - assert option not in deployment - class TestServeStatusSchema: def get_valid_serve_status_schema(self): From e5e540ce7ca304a656fccdeaada4610d7c8ba51b Mon Sep 17 00:00:00 2001 From: Sihan Wang Date: Mon, 18 Jul 2022 17:04:02 -0700 Subject: [PATCH 2/3] Fix --- python/ray/serve/schema.py | 2 -- python/ray/serve/tests/test_advanced.py | 6 +++--- python/ray/serve/tests/test_autoscaling_metrics.py | 2 +- python/ray/serve/tests/test_autoscaling_policy.py | 14 +++++++------- .../tests/test_deployment_graph_autoscaling.py | 12 ++++++------ 5 files changed, 17 insertions(+), 19 deletions(-) diff --git a/python/ray/serve/schema.py b/python/ray/serve/schema.py index 6cf52bc35090..52fc7ee47e00 100644 --- a/python/ray/serve/schema.py +++ b/python/ray/serve/schema.py @@ -141,7 +141,6 @@ class DeploymentSchema( "default if null." ), ge=0, - alias="_graceful_shutdown_wait_loop_s", ) graceful_shutdown_timeout_s: float = Field( default=None, @@ -151,7 +150,6 @@ class DeploymentSchema( "default if null." ), ge=0, - alias="_graceful_shutdown_timeout_s", ) health_check_period_s: float = Field( default=None, diff --git a/python/ray/serve/tests/test_advanced.py b/python/ray/serve/tests/test_advanced.py index 0a679bce1ae5..495ba9a7cb2f 100644 --- a/python/ray/serve/tests/test_advanced.py +++ b/python/ray/serve/tests/test_advanced.py @@ -9,7 +9,7 @@ def test_serve_forceful_shutdown(serve_instance): - @serve.deployment(_graceful_shutdown_timeout_s=0.1) + @serve.deployment(graceful_shutdown_timeout_s=0.1) def sleeper(): while True: time.sleep(1000) @@ -28,8 +28,8 @@ def test_serve_graceful_shutdown(serve_instance): @serve.deployment( name="wait", max_concurrent_queries=10, - _graceful_shutdown_timeout_s=1000, - _graceful_shutdown_wait_loop_s=0.5, + graceful_shutdown_timeout_s=1000, + graceful_shutdown_wait_loop_s=0.5, ) class Wait: async def __call__(self, signal_actor): diff --git a/python/ray/serve/tests/test_autoscaling_metrics.py b/python/ray/serve/tests/test_autoscaling_metrics.py index 9c93c0c547b5..d60fc5c6de5a 100644 --- a/python/ray/serve/tests/test_autoscaling_metrics.py +++ b/python/ray/serve/tests/test_autoscaling_metrics.py @@ -79,7 +79,7 @@ def test_e2e(serve_instance): }, # We will send over a lot of queries. This will make sure replicas are # killed quickly during cleanup. - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, max_concurrent_queries=1000, version="v1", ) diff --git a/python/ray/serve/tests/test_autoscaling_policy.py b/python/ray/serve/tests/test_autoscaling_policy.py index 3c20a78f49c1..04d4aeefd07b 100644 --- a/python/ray/serve/tests/test_autoscaling_policy.py +++ b/python/ray/serve/tests/test_autoscaling_policy.py @@ -187,7 +187,7 @@ def test_e2e_basic_scale_up_down(min_replicas, serve_instance): }, # We will send over a lot of queries. This will make sure replicas are # killed quickly during cleanup. - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, max_concurrent_queries=1000, version="v1", ) @@ -232,7 +232,7 @@ def test_e2e_basic_scale_up_down_with_0_replica(serve_instance): }, # We will send over a lot of queries. This will make sure replicas are # killed quickly during cleanup. - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, max_concurrent_queries=1000, version="v1", ) @@ -597,7 +597,7 @@ def test_e2e_bursty(serve_instance): }, # We will send over a lot of queries. This will make sure replicas are # killed quickly during cleanup. - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, max_concurrent_queries=1000, version="v1", ) @@ -654,7 +654,7 @@ def test_e2e_intermediate_downscaling(serve_instance): }, # We will send over a lot of queries. This will make sure replicas are # killed quickly during cleanup. - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, max_concurrent_queries=1000, version="v1", ) @@ -709,7 +709,7 @@ def test_e2e_update_autoscaling_deployment(serve_instance): }, # We will send over a lot of queries. This will make sure replicas are # killed quickly during cleanup. - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, max_concurrent_queries=1000, version="v1", ) @@ -812,7 +812,7 @@ def test_e2e_raise_min_replicas(serve_instance): }, # We will send over a lot of queries. This will make sure replicas are # killed quickly during cleanup. - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, max_concurrent_queries=1000, version="v1", ) @@ -847,7 +847,7 @@ def __call__(self): "downscale_delay_s": 0.2, "upscale_delay_s": 0.2, }, - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, max_concurrent_queries=1000, version="v1", ).deploy() diff --git a/python/ray/serve/tests/test_deployment_graph_autoscaling.py b/python/ray/serve/tests/test_deployment_graph_autoscaling.py index fab1e300690f..b390d9c5f360 100644 --- a/python/ray/serve/tests/test_deployment_graph_autoscaling.py +++ b/python/ray/serve/tests/test_deployment_graph_autoscaling.py @@ -69,7 +69,7 @@ def test_autoscaling_with_chain_nodes(min_replicas, serve_instance): @serve.deployment( autoscaling_config=autoscaling_config, - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, ) class Model1: def __init__(self, weight): @@ -81,7 +81,7 @@ def forward(self, input): @serve.deployment( autoscaling_config=autoscaling_config, - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, ) class Model2: def __init__(self, weight): @@ -98,7 +98,7 @@ def forward(self, input): serve_dag = DAGDriver.options( route_prefix="/my-dag", autoscaling_config=autoscaling_config, - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, ).bind(output2) dag_handle = serve.run(serve_dag) @@ -153,7 +153,7 @@ def test_autoscaling_with_ensemble_nodes(serve_instance): @serve.deployment( autoscaling_config=autoscaling_config, - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, ) class Model: def __init__(self, weight): @@ -164,7 +164,7 @@ def forward(self, input): @serve.deployment( autoscaling_config=autoscaling_config, - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, ) def combine(value_refs): ray.get(signal.wait.remote()) @@ -179,7 +179,7 @@ def combine(value_refs): serve_dag = DAGDriver.options( route_prefix="/my-dag", autoscaling_config=autoscaling_config, - _graceful_shutdown_timeout_s=1, + graceful_shutdown_timeout_s=1, ).bind(output) dag_handle = serve.run(serve_dag) From 7e90fd98f80d0df5a2a2aca0dfa825763a2fb7e8 Mon Sep 17 00:00:00 2001 From: Sihan Wang Date: Tue, 19 Jul 2022 14:50:12 -0700 Subject: [PATCH 3/3] Update the doc --- doc/source/serve/deploying-serve.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/serve/deploying-serve.md b/doc/source/serve/deploying-serve.md index 49e144a55652..6a7e02e34984 100644 --- a/doc/source/serve/deploying-serve.md +++ b/doc/source/serve/deploying-serve.md @@ -218,7 +218,7 @@ This method should take no arguments and return no result, raising an exception You can also customize how frequently the health check is run and the timeout when a replica will be deemed unhealthy if it hasn't responded in the deployment options. > ```python -> @serve.deployment(_health_check_period_s=10, _health_check_timeout_s=30) +> @serve.deployment(health_check_period_s=10, health_check_timeout_s=30) > class MyDeployment: > def __init__(self, db_addr: str): > self._my_db_connection = connect_to_db(db_addr)