Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Implement serve.run() and Application #23157

Merged
merged 125 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
125 commits
Select commit Hold shift + click to select a range
cc50e9f
Write import paths in application
shrekris-anyscale Mar 10, 2022
f908168
Remove __ray_actor_class__
shrekris-anyscale Mar 10, 2022
ba85404
polish helps
edoakes Mar 10, 2022
9fa9f8e
Revise get_deployment_import_path code
shrekris-anyscale Mar 10, 2022
de8b3e4
Merge branch 'master' of github.com:ray-project/ray into app_path
shrekris-anyscale Mar 10, 2022
4f098b7
remove test
edoakes Mar 10, 2022
004b2ec
fix
edoakes Mar 10, 2022
ff04032
Process ray actors
shrekris-anyscale Mar 10, 2022
47846e9
Remove invalid test
shrekris-anyscale Mar 10, 2022
9347460
Merge branch 'master' of github.com:ray-project/ray into app_path
shrekris-anyscale Mar 10, 2022
4e4d363
Convert to import paths on write instead of on definition
shrekris-anyscale Mar 10, 2022
622c273
Do not mutate deployments
shrekris-anyscale Mar 10, 2022
3138970
Merge branch 'serve-cli-help-polish' into build_serve
shrekris-anyscale Mar 10, 2022
26bf135
Merge branch 'app_path' into build_serve
shrekris-anyscale Mar 10, 2022
746efdc
Add skeleton for serve build
shrekris-anyscale Mar 10, 2022
4c9d77b
Replace __main__ as the module name
shrekris-anyscale Mar 10, 2022
0d2d911
Merge branch 'master' of github.com:ray-project/ray into app_path
shrekris-anyscale Mar 10, 2022
bcd9a21
Implement serve build
shrekris-anyscale Mar 10, 2022
3c3dd47
Merge branch 'app_path' into build_serve
shrekris-anyscale Mar 10, 2022
8f749b4
Make test for serve build
shrekris-anyscale Mar 10, 2022
4ba2f21
Allow deployments to be build with serve build
shrekris-anyscale Mar 10, 2022
92981a8
Remove unnecessary parameter
shrekris-anyscale Mar 10, 2022
76196f2
Fix build test
shrekris-anyscale Mar 11, 2022
beededf
Linter
shrekris-anyscale Mar 11, 2022
b2ce973
Refactor test_build to not rely on file paths
shrekris-anyscale Mar 11, 2022
fcad902
Disable test on Windows
shrekris-anyscale Mar 11, 2022
a8bb5a6
Merge branch 'master' of github.com:ray-project/ray into build_serve
shrekris-anyscale Mar 11, 2022
2e566b2
WIP
edoakes Mar 14, 2022
96c65bc
fix
edoakes Mar 14, 2022
12a1e6d
Add call graph
edoakes Mar 14, 2022
5cbd6eb
Deployment
edoakes Mar 14, 2022
50a47a9
update
edoakes Mar 14, 2022
9a1be0f
remove host/port
edoakes Mar 14, 2022
5a02571
Merge branch 'master' of github.com:ray-project/ray into build_serve
shrekris-anyscale Mar 14, 2022
d464426
Merge branch 'pipeline-interfaces' into run_public
shrekris-anyscale Mar 14, 2022
b6f5b83
Port Application implementation to API
shrekris-anyscale Mar 14, 2022
8368747
Implement serve run and deploy
shrekris-anyscale Mar 14, 2022
587cc4e
Make deploy and run public APIs
shrekris-anyscale Mar 14, 2022
08fb88a
WIP
edoakes Mar 14, 2022
30733fe
remove deploy from python
edoakes Mar 14, 2022
637fad2
Fix circular imports and update tests
shrekris-anyscale Mar 14, 2022
7eaef41
Make application pass tests
shrekris-anyscale Mar 14, 2022
c7b0797
Port application users to api
shrekris-anyscale Mar 14, 2022
e41aec5
fix
edoakes Mar 14, 2022
8de474f
remove blocking from python
edoakes Mar 14, 2022
ae8331d
remove logger
edoakes Mar 14, 2022
165b0f0
Merge branch 'pipeline-interfaces' into run_public
shrekris-anyscale Mar 14, 2022
ce284b2
update applciation interface
edoakes Mar 14, 2022
fb5b549
Update tests
shrekris-anyscale Mar 14, 2022
65afc77
properties
edoakes Mar 14, 2022
c58dc4f
Include parentheses
shrekris-anyscale Mar 14, 2022
9d03faf
fix err
edoakes Mar 14, 2022
a71e174
fix
edoakes Mar 14, 2022
0588458
Use new api
shrekris-anyscale Mar 14, 2022
fc54ed8
fix nits
edoakes Mar 14, 2022
96b8fb3
fix bad import
edoakes Mar 14, 2022
0e6658f
Merge branch 'pipeline-interfaces' into run_public
shrekris-anyscale Mar 14, 2022
022068b
Move blocking behavior to CLI
shrekris-anyscale Mar 14, 2022
64a8999
Remove application.py
shrekris-anyscale Mar 14, 2022
706290a
Add test for immutable list
shrekris-anyscale Mar 14, 2022
4453f08
Add type hint
shrekris-anyscale Mar 14, 2022
9338c04
Merge branch 'master' of github.com:ray-project/ray into run_public
shrekris-anyscale Mar 14, 2022
3edcee4
Convert to dict
shrekris-anyscale Mar 15, 2022
6b4c7c5
Update error message in _add_deployment
shrekris-anyscale Mar 15, 2022
12a900f
Merge branch 'master' of github.com:ray-project/ray into run_public
shrekris-anyscale Mar 15, 2022
4f15994
Remove _add_deployment
shrekris-anyscale Mar 15, 2022
bac1001
Merge branch 'master' of github.com:ray-project/ray into build_serve
shrekris-anyscale Mar 15, 2022
d1f6616
fix
edoakes Mar 15, 2022
5542e78
Remove app-level import path conversion
shrekris-anyscale Mar 15, 2022
af21886
Update build to use new API
shrekris-anyscale Mar 15, 2022
cc18a8b
Remove malformatted string
shrekris-anyscale Mar 15, 2022
e649c3e
fix
edoakes Mar 16, 2022
621e4a4
Merge branch 'master' of github.com:ray-project/ray into run_public
shrekris-anyscale Mar 16, 2022
3010447
Update test_convert_to_import_path
shrekris-anyscale Mar 16, 2022
fe7a973
Merge branch 'fix-serve-run' into run_public
shrekris-anyscale Mar 16, 2022
a2df33a
Remove bad merge
shrekris-anyscale Mar 16, 2022
101f039
Implement basic DeploymentNode handling
shrekris-anyscale Mar 16, 2022
6722bf9
Merge branch 'master' of github.com:ray-project/ray into run_public
shrekris-anyscale Mar 16, 2022
f1a9772
Merge branch 'run_public' into build_serve
shrekris-anyscale Mar 16, 2022
9f857cb
Restore deploy_group
shrekris-anyscale Mar 16, 2022
539a6aa
Merge branch 'run_public' into build_serve
shrekris-anyscale Mar 16, 2022
62804da
Remove repeated imports
shrekris-anyscale Mar 16, 2022
6768271
Update application import
shrekris-anyscale Mar 16, 2022
5c783e2
Merge branch 'master' of https://github.com/ray-project/ray into buil…
edoakes Mar 16, 2022
8fd364a
working
edoakes Mar 16, 2022
f9b7665
fix
edoakes Mar 16, 2022
3c28a9c
Merge branch 'master' of https://github.com/ray-project/ray into buil…
edoakes Mar 16, 2022
bac3ef5
add --app-dir
edoakes Mar 16, 2022
aee5803
Merge branch 'cli-changes' into run_public
shrekris-anyscale Mar 16, 2022
1dfc066
Make test_delete use yaml
shrekris-anyscale Mar 16, 2022
e62d87d
Remove test_run_simultaneous
shrekris-anyscale Mar 16, 2022
d9617e7
Reimplement blocking
shrekris-anyscale Mar 16, 2022
8c45b4e
Fold deploy_group into serve.run
shrekris-anyscale Mar 16, 2022
9b3d046
Make run pass tests
shrekris-anyscale Mar 16, 2022
6ab39e4
Merge branch 'master' of github.com:ray-project/ray into run_public
shrekris-anyscale Mar 16, 2022
dfbd407
Remove application.py
shrekris-anyscale Mar 16, 2022
1842e06
Use delete_deployments
shrekris-anyscale Mar 16, 2022
3e5052d
Add test for runtime environments
shrekris-anyscale Mar 17, 2022
69448df
Merge branch 'master' of github.com:ray-project/ray into run_public
shrekris-anyscale Mar 17, 2022
17fb3c4
Test ingress handling in serve.run()
shrekris-anyscale Mar 17, 2022
287c2d7
Remove json serialization
shrekris-anyscale Mar 17, 2022
82a5610
Update logging message
shrekris-anyscale Mar 17, 2022
88e182b
Merge branch 'run_public' of github.com:shrekris-anyscale/ray into ru…
shrekris-anyscale Mar 17, 2022
02cf933
Remove drivers.py
shrekris-anyscale Mar 17, 2022
4dd55f0
Remove implementation detail in docstring for serve.run()
shrekris-anyscale Mar 17, 2022
7089365
Fix typo
shrekris-anyscale Mar 17, 2022
5b07be2
Merge branch 'master' of github.com:ray-project/ray into run_public
shrekris-anyscale Mar 17, 2022
867db5f
Restore schema helpers
shrekris-anyscale Mar 17, 2022
2c98d53
Merge branch 'master' of github.com:ray-project/ray into run_public
shrekris-anyscale Mar 17, 2022
9017cff
Remove print statement
shrekris-anyscale Mar 17, 2022
d606997
Remove incomplete build implementation
shrekris-anyscale Mar 17, 2022
56af914
Make ingress a property of Application
shrekris-anyscale Mar 17, 2022
9a6011f
Merge branch 'master' of github.com:ray-project/ray into run_public
shrekris-anyscale Mar 17, 2022
6dae116
Change if to elif
shrekris-anyscale Mar 17, 2022
1f9cb92
Merge branch 'master' of github.com:ray-project/ray into run_public
shrekris-anyscale Mar 17, 2022
de5cde9
Get ingress in DeploymentFunctionNode case
shrekris-anyscale Mar 17, 2022
186ebf7
Remove repetitive serve.start() call
shrekris-anyscale Mar 17, 2022
cb0dab5
Make blocking a private argument for serve.run()
shrekris-anyscale Mar 17, 2022
17eb0fa
Add descriptions for host and port
shrekris-anyscale Mar 17, 2022
8016b0a
Update description for target in run
shrekris-anyscale Mar 17, 2022
7b45ec3
Update type hints for serve.run()
shrekris-anyscale Mar 18, 2022
45c66c6
Merge branch 'master' of github.com:ray-project/ray into run_public
shrekris-anyscale Mar 18, 2022
08c69ba
Don't sort keys when converting to YAML in to_yaml()
shrekris-anyscale Mar 18, 2022
501d289
Merge branch 'master' of github.com:ray-project/ray into run_public
shrekris-anyscale Mar 18, 2022
73b29e6
Don't sort keys in serve config
shrekris-anyscale Mar 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions dashboard/modules/serve/serve_head.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
from aiohttp.web import Request, Response
import json
import logging

import ray.dashboard.utils as dashboard_utils
import ray.dashboard.optional_utils as optional_utils

from ray import serve
from ray.serve.application import Application
from ray.serve.schema import (
serve_application_to_schema,
from ray.serve.api import (
Application,
get_deployment_statuses,
internal_get_global_client,
serve_application_status_to_schema,
)
from ray.serve.api import get_deployment_statuses

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
Expand All @@ -25,10 +26,9 @@ def __init__(self, dashboard_head):
@routes.get("/api/serve/deployments/")
@optional_utils.init_ray_and_catch_exceptions(connect_to_serve=True)
async def get_all_deployments(self, req: Request) -> Response:
deployments = list(serve.list_deployments().values())
serve_application_schema = serve_application_to_schema(deployments=deployments)
app = Application(list(serve.list_deployments().values()))
return Response(
text=serve_application_schema.json(),
text=json.dumps(app.to_dict()),
content_type="application/json",
)

Expand All @@ -53,17 +53,16 @@ async def delete_serve_application(self, req: Request) -> Response:
@optional_utils.init_ray_and_catch_exceptions(connect_to_serve=True)
async def put_all_deployments(self, req: Request) -> Response:
app = Application.from_dict(await req.json())
app.deploy(blocking=False)
serve.run(app, _blocking=False)

new_names = set()
for deployment in app:
for deployment in app.deployments.values():
new_names.add(deployment.name)

all_deployments = serve.list_deployments()
all_names = set(all_deployments.keys())
names_to_delete = all_names.difference(new_names)
for name in names_to_delete:
all_deployments[name].delete()
internal_get_global_client().delete_deployments(names_to_delete)

return Response()

Expand Down
221 changes: 189 additions & 32 deletions python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import random
import re
import time
import yaml
import json
from dataclasses import dataclass
from functools import wraps
Expand Down Expand Up @@ -64,12 +65,20 @@
format_actor_name,
get_current_node_resource_key,
get_random_letters,
get_deployment_import_path,
logger,
DEFAULT,
)
from ray.util.annotations import PublicAPI
import ray
from ray import cloudpickle
from ray.serve.schema import (
RayActorOptionsSchema,
DeploymentSchema,
DeploymentStatusSchema,
ServeApplicationSchema,
ServeApplicationStatusSchema,
)


_INTERNAL_REPLICA_CONTEXT = None
Expand Down Expand Up @@ -1673,8 +1682,9 @@ def get_deployment_statuses() -> Dict[str, DeploymentStatusInfo]:


class ImmutableDeploymentDict(dict):
def __init__(self, deployments: List[Deployment]):
raise NotImplementedError()
def __init__(self, deployments: Dict[str, Deployment]):
super().__init__()
self.update(deployments)

def __setitem__(self, *args):
"""Not allowed. Modify deployment options using set_options instead."""
Expand Down Expand Up @@ -1702,16 +1712,41 @@ class Application:
to production using the Serve CLI or REST API.
"""

def __init__(self, ingress: Deployment, deployments: List[Deployment]):
raise NotImplementedError()
def __init__(self, deployments: List[Deployment]):
deployment_dict = {}
for d in deployments:
if not isinstance(d, Deployment):
raise TypeError(f"Got {type(d)}. Expected deployment.")
elif d.name in deployment_dict:
raise ValueError(f"App got multiple deployments named '{d.name}'.")

@property
def ingress(self) -> Deployment:
raise NotImplementedError()
deployment_dict[d.name] = d

self._deployments = ImmutableDeploymentDict(deployment_dict)

@property
def deployments(self) -> ImmutableDeploymentDict:
raise NotImplementedError()
return self._deployments

@property
def ingress(self) -> Optional[Deployment]:
"""Gets the app's ingress, if one exists.

The ingress is the single deployment with a non-None route prefix. If more
or less than one deployment has a route prefix, no single ingress exists,
so returns None.
"""

ingress = None

for deployment in self._deployments.values():
if deployment.route_prefix is not None:
if ingress is None:
ingress = deployment
else:
return None

return ingress

def to_dict(self) -> Dict:
"""Returns this Application's deployments as a dictionary.
Expand All @@ -1722,7 +1757,9 @@ def to_dict(self) -> Dict:
Returns:
Dict: The Application's deployments formatted in a dictionary.
"""
raise NotImplementedError()
return ServeApplicationSchema(
deployments=[deployment_to_schema(d) for d in self._deployments.values()]
).dict()

@classmethod
def from_dict(cls, d: Dict) -> "Application":
Expand All @@ -1738,7 +1775,9 @@ def from_dict(cls, d: Dict) -> "Application":
Returns:
Application: a new application object containing the deployments.
"""
raise NotImplementedError()

schema = ServeApplicationSchema.parse_obj(d)
return cls([schema_to_deployment(s) for s in schema.deployments])

def to_yaml(self, f: Optional[TextIO] = None) -> Optional[str]:
"""Returns this application's deployments as a YAML string.
Expand All @@ -1760,7 +1799,9 @@ def to_yaml(self, f: Optional[TextIO] = None) -> Optional[str]:
Optional[String]: The deployments' YAML string. The output is from
yaml.safe_dump(). Returned only if no file pointer is passed in.
"""
raise NotImplementedError()
return yaml.safe_dump(
self.to_dict(), stream=f, default_flow_style=False, sort_keys=False
)

@classmethod
def from_yaml(cls, str_or_file: Union[str, TextIO]) -> "Application":
Expand All @@ -1784,46 +1825,54 @@ def from_yaml(cls, str_or_file: Union[str, TextIO]) -> "Application":
Serve YAML config files.

Returns:
Application: a new application object containing the deployments.
Application: a new Application object containing the deployments.
"""
raise NotImplementedError()
return cls.from_dict(yaml.safe_load(str_or_file))


@PublicAPI(stability="alpha")
def run(
target: Union[DeploymentNode, Application],
target: Union[DeploymentNode, DeploymentFunctionNode, Application],
_blocking: bool = True,
*,
host: str = DEFAULT_HTTP_HOST,
port: int = DEFAULT_HTTP_PORT,
driver: Optional[Deployment] = None,
**kwargs,
) -> RayServeHandle:
"""Run a Serve application and return a ServeHandle to the ingress.

Either a DeploymentNode or a pre-built application can be passed in.
If a DeploymentNode is passed in, all of the deployments it depends on
will be deployed.
Either a DeploymentNode, DeploymentFunctionNode, or a pre-built application
can be passed in. If a node is passed in, all of the deployments it depends
on will be deployed. If there is an ingress, its handle will be returned.

Args:
target: User built serve Application or DeploymentNode that acts as
the root node of DAG. By default DeploymentNode is the Driver
deployment unless user provided customized one.
target (Union[DeploymentNode, DeploymentFunctionNode, Application]):
A user-built Serve Application or a DeploymentNode that acts as the
root node of DAG. By default DeploymentNode is the Driver
deployment unless user provides a customized one.
host (str): The host passed into serve.start().
port (int): The port passed into serve.start().

Returns:
handle: A regular ray serve handle that can be called by user to exeucte
the serve DAG.
RayServeHandle: A regular ray serve handle that can be called by user
to execute the serve DAG.
"""
# TODO (jiaodong): Resolve circular reference in pipeline codebase and serve
from ray.serve.pipeline.api import build as pipeline_build

client = start(detached=True, http_options={"host": host, "port": port})

if isinstance(target, Application):
deployments = list(target.deployments.values())
ingress = target.ingress
# Each DAG should always provide a valid Driver DeploymentNode
if isinstance(target, (DeploymentNode)):
elif isinstance(target, DeploymentNode):
deployments = pipeline_build(target)
ingress = deployments[-1]
# Special case where user is doing single function serve.run(func.bind())
elif isinstance(target, (DeploymentFunctionNode)):
elif isinstance(target, DeploymentFunctionNode):
deployments = pipeline_build(target)
ingress = deployments[-1]
if len(deployments) != 1:
raise ValueError(
"We only support single function node in serve.run, ex: "
Expand All @@ -1839,10 +1888,16 @@ def run(
"as entrypoint to your Serve DAG."
)
else:
raise NotImplementedError()
raise TypeError(
"Expected a DeploymentNode, DeploymentFunctionNode, or "
"Application as target. Got unexpected type "
f'"{type(target)}" instead.'
)

parameter_group = []

parameter_group = [
{
for deployment in deployments:
deployment_parameters = {
"name": deployment._name,
"func_or_class": deployment._func_or_class,
"init_args": deployment.init_args,
Expand All @@ -1854,11 +1909,13 @@ def run(
"route_prefix": deployment.route_prefix,
"url": deployment.url,
}
for deployment in deployments
]

client.deploy_group(parameter_group, _blocking=True)
return deployments[-1].get_handle()
parameter_group.append(deployment_parameters)

client.deploy_group(parameter_group, _blocking=_blocking)

if ingress is not None:
return ingress.get_handle()


@PublicAPI(stability="alpha")
Expand All @@ -1880,3 +1937,103 @@ def build(target: DeploymentNode) -> Application:
# TODO(edoakes): this should accept host and port, but we don't
# currently support them in the REST API.
raise NotImplementedError()


def deployment_to_schema(d: Deployment) -> DeploymentSchema:
"""Converts a live deployment object to a corresponding structured schema.

If the deployment has a class or function, it will be attemptetd to be
converted to a valid corresponding import path.

init_args and init_kwargs must also be JSON-serializable or this call will
fail.
"""

if d.ray_actor_options is not None:
ray_actor_options_schema = RayActorOptionsSchema.parse_obj(d.ray_actor_options)
else:
ray_actor_options_schema = None

return DeploymentSchema(
name=d.name,
import_path=get_deployment_import_path(d),
init_args=d.init_args,
init_kwargs=d.init_kwargs,
num_replicas=d.num_replicas,
route_prefix=d.route_prefix,
max_concurrent_queries=d.max_concurrent_queries,
user_config=d.user_config,
autoscaling_config=d._config.autoscaling_config,
graceful_shutdown_wait_loop_s=d._config.graceful_shutdown_wait_loop_s,
graceful_shutdown_timeout_s=d._config.graceful_shutdown_timeout_s,
health_check_period_s=d._config.health_check_period_s,
health_check_timeout_s=d._config.health_check_timeout_s,
ray_actor_options=ray_actor_options_schema,
)


def schema_to_deployment(s: DeploymentSchema) -> Deployment:
from ray.serve.pipeline.json_serde import convert_from_json_safe_obj

if s.ray_actor_options is None:
ray_actor_options = None
else:
ray_actor_options = s.ray_actor_options.dict(exclude_unset=True)

return deployment(
name=s.name,
init_args=convert_from_json_safe_obj(s.init_args),
init_kwargs=convert_from_json_safe_obj(s.init_kwargs),
num_replicas=s.num_replicas,
route_prefix=s.route_prefix,
max_concurrent_queries=s.max_concurrent_queries,
user_config=convert_from_json_safe_obj(s.user_config),
_autoscaling_config=s.autoscaling_config,
_graceful_shutdown_wait_loop_s=s.graceful_shutdown_wait_loop_s,
_graceful_shutdown_timeout_s=s.graceful_shutdown_timeout_s,
_health_check_period_s=s.health_check_period_s,
_health_check_timeout_s=s.health_check_timeout_s,
ray_actor_options=ray_actor_options,
)(s.import_path)


def serve_application_to_schema(
deployments: List[Deployment],
) -> ServeApplicationSchema:
schemas = [deployment_to_schema(d) for d in deployments]
return ServeApplicationSchema(deployments=schemas)


def schema_to_serve_application(schema: ServeApplicationSchema) -> List[Deployment]:
return [schema_to_deployment(s) for s in schema.deployments]


def status_info_to_schema(
deployment_name: str, status_info: Union[DeploymentStatusInfo, Dict]
) -> DeploymentStatusSchema:
if isinstance(status_info, DeploymentStatusInfo):
return DeploymentStatusSchema(
name=deployment_name, status=status_info.status, message=status_info.message
)
elif isinstance(status_info, dict):
return DeploymentStatusSchema(
name=deployment_name,
status=status_info["status"],
message=status_info["message"],
)
else:
raise TypeError(
f"Got {type(status_info)} as status_info's "
"type. Expected status_info to be either a "
"DeploymentStatusInfo or a dictionary."
)


def serve_application_status_to_schema(
status_infos: Dict[str, Union[DeploymentStatusInfo, Dict]]
) -> ServeApplicationStatusSchema:
schemas = [
status_info_to_schema(deployment_name, status_info)
for deployment_name, status_info in status_infos.items()
]
return ServeApplicationStatusSchema(statuses=schemas)
Loading