Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] Promote graceful shutdown and health check #26682

Merged
merged 3 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion doc/source/serve/deploying-serve.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ This method should take no arguments and return no result, raising an exception
You can also customize how frequently the health check is run and the timeout when a replica will be deemed unhealthy if it hasn't responded in the deployment options.

> ```python
> @serve.deployment(_health_check_period_s=10, _health_check_timeout_s=30)
> @serve.deployment(health_check_period_s=10, health_check_timeout_s=30)
> class MyDeployment:
> def __init__(self, db_addr: str):
> self._my_db_connection = connect_to_db(db_addr)
Expand Down
24 changes: 12 additions & 12 deletions python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,10 +343,10 @@ def deployment(
user_config: Optional[Any] = None,
max_concurrent_queries: Optional[int] = None,
autoscaling_config: Optional[Union[Dict, AutoscalingConfig]] = None,
_graceful_shutdown_wait_loop_s: Optional[float] = None,
_graceful_shutdown_timeout_s: Optional[float] = None,
_health_check_period_s: Optional[float] = None,
_health_check_timeout_s: Optional[float] = None,
graceful_shutdown_wait_loop_s: Optional[float] = None,
graceful_shutdown_timeout_s: Optional[float] = None,
health_check_period_s: Optional[float] = None,
health_check_timeout_s: Optional[float] = None,
) -> Callable[[Callable], Deployment]:
pass

Expand All @@ -364,10 +364,10 @@ def deployment(
user_config: Optional[Any] = None,
max_concurrent_queries: Optional[int] = None,
autoscaling_config: Optional[Union[Dict, AutoscalingConfig]] = None,
_graceful_shutdown_wait_loop_s: Optional[float] = None,
_graceful_shutdown_timeout_s: Optional[float] = None,
_health_check_period_s: Optional[float] = None,
_health_check_timeout_s: Optional[float] = None,
graceful_shutdown_wait_loop_s: Optional[float] = None,
graceful_shutdown_timeout_s: Optional[float] = None,
health_check_period_s: Optional[float] = None,
health_check_timeout_s: Optional[float] = None,
) -> Callable[[Callable], Deployment]:
"""Define a Serve deployment.

Expand Down Expand Up @@ -439,10 +439,10 @@ def deployment(
user_config=user_config,
max_concurrent_queries=max_concurrent_queries,
autoscaling_config=autoscaling_config,
graceful_shutdown_wait_loop_s=_graceful_shutdown_wait_loop_s,
graceful_shutdown_timeout_s=_graceful_shutdown_timeout_s,
health_check_period_s=_health_check_period_s,
health_check_timeout_s=_health_check_timeout_s,
graceful_shutdown_wait_loop_s=graceful_shutdown_wait_loop_s,
graceful_shutdown_timeout_s=graceful_shutdown_timeout_s,
health_check_period_s=health_check_period_s,
health_check_timeout_s=health_check_timeout_s,
)

def decorator(_func_or_class):
Expand Down
40 changes: 20 additions & 20 deletions python/ray/serve/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,10 @@ def options(
user_config: Optional[Any] = None,
max_concurrent_queries: Optional[int] = None,
autoscaling_config: Optional[Union[Dict, AutoscalingConfig]] = None,
_graceful_shutdown_wait_loop_s: Optional[float] = None,
_graceful_shutdown_timeout_s: Optional[float] = None,
_health_check_period_s: Optional[float] = None,
_health_check_timeout_s: Optional[float] = None,
graceful_shutdown_wait_loop_s: Optional[float] = None,
graceful_shutdown_timeout_s: Optional[float] = None,
health_check_period_s: Optional[float] = None,
health_check_timeout_s: Optional[float] = None,
) -> "Deployment":
"""Return a copy of this deployment with updated options.

Expand Down Expand Up @@ -328,17 +328,17 @@ def options(
if autoscaling_config is not None:
new_config.autoscaling_config = autoscaling_config

if _graceful_shutdown_wait_loop_s is not None:
new_config.graceful_shutdown_wait_loop_s = _graceful_shutdown_wait_loop_s
if graceful_shutdown_wait_loop_s is not None:
new_config.graceful_shutdown_wait_loop_s = graceful_shutdown_wait_loop_s

if _graceful_shutdown_timeout_s is not None:
new_config.graceful_shutdown_timeout_s = _graceful_shutdown_timeout_s
if graceful_shutdown_timeout_s is not None:
new_config.graceful_shutdown_timeout_s = graceful_shutdown_timeout_s

if _health_check_period_s is not None:
new_config.health_check_period_s = _health_check_period_s
if health_check_period_s is not None:
new_config.health_check_period_s = health_check_period_s

if _health_check_timeout_s is not None:
new_config.health_check_timeout_s = _health_check_timeout_s
if health_check_timeout_s is not None:
new_config.health_check_timeout_s = health_check_timeout_s

return Deployment(
func_or_class,
Expand Down Expand Up @@ -366,10 +366,10 @@ def set_options(
user_config: Optional[Any] = None,
max_concurrent_queries: Optional[int] = None,
autoscaling_config: Optional[Union[Dict, AutoscalingConfig]] = None,
_graceful_shutdown_wait_loop_s: Optional[float] = None,
_graceful_shutdown_timeout_s: Optional[float] = None,
_health_check_period_s: Optional[float] = None,
_health_check_timeout_s: Optional[float] = None,
graceful_shutdown_wait_loop_s: Optional[float] = None,
graceful_shutdown_timeout_s: Optional[float] = None,
health_check_period_s: Optional[float] = None,
health_check_timeout_s: Optional[float] = None,
) -> None:
"""Overwrite this deployment's options. Mutates the deployment.

Expand All @@ -389,10 +389,10 @@ def set_options(
user_config=user_config,
max_concurrent_queries=max_concurrent_queries,
autoscaling_config=autoscaling_config,
_graceful_shutdown_wait_loop_s=_graceful_shutdown_wait_loop_s,
_graceful_shutdown_timeout_s=_graceful_shutdown_timeout_s,
_health_check_period_s=_health_check_period_s,
_health_check_timeout_s=_health_check_timeout_s,
graceful_shutdown_wait_loop_s=graceful_shutdown_wait_loop_s,
graceful_shutdown_timeout_s=graceful_shutdown_timeout_s,
health_check_period_s=health_check_period_s,
health_check_timeout_s=health_check_timeout_s,
)

self._func_or_class = validated._func_or_class
Expand Down
4 changes: 0 additions & 4 deletions python/ray/serve/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ class DeploymentSchema(
"default if null."
),
ge=0,
alias="_graceful_shutdown_wait_loop_s",
)
graceful_shutdown_timeout_s: float = Field(
default=None,
Expand All @@ -151,7 +150,6 @@ class DeploymentSchema(
"default if null."
),
ge=0,
alias="_graceful_shutdown_timeout_s",
)
health_check_period_s: float = Field(
default=None,
Expand All @@ -160,7 +158,6 @@ class DeploymentSchema(
"replicas. Uses a default if null."
),
gt=0,
alias="_health_check_period_s",
)
health_check_timeout_s: float = Field(
default=None,
Expand All @@ -170,7 +167,6 @@ class DeploymentSchema(
"unhealthy. Uses a default if null."
),
gt=0,
alias="_health_check_timeout_s",
)
ray_actor_options: RayActorOptionsSchema = Field(
default=None, description="Options set for each replica actor."
Expand Down
6 changes: 3 additions & 3 deletions python/ray/serve/tests/test_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


def test_serve_forceful_shutdown(serve_instance):
@serve.deployment(_graceful_shutdown_timeout_s=0.1)
@serve.deployment(graceful_shutdown_timeout_s=0.1)
def sleeper():
while True:
time.sleep(1000)
Expand All @@ -28,8 +28,8 @@ def test_serve_graceful_shutdown(serve_instance):
@serve.deployment(
name="wait",
max_concurrent_queries=10,
_graceful_shutdown_timeout_s=1000,
_graceful_shutdown_wait_loop_s=0.5,
graceful_shutdown_timeout_s=1000,
graceful_shutdown_wait_loop_s=0.5,
)
class Wait:
async def __call__(self, signal_actor):
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ def test_set_options_basic(self):
num_replicas=4,
max_concurrent_queries=3,
ray_actor_options={"num_cpus": 2},
_health_check_timeout_s=17,
health_check_timeout_s=17,
)
def f():
pass
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_autoscaling_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_e2e(serve_instance):
},
# We will send over a lot of queries. This will make sure replicas are
# killed quickly during cleanup.
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
max_concurrent_queries=1000,
version="v1",
)
Expand Down
14 changes: 7 additions & 7 deletions python/ray/serve/tests/test_autoscaling_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def test_e2e_basic_scale_up_down(min_replicas, serve_instance):
},
# We will send over a lot of queries. This will make sure replicas are
# killed quickly during cleanup.
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
max_concurrent_queries=1000,
version="v1",
)
Expand Down Expand Up @@ -232,7 +232,7 @@ def test_e2e_basic_scale_up_down_with_0_replica(serve_instance):
},
# We will send over a lot of queries. This will make sure replicas are
# killed quickly during cleanup.
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
max_concurrent_queries=1000,
version="v1",
)
Expand Down Expand Up @@ -597,7 +597,7 @@ def test_e2e_bursty(serve_instance):
},
# We will send over a lot of queries. This will make sure replicas are
# killed quickly during cleanup.
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
max_concurrent_queries=1000,
version="v1",
)
Expand Down Expand Up @@ -654,7 +654,7 @@ def test_e2e_intermediate_downscaling(serve_instance):
},
# We will send over a lot of queries. This will make sure replicas are
# killed quickly during cleanup.
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
max_concurrent_queries=1000,
version="v1",
)
Expand Down Expand Up @@ -709,7 +709,7 @@ def test_e2e_update_autoscaling_deployment(serve_instance):
},
# We will send over a lot of queries. This will make sure replicas are
# killed quickly during cleanup.
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
max_concurrent_queries=1000,
version="v1",
)
Expand Down Expand Up @@ -812,7 +812,7 @@ def test_e2e_raise_min_replicas(serve_instance):
},
# We will send over a lot of queries. This will make sure replicas are
# killed quickly during cleanup.
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
max_concurrent_queries=1000,
version="v1",
)
Expand Down Expand Up @@ -847,7 +847,7 @@ def __call__(self):
"downscale_delay_s": 0.2,
"upscale_delay_s": 0.2,
},
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
max_concurrent_queries=1000,
version="v1",
).deploy()
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_scale_up(ray_cluster):
# By default, Serve controller and proxy actors use 0 CPUs,
# so initially there should only be room for 1 replica.

@serve.deployment(version="1", num_replicas=1, _health_check_period_s=1)
@serve.deployment(version="1", num_replicas=1, health_check_period_s=1)
def D(*args):
return os.getpid()

Expand Down
12 changes: 6 additions & 6 deletions python/ray/serve/tests/test_deployment_graph_autoscaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def test_autoscaling_with_chain_nodes(min_replicas, serve_instance):

@serve.deployment(
autoscaling_config=autoscaling_config,
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
)
class Model1:
def __init__(self, weight):
Expand All @@ -81,7 +81,7 @@ def forward(self, input):

@serve.deployment(
autoscaling_config=autoscaling_config,
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
)
class Model2:
def __init__(self, weight):
Expand All @@ -98,7 +98,7 @@ def forward(self, input):
serve_dag = DAGDriver.options(
route_prefix="/my-dag",
autoscaling_config=autoscaling_config,
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
).bind(output2)

dag_handle = serve.run(serve_dag)
Expand Down Expand Up @@ -153,7 +153,7 @@ def test_autoscaling_with_ensemble_nodes(serve_instance):

@serve.deployment(
autoscaling_config=autoscaling_config,
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
)
class Model:
def __init__(self, weight):
Expand All @@ -164,7 +164,7 @@ def forward(self, input):

@serve.deployment(
autoscaling_config=autoscaling_config,
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
)
def combine(value_refs):
ray.get(signal.wait.remote())
Expand All @@ -179,7 +179,7 @@ def combine(value_refs):
serve_dag = DAGDriver.options(
route_prefix="/my-dag",
autoscaling_config=autoscaling_config,
_graceful_shutdown_timeout_s=1,
graceful_shutdown_timeout_s=1,
).bind(output)

dag_handle = serve.run(serve_dag)
Expand Down
8 changes: 4 additions & 4 deletions python/ray/serve/tests/test_healthcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def reset(self):
self._count = 0


@serve.deployment(_health_check_period_s=1, _health_check_timeout_s=1)
@serve.deployment(health_check_period_s=1, health_check_timeout_s=1)
class Patient:
def __init__(self):
self.healthy = True
Expand Down Expand Up @@ -126,7 +126,7 @@ def check_health(self):
def set_should_fail(self):
self.should_fail = True

@serve.deployment(_health_check_period_s=1)
@serve.deployment(health_check_period_s=1)
class Child(Parent):
def __call__(self, *args):
return ray.get_runtime_context().current_actor
Expand All @@ -143,7 +143,7 @@ def test_nonconsecutive_failures(serve_instance):
counter = ray.remote(Counter).remote()

# Test that a health check failing every other call isn't marked unhealthy.
@serve.deployment(_health_check_period_s=0.1)
@serve.deployment(health_check_period_s=0.1)
class FlakyHealthCheck:
def check_health(self):
curr_count = ray.get(counter.inc.remote())
Expand All @@ -166,7 +166,7 @@ def test_consecutive_failures(serve_instance):

counter = ray.remote(Counter).remote()

@serve.deployment(_health_check_period_s=1)
@serve.deployment(health_check_period_s=1)
class ChronicallyUnhealthy:
def __init__(self):
self._actor_id = ray.get_runtime_context().current_actor._actor_id
Expand Down
6 changes: 3 additions & 3 deletions python/ray/serve/tests/test_regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ def test_healthcheck_timeout(serve_instance):
signal = SignalActor.remote()

@serve.deployment(
_health_check_timeout_s=2,
_health_check_period_s=1,
_graceful_shutdown_timeout_s=0,
health_check_timeout_s=2,
health_check_period_s=1,
graceful_shutdown_timeout_s=0,
)
class A:
def check_health(self):
Expand Down
Loading