[Serve][Part1] Update the tests to use graph deploy (ray-project#26310)
Signed-off-by: Stefan van der Kleij <[email protected]>
sihanwang41 authored and Stefan van der Kleij committed Aug 18, 2022
1 parent aa63892 commit 607a155
Showing 11 changed files with 101 additions and 105 deletions.
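Every test change below follows the same pattern: the legacy `Deployment.deploy()` / `Deployment.get_handle()` calls are replaced with the graph-deploy API, where `serve.run()` takes a bound deployment node and returns a handle. A minimal before/after sketch of that migration, using a hypothetical `Echo` deployment that is not part of this commit:

```python
import ray
from ray import serve


@serve.deployment
class Echo:
    def __call__(self, name: str) -> str:
        return f"hello {name}"


# Legacy API (what these tests used before this commit):
#   Echo.deploy()
#   handle = Echo.get_handle()

# Graph-deploy API: bind the deployment into a node and run it.
handle = serve.run(Echo.bind())

# The handle is used the same way as before.
assert ray.get(handle.remote("serve")) == "hello serve"
```

The handle returned by `serve.run()` replaces the separate `get_handle()` lookup, which is why most hunks below collapse two calls into one.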
1 change: 0 additions & 1 deletion python/ray/serve/deployment_function_node.py
@@ -29,7 +29,6 @@ def __init__(
func_options,
other_args_to_resolve=other_args_to_resolve,
)

if "deployment_schema" in self._bound_other_args_to_resolve:
deployment_schema: DeploymentSchema = self._bound_other_args_to_resolve[
"deployment_schema"
1 change: 1 addition & 0 deletions python/ray/serve/deployment_graph_build.py
@@ -180,6 +180,7 @@ def replace_with_handle(node):
deployment_schema: DeploymentSchema = dag_node._bound_other_args_to_resolve[
"deployment_schema"
]

deployment_shell: Deployment = schema_to_deployment(deployment_schema)

# Prefer user specified name to override the generated one.
11 changes: 4 additions & 7 deletions python/ray/serve/tests/test_advanced.py
@@ -14,9 +14,7 @@ def sleeper():
while True:
time.sleep(1000)

sleeper.deploy()

handle = sleeper.get_handle()
handle = serve.run(sleeper.bind())
ref = handle.remote()
sleeper.delete()

@@ -37,8 +35,7 @@ class Wait:
async def __call__(self, signal_actor):
await signal_actor.wait.remote()

Wait.deploy()
handle = Wait.get_handle()
handle = serve.run(Wait.bind())
refs = [handle.remote(signal) for _ in range(10)]

# Wait for all the queries to be enqueued
@@ -95,8 +92,8 @@ def __init__(self):
def __call__(self):
return "Ready"

LongStartingServable.deploy()
ray.get(LongStartingServable.get_handle().remote(), timeout=10)
handle = serve.run(LongStartingServable.bind())
ray.get(handle.remote(), timeout=10)


if __name__ == "__main__":
3 changes: 1 addition & 2 deletions python/ray/serve/tests/test_autoscaling_metrics.py
@@ -87,8 +87,7 @@ class A:
def __call__(self):
time.sleep(0.5)

A.deploy()
handle = A.get_handle()
handle = serve.run(A.bind())
[handle.remote() for _ in range(100)]

# Wait for metrics to propagate
66 changes: 33 additions & 33 deletions python/ray/serve/tests/test_autoscaling_policy.py
@@ -195,12 +195,11 @@ class A:
def __call__(self):
ray.get(signal.wait.remote())

A.deploy()
handle = serve.run(A.bind())

controller = serve_instance._controller
start_time = get_deployment_start_time(controller, A)

handle = A.get_handle()
[handle.remote() for _ in range(100)]

# scale up one more replica from min_replicas
@@ -241,12 +240,11 @@ class A:
def __call__(self):
ray.get(signal.wait.remote())

A.deploy()
handle = serve.run(A.bind())

controller = serve_instance._controller
start_time = get_deployment_start_time(controller, A)

handle = A.get_handle()
[handle.remote() for _ in range(100)]

# scale up one more replica from min_replicas
@@ -280,7 +278,7 @@ class A:
def __call__(self):
return "ok!"

A.deploy()
serve.run(A.bind())

controller = serve_instance._controller
assert get_num_running_replicas(controller, A) == 2
@@ -607,12 +605,11 @@ class A:
def __call__(self):
ray.get(signal.wait.remote())

A.deploy()
handle = serve.run(A.bind())

controller = serve_instance._controller
start_time = get_deployment_start_time(controller, A)

handle = A.get_handle()
[handle.remote() for _ in range(100)]

wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 2)
@@ -665,12 +662,12 @@ class A:
def __call__(self):
ray.get(signal.wait.remote())

A.deploy()
handle = serve.run(A.bind())

controller = serve_instance._controller
start_time = get_deployment_start_time(controller, A)

handle = A.get_handle()
A.get_handle()
[handle.remote() for _ in range(50)]

wait_for_condition(
@@ -720,15 +717,14 @@ class A:
def __call__(self):
ray.get(signal.wait.remote())

A.deploy()
handle = serve.run(A.bind())
print("Deployed A with min_replicas 1 and max_replicas 10.")

controller = serve_instance._controller
start_time = get_deployment_start_time(controller, A)

assert get_num_running_replicas(controller, A) == 0

handle = A.get_handle()
[handle.remote() for _ in range(400)]
print("Issued 400 requests.")

@@ -742,17 +738,19 @@ def __call__(self):
time.sleep(3)
print("Issued 458 requests. Request routing in-progress.")

A.options(
_autoscaling_config={
"metrics_interval_s": 0.1,
"min_replicas": 2,
"max_replicas": 20,
"look_back_period_s": 0.2,
"downscale_delay_s": 0.2,
"upscale_delay_s": 0.2,
},
version="v1",
).deploy()
serve.run(
A.options(
_autoscaling_config={
"metrics_interval_s": 0.1,
"min_replicas": 2,
"max_replicas": 20,
"look_back_period_s": 0.2,
"downscale_delay_s": 0.2,
"upscale_delay_s": 0.2,
},
version="v1",
).bind()
)
print("Redeployed A.")

wait_for_condition(lambda: get_num_running_replicas(controller, A) >= 20)
@@ -774,17 +772,19 @@ def __call__(self):
assert get_deployment_start_time(controller, A) == start_time

# scale down to 0
A.options(
_autoscaling_config={
"metrics_interval_s": 0.1,
"min_replicas": 0,
"max_replicas": 20,
"look_back_period_s": 0.2,
"downscale_delay_s": 0.2,
"upscale_delay_s": 0.2,
},
version="v1",
).deploy()
serve.run(
A.options(
_autoscaling_config={
"metrics_interval_s": 0.1,
"min_replicas": 0,
"max_replicas": 20,
"look_back_period_s": 0.2,
"downscale_delay_s": 0.2,
"upscale_delay_s": 0.2,
},
version="v1",
).bind()
)
print("Redeployed A.")

wait_for_condition(lambda: get_num_running_replicas(controller, A) < 1)
6 changes: 2 additions & 4 deletions python/ray/serve/tests/test_batching.py
@@ -21,10 +21,9 @@ async def handle_batch(self, requests):
async def __call__(self, request):
return await self.handle_batch(request)

BatchingExample.deploy()
handle = serve.run(BatchingExample.bind())

future_list = []
handle = BatchingExample.get_handle()
for _ in range(20):
f = handle.remote(1)
future_list.append(f)
@@ -50,9 +49,8 @@ async def __call__(self, request):
return await self.handle_batch(request)

# Set the max batch size.
NoListReturned.deploy()
handle = serve.run(NoListReturned.bind())

handle = NoListReturned.get_handle()
with pytest.raises(ray.exceptions.RayTaskError):
assert ray.get(handle.remote(1))

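For context, the batching tests above rely on Serve's `@serve.batch` decorator on `handle_batch`; only the deploy/handle plumbing changed in this commit. A minimal sketch of that pattern under graph deploy, with a hypothetical `Adder` deployment and an illustrative batch size (neither is taken from this commit):

```python
import ray
from ray import serve


@serve.deployment
class Adder:
    @serve.batch(max_batch_size=4)
    async def handle_batch(self, numbers):
        # Receives a list of queued inputs; must return a list of the same length.
        return [n + 1 for n in numbers]

    async def __call__(self, number: int):
        # Each request awaits its own slot in the batch.
        return await self.handle_batch(number)


handle = serve.run(Adder.bind())
refs = [handle.remote(i) for i in range(8)]
assert ray.get(refs) == [i + 1 for i in range(8)]
```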
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_cluster.py
@@ -97,7 +97,7 @@ def get_pids(expected, timeout=30):
serve.start(detached=True)

print("Initial deploy.")
D.deploy()
serve.run(D.bind())
pids1 = get_pids(5)

# Remove the node. There should still be three replicas running.
16 changes: 8 additions & 8 deletions python/ray/serve/tests/test_constructor_failure.py
@@ -19,10 +19,10 @@ async def serve(self, request):
return "hi"

with pytest.raises(RuntimeError):
ConstructorFailureDeploymentOneReplica.deploy()
serve.run(ConstructorFailureDeploymentOneReplica.bind())

# Assert no replicas are running in the deployment after failed
# deploy() call
# deploy call
deployment_dict = ray.get(serve_instance._controller._all_running_replicas.remote())
assert deployment_dict["ConstructorFailureDeploymentOneReplica"] == []

@@ -36,10 +36,10 @@ async def serve(self, request):
return "hi"

with pytest.raises(RuntimeError):
ConstructorFailureDeploymentTwoReplicas.deploy()
serve.run(ConstructorFailureDeploymentTwoReplicas.bind())

# Assert no replicas are running in the deployment after failed
# deploy() call
# deploy call
deployment_dict = ray.get(serve_instance._controller._all_running_replicas.remote())
assert deployment_dict["ConstructorFailureDeploymentTwoReplicas"] == []

@@ -70,10 +70,10 @@ def __init__(self):
async def serve(self, request):
return "hi"

PartialConstructorFailureDeployment.deploy()
serve.run(PartialConstructorFailureDeployment.bind())

# Assert 2 replicas are running in the deployment after partially
# successful deploy() call
# successful deploy call
deployment_dict = ray.get(serve_instance._controller._all_running_replicas.remote())
assert len(deployment_dict["PartialConstructorFailureDeployment"]) == 2

@@ -97,9 +97,9 @@ def __init__(self):
async def serve(self, request):
return "hi"

TransientConstructorFailureDeployment.deploy()
serve.run(TransientConstructorFailureDeployment.bind())
# Assert 2 replicas are running in the deployment after partially
# successful deploy() call with transient error
# successful deploy call with transient error
deployment_dict = ray.get(serve_instance._controller._all_running_replicas.remote())
assert len(deployment_dict["TransientConstructorFailureDeployment"]) == 2

4 changes: 2 additions & 2 deletions python/ray/serve/tests/test_controller.py
@@ -17,7 +17,7 @@ def test_redeploy_start_time(serve_instance):
def test(_):
return "1"

test.deploy()
serve.run(test.bind())
deployment_route = DeploymentRoute.FromString(
ray.get(controller.get_deployment_info.remote("test"))
)
@@ -30,7 +30,7 @@ def test(_):
def test(_):
return "2"

test.deploy()
serve.run(test.bind())
deployment_route = DeploymentRoute.FromString(
ray.get(controller.get_deployment_info.remote("test"))
)