From f0071d30fb3758ef7f382f846b586258225af856 Mon Sep 17 00:00:00 2001 From: Jiao Date: Thu, 21 Apr 2022 11:48:48 -0700 Subject: [PATCH] [Serve][Deployment Graph] Let .bind return ray DAGNode types and remove exposing DeploymentNode as public (#24065) See the DAG layering summary in https://github.com/ray-project/ray/issues/24061 We need to clean up and establish the correct ray dag -> serve dag layering, where `.bind()` can be called on a `@serve.deployment`-decorated class or function but only returns a raw Ray DAGNode type that is executable by Ray core, and serve_dag is only available after serve-specific transformations. Thus this PR removes the publicly exposed serve DAGNode types such as DeploymentNode. It also removes the `class.bind().bind()` syntax, which returned a `DeploymentMethodNode` that defaulted to `__call__`, to match the behavior of Ray DAG building. --- python/ray/experimental/dag/class_node.py | 3 + python/ray/experimental/dag/dag_node.py | 5 +- python/ray/experimental/dag/py_obj_scanner.py | 6 -- .../experimental/dag/tests/test_class_dag.py | 10 ++-- .../dag/tests/test_function_dag.py | 6 +- python/ray/serve/api.py | 31 +++++----- python/ray/serve/deployment.py | 12 ++-- python/ray/serve/deployment_graph.py | 60 +++---------------- python/ray/serve/pipeline/api.py | 1 + python/ray/serve/scripts.py | 16 ++--- python/ray/serve/tests/test_pipeline_dag.py | 20 +++---- 11 files changed, 61 insertions(+), 109 deletions(-) diff --git a/python/ray/experimental/dag/class_node.py b/python/ray/experimental/dag/class_node.py index a6d9c26efc16..b6e63febc6ff 100644 --- a/python/ray/experimental/dag/class_node.py +++ b/python/ray/experimental/dag/class_node.py @@ -79,6 +79,9 @@ def _contains_input_node(self) -> bool: return False def __getattr__(self, method_name: str): + # User trying to call .bind() without a bind class method + if method_name == "bind" and "bind" not in dir(self._body): + raise AttributeError(f".bind() cannot be used again on {type(self)} ") # Raise an error if the method is invalid. 
getattr(self._body, method_name) call_node = _UnboundClassMethodNode(self, method_name) diff --git a/python/ray/experimental/dag/dag_node.py b/python/ray/experimental/dag/dag_node.py index 4b7000f3eb8d..da13a0decbfb 100644 --- a/python/ray/experimental/dag/dag_node.py +++ b/python/ray/experimental/dag/dag_node.py @@ -278,10 +278,7 @@ def __reduce__(self): def __getattr__(self, attr: str): if attr == "bind": - raise AttributeError( - f".bind() cannot be used again on {type(self)} " - f"(args: {self.get_args()}, kwargs: {self.get_kwargs()})." - ) + raise AttributeError(f".bind() cannot be used again on {type(self)} ") elif attr == "remote": raise AttributeError( f".remote() cannot be used on {type(self)}. To execute the task " diff --git a/python/ray/experimental/dag/py_obj_scanner.py b/python/ray/experimental/dag/py_obj_scanner.py index 09a168a7255e..247585617b78 100644 --- a/python/ray/experimental/dag/py_obj_scanner.py +++ b/python/ray/experimental/dag/py_obj_scanner.py @@ -49,10 +49,6 @@ def __init__(self): from ray.serve.pipeline.deployment_node import DeploymentNode from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode from ray.serve.pipeline.deployment_function_node import DeploymentFunctionNode - from ray.serve.deployment_graph import DeploymentNode as UserDeploymentNode - from ray.serve.deployment_graph import ( - DeploymentFunctionNode as UserDeploymentFunctionNode, - ) self.dispatch_table[FunctionNode] = self._reduce_dag_node self.dispatch_table[ClassNode] = self._reduce_dag_node @@ -62,8 +58,6 @@ def __init__(self): self.dispatch_table[DeploymentNode] = self._reduce_dag_node self.dispatch_table[DeploymentMethodNode] = self._reduce_dag_node self.dispatch_table[DeploymentFunctionNode] = self._reduce_dag_node - self.dispatch_table[UserDeploymentNode] = self._reduce_dag_node - self.dispatch_table[UserDeploymentFunctionNode] = self._reduce_dag_node super().__init__(self._buf) def find_nodes(self, obj: Any) -> List["DAGNode"]: diff --git 
a/python/ray/experimental/dag/tests/test_class_dag.py b/python/ray/experimental/dag/tests/test_class_dag.py index fb0493516dbb..bdd1b9f5d951 100644 --- a/python/ray/experimental/dag/tests/test_class_dag.py +++ b/python/ray/experimental/dag/tests/test_class_dag.py @@ -114,9 +114,11 @@ def test_actor_method_options(shared_ray_instance): def test_basic_actor_dag_constructor_invalid_options(shared_ray_instance): - a1 = Actor.options(num_cpus=-1).bind(10) - invalid_dag = a1.get.bind() - with pytest.raises(ValueError, match=".*Resource quantities may not be negative.*"): + with pytest.raises( + ValueError, match=r".*only accepts None, 0 or a positive number.*" + ): + a1 = Actor.options(num_cpus=-1).bind(10) + invalid_dag = a1.get.bind() ray.get(invalid_dag.execute()) @@ -248,7 +250,7 @@ def ping(self): with pytest.raises( AttributeError, - match="'Actor' has no attribute 'bind'", + match=r"\.bind\(\) cannot be used again on", ): actor = Actor.bind() _ = actor.bind() diff --git a/python/ray/experimental/dag/tests/test_function_dag.py b/python/ray/experimental/dag/tests/test_function_dag.py index 37d4814121e8..02bd4c0fd8f8 100644 --- a/python/ray/experimental/dag/tests/test_function_dag.py +++ b/python/ray/experimental/dag/tests/test_function_dag.py @@ -111,8 +111,10 @@ def b(x): # Ensure current DAG is executable assert ray.get(dag.execute()) == 4 - invalid_dag = b.options(num_cpus=-1).bind(a_ref) - with pytest.raises(ValueError, match=".*Resource quantities may not be negative.*"): + with pytest.raises( + ValueError, match=r".*only accepts None, 0 or a positive number.*" + ): + invalid_dag = b.options(num_cpus=-1).bind(a_ref) ray.get(invalid_dag.execute()) diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 149609b04897..4a3693f6aa79 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -72,7 +72,7 @@ from ray.util.annotations import PublicAPI import ray from ray import cloudpickle -from ray.serve.deployment_graph import DeploymentNode, 
DeploymentFunctionNode +from ray.serve.deployment_graph import ClassNode, FunctionNode from ray.serve.application import Application logger = logging.getLogger(__file__) @@ -1246,7 +1246,7 @@ def get_deployment_statuses() -> Dict[str, DeploymentStatusInfo]: @PublicAPI(stability="alpha") def run( - target: Union[DeploymentNode, DeploymentFunctionNode], + target: Union[ClassNode, FunctionNode], _blocking: bool = True, *, host: str = DEFAULT_HTTP_HOST, @@ -1254,14 +1254,14 @@ def run( ) -> Optional[RayServeHandle]: """Run a Serve application and return a ServeHandle to the ingress. - Either a DeploymentNode, DeploymentFunctionNode, or a pre-built application + Either a ClassNode, FunctionNode, or a pre-built application can be passed in. If a node is passed in, all of the deployments it depends on will be deployed. If there is an ingress, its handle will be returned. Args: - target (Union[DeploymentNode, DeploymentFunctionNode, Application]): - A user-built Serve Application or a DeploymentNode that acts as the - root node of DAG. By default DeploymentNode is the Driver + target (Union[ClassNode, FunctionNode, Application]): + A user-built Serve Application or a ClassNode that acts as the + root node of DAG. By default ClassNode is the Driver deployment unless user provides a customized one. host (str): The host passed into serve.start(). port (int): The port passed into serve.start(). 
@@ -1279,12 +1279,12 @@ def run( if isinstance(target, Application): deployments = list(target.deployments.values()) ingress = target.ingress - # Each DAG should always provide a valid Driver DeploymentNode - elif isinstance(target, DeploymentNode): + # Each DAG should always provide a valid Driver ClassNode + elif isinstance(target, ClassNode): deployments = pipeline_build(target) ingress = get_and_validate_ingress_deployment(deployments) # Special case where user is doing single function serve.run(func.bind()) - elif isinstance(target, DeploymentFunctionNode): + elif isinstance(target, FunctionNode): deployments = pipeline_build(target) ingress = get_and_validate_ingress_deployment(deployments) if len(deployments) != 1: @@ -1297,15 +1297,14 @@ def run( elif isinstance(target, DAGNode): raise ValueError( "Invalid DAGNode type as entry to serve.run(), " - f"type: {type(target)}, accepted: DeploymentNode, " - "DeploymentFunctionNode please provide a driver class and bind it " + f"type: {type(target)}, accepted: ClassNode, " + "FunctionNode please provide a driver class and bind it " "as entrypoint to your Serve DAG." ) else: raise TypeError( - "Expected a DeploymentNode, DeploymentFunctionNode, or " - "Application as target. Got unexpected type " - f'"{type(target)}" instead.' + "Expected a ClassNode, FunctionNode, or Application as target. " + f"Got unexpected type {type(target)} instead." ) parameter_group = [] @@ -1332,10 +1331,10 @@ def run( return ingress.get_handle() -def build(target: Union[DeploymentNode, DeploymentFunctionNode]) -> Application: +def build(target: Union[ClassNode, FunctionNode]) -> Application: """Builds a Serve application into a static application. - Takes in a DeploymentNode or DeploymentFunctionNode and converts it to a + Takes in a ClassNode or FunctionNode and converts it to a Serve application consisting of one or more deployments. 
This is intended to be used for production scenarios and deployed via the Serve REST API or CLI, so there are some restrictions placed on the deployments: diff --git a/python/ray/serve/deployment.py b/python/ray/serve/deployment.py index b994c709d344..c2f83f67f6b5 100644 --- a/python/ray/serve/deployment.py +++ b/python/ray/serve/deployment.py @@ -8,13 +8,13 @@ Tuple, Union, ) - +from ray.experimental.dag.class_node import ClassNode +from ray.experimental.dag.function_node import FunctionNode from ray.serve.config import ( AutoscalingConfig, DeploymentConfig, ) from ray.serve.handle import RayServeHandle, RayServeSyncHandle -from ray.serve.deployment_graph import DeploymentNode, DeploymentFunctionNode from ray.serve.utils import DEFAULT, get_deployment_import_path from ray.util.annotations import PublicAPI from ray.serve.schema import ( @@ -186,8 +186,8 @@ def __call__(self): ) @PublicAPI(stability="alpha") - def bind(self, *args, **kwargs) -> Union[DeploymentNode, DeploymentFunctionNode]: - """Bind the provided arguments and return a DeploymentNode. + def bind(self, *args, **kwargs) -> Union[ClassNode, FunctionNode]: + """Bind the provided arguments and return a class or function node. The returned bound deployment can be deployed or bound to other deployments to create a deployment graph. 
@@ -200,7 +200,7 @@ def bind(self, *args, **kwargs) -> Union[DeploymentNode, DeploymentFunctionNode] schema_shell = deployment_to_schema(copied_self) if inspect.isfunction(self._func_or_class): - return DeploymentFunctionNode( + return FunctionNode( self._func_or_class, args, # Used to bind and resolve DAG only, can take user input kwargs, # Used to bind and resolve DAG only, can take user input @@ -211,7 +211,7 @@ def bind(self, *args, **kwargs) -> Union[DeploymentNode, DeploymentFunctionNode] }, ) else: - return DeploymentNode( + return ClassNode( self._func_or_class, args, kwargs, diff --git a/python/ray/serve/deployment_graph.py b/python/ray/serve/deployment_graph.py index 59fcde033532..634399d3603f 100644 --- a/python/ray/serve/deployment_graph.py +++ b/python/ray/serve/deployment_graph.py @@ -1,7 +1,8 @@ import json -from ray.experimental.dag.class_node import ClassNode -from ray.experimental.dag.function_node import FunctionNode -from ray.experimental.dag import DAGNode +from ray.experimental.dag.class_node import ClassNode # noqa: F401 +from ray.experimental.dag.function_node import FunctionNode # noqa: F401 +from ray.experimental.dag.input_node import InputNode # noqa: F401 +from ray.experimental.dag import DAGNode # noqa: F401 from ray.util.annotations import PublicAPI @@ -14,7 +15,9 @@ class RayServeDAGHandle: """ def __init__(self, dag_node_json: str) -> None: + from ray.serve.pipeline.json_serde import dagnode_from_json + self.dagnode_from_json = dagnode_from_json self.dag_node_json = dag_node_json # NOTE(simon): Making this lazy to avoid deserialization in controller for now @@ -31,57 +34,8 @@ def __reduce__(self): return RayServeDAGHandle._deserialize, (self.dag_node_json,) def remote(self, *args, **kwargs): - from ray.serve.pipeline.json_serde import dagnode_from_json - if self.dag_node is None: self.dag_node = json.loads( - self.dag_node_json, object_hook=dagnode_from_json + self.dag_node_json, object_hook=self.dagnode_from_json ) return 
self.dag_node.execute(*args, **kwargs) - - -@PublicAPI(stability="alpha") -class DeploymentMethodNode(DAGNode): - """Represents a method call on a bound deployment node. - - These method calls can be composed into an optimized call DAG and passed - to a "driver" deployment that will orchestrate the calls at runtime. - - This class cannot be called directly. Instead, when it is bound to a - deployment node, it will be resolved to a DeployedCallGraph at runtime. - """ - - # TODO (jiaodong): Later unify and refactor this with pipeline node class - pass - - -@PublicAPI(stability="alpha") -class DeploymentNode(ClassNode): - """Represents a deployment with its bound config options and arguments. - - The bound deployment can be run using serve.run(). - - A bound deployment can be passed as an argument to other bound deployments - to build a deployment graph. When the graph is deployed, the - bound deployments passed into a constructor will be converted to - RayServeHandles that can be used to send requests. - - Calling deployment.method.bind() will return a DeploymentMethodNode - that can be used to compose an optimized call graph. - """ - - # TODO (jiaodong): Later unify and refactor this with pipeline node class - def bind(self, *args, **kwargs): - """Bind the default __call__ method and return a DeploymentMethodNode""" - return self.__call__.bind(*args, **kwargs) - - -@PublicAPI(stability="alpha") -class DeploymentFunctionNode(FunctionNode): - """Represents a serve.deployment decorated function from user. - - It's the counterpart of DeploymentNode that represents function as body - instead of class. 
- """ - - pass diff --git a/python/ray/serve/pipeline/api.py b/python/ray/serve/pipeline/api.py index 12749aae646c..cc55375ffac9 100644 --- a/python/ray/serve/pipeline/api.py +++ b/python/ray/serve/pipeline/api.py @@ -1,4 +1,5 @@ from typing import List + from ray.experimental.dag.dag_node import DAGNode from ray.serve.pipeline.generate import ( transform_ray_dag_to_serve_dag, diff --git a/python/ray/serve/scripts.py b/python/ray/serve/scripts.py index 6d05e84cae3c..004b4c9e5814 100644 --- a/python/ray/serve/scripts.py +++ b/python/ray/serve/scripts.py @@ -24,8 +24,8 @@ from ray.serve.api import build as build_app from ray.serve.api import Application from ray.serve.deployment_graph import ( - DeploymentFunctionNode, - DeploymentNode, + FunctionNode, + ClassNode, ) APP_DIR_HELP_STR = ( @@ -186,7 +186,7 @@ def deploy(config_file_name: str, address: str): short_help="Run a Serve app.", help=( "Runs the Serve app from the specified import path or YAML config.\n" - "Any import path must lead to an Application or DeploymentNode object. " + "Any import path must lead to an Application or ClassNode object. " "By default, this will block and periodically log status. If you " "Ctrl-C the command, it will tear down the app." ), @@ -385,7 +385,7 @@ def delete(address: str, yes: bool): @cli.command( short_help="Writes a Pipeline's config file.", help=( - "Imports the DeploymentNode or DeploymentFunctionNode at IMPORT_PATH " + "Imports the ClassNode or FunctionNode at IMPORT_PATH " "and generates a structured config for it that can be used by " "`serve deploy` or the REST API. 
" ), @@ -412,11 +412,11 @@ def delete(address: str, yes: bool): def build(app_dir: str, output_path: Optional[str], import_path: str): sys.path.insert(0, app_dir) - node: Union[DeploymentNode, DeploymentFunctionNode] = import_attr(import_path) - if not isinstance(node, (DeploymentNode, DeploymentFunctionNode)): + node: Union[ClassNode, FunctionNode] = import_attr(import_path) + if not isinstance(node, (ClassNode, FunctionNode)): raise TypeError( - f"Expected '{import_path}' to be DeploymentNode or " - f"DeploymentFunctionNode, but got {type(node)}." + f"Expected '{import_path}' to be ClassNode or " + f"FunctionNode, but got {type(node)}." ) app = build_app(node) diff --git a/python/ray/serve/tests/test_pipeline_dag.py b/python/ray/serve/tests/test_pipeline_dag.py index f9e98b867b9a..b4e3ca2ec4bc 100644 --- a/python/ray/serve/tests/test_pipeline_dag.py +++ b/python/ray/serve/tests/test_pipeline_dag.py @@ -8,11 +8,11 @@ import ray from ray import serve -from ray.experimental.dag.input_node import InputNode from ray.serve.application import Application from ray.serve.api import build as build_app -from ray.serve.deployment_graph import DeploymentNode, RayServeDAGHandle +from ray.serve.deployment_graph import RayServeDAGHandle from ray.serve.pipeline.api import build as pipeline_build +from ray.serve.deployment_graph import ClassNode, InputNode from ray.serve.drivers import DAGDriver import starlette.requests @@ -21,9 +21,7 @@ NESTED_HANDLE_KEY = "nested_handle" -def maybe_build( - node: DeploymentNode, use_build: bool -) -> Union[Application, DeploymentNode]: +def maybe_build(node: ClassNode, use_build: bool) -> Union[Application, ClassNode]: if use_build: return Application.from_dict(build_app(node).to_dict()) else: @@ -202,7 +200,7 @@ def test_multi_instantiation_class_deployment_in_init_args(serve_instance, use_b m1 = Model.bind(2) m2 = Model.bind(3) combine = Combine.bind(m1, m2=m2) - combine_output = combine.bind(dag_input) + combine_output = 
combine.__call__.bind(dag_input) serve_dag = DAGDriver.bind(combine_output, input_schema=json_resolver) handle = serve.run(serve_dag) @@ -215,7 +213,7 @@ def test_shared_deployment_handle(serve_instance, use_build): with InputNode() as dag_input: m = Model.bind(2) combine = Combine.bind(m, m2=m) - combine_output = combine.bind(dag_input) + combine_output = combine.__call__.bind(dag_input) serve_dag = DAGDriver.bind(combine_output, input_schema=json_resolver) handle = serve.run(serve_dag) @@ -229,7 +227,7 @@ def test_multi_instantiation_class_nested_deployment_arg_dag(serve_instance, use m1 = Model.bind(2) m2 = Model.bind(3) combine = Combine.bind(m1, m2={NESTED_HANDLE_KEY: m2}, m2_nested=True) - output = combine.bind(dag_input) + output = combine.__call__.bind(dag_input) serve_dag = DAGDriver.bind(output, input_schema=json_resolver) handle = serve.run(serve_dag) @@ -418,8 +416,10 @@ def ping(self): return "hello" with pytest.raises(AttributeError, match=r"\.bind\(\) cannot be used again on"): - # Special for serve: Actor.bind().bind() returns DeploymentMethodNode - _ = Actor.bind().bind().bind() + _ = Actor.bind().bind() + + with pytest.raises(AttributeError, match=r"\.bind\(\) cannot be used again on"): + _ = Actor.bind().ping.bind().bind() with pytest.raises( AttributeError,