Skip to content

Commit

Permalink
pull out shared deploy code into deploy utils (ray-project#34321)
Browse files Browse the repository at this point in the history
Signed-off-by: Cindy Zhang <[email protected]>
  • Loading branch information
zcin authored and vitsai committed Apr 17, 2023
1 parent 04c70e2 commit 825d9e3
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 129 deletions.
2 changes: 0 additions & 2 deletions python/ray/dag/input_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,6 @@ def _execute_impl(self, *args, **kwargs):
current node is executed.
"""

print("self._dag_input_node", self._dag_input_node)

if isinstance(self._dag_input_node, DAGInputData):
return self._dag_input_node[self._key]
else:
Expand Down
84 changes: 6 additions & 78 deletions python/ray/serve/_private/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
ApplicationStatus,
DeploymentStatusInfo,
)
from ray.serve.config import DeploymentConfig, HTTPOptions, ReplicaConfig
from ray.serve.config import DeploymentConfig, HTTPOptions
from ray.serve._private.constants import (
CLIENT_POLLING_INTERVAL_S,
MAX_CACHED_HANDLES,
SERVE_NAMESPACE,
SERVE_DEFAULT_APP_NAME,
)
from ray.serve._private.deploy_utils import get_deploy_args
from ray.serve.controller import ServeController
from ray.serve.exceptions import RayServeException
from ray.serve.generated.serve_pb2 import DeploymentRoute, DeploymentRouteList
Expand Down Expand Up @@ -233,7 +234,7 @@ def deploy(
_blocking: Optional[bool] = True,
):

controller_deploy_args = self.get_deploy_args(
controller_deploy_args = get_deploy_args(
name=name,
deployment_def=deployment_def,
init_args=init_args,
Expand All @@ -253,7 +254,7 @@ def deploy(
self.log_deployment_ready(name, version, url, tag)

@_ensure_connected
def deploy_group(
def deploy_application(
self,
name,
deployments: List[Dict],
Expand All @@ -263,7 +264,7 @@ def deploy_group(
deployment_args_list = []
for deployment in deployments:
deployment_args_list.append(
self.get_deploy_args(
get_deploy_args(
deployment["name"],
deployment["func_or_class"],
deployment["init_args"],
Expand All @@ -278,7 +279,7 @@ def deploy_group(
)

updating_list = ray.get(
self._controller.deploy_group.remote(name, deployment_args_list)
self._controller.deploy_application.remote(name, deployment_args_list)
)

tags = []
Expand Down Expand Up @@ -496,79 +497,6 @@ def get_handle(

return handle

@_ensure_connected
def get_deploy_args(
self,
name: str,
deployment_def: Union[Callable, Type[Callable], str],
init_args: Tuple[Any],
init_kwargs: Dict[Any, Any],
ray_actor_options: Optional[Dict] = None,
config: Optional[Union[DeploymentConfig, Dict[str, Any]]] = None,
version: Optional[str] = None,
route_prefix: Optional[str] = None,
# NOTE(review): annotated as str here, but the controller's deploy()
# annotates the matching parameter as Optional[bool] -- confirm.
is_driver_deployment: Optional[str] = None,
docs_path: Optional[str] = None,
) -> Dict:
"""
Takes a deployment's configuration, and returns the arguments needed
for the controller to deploy it.

The body never reads ``self``; it only uses ``ray``, ``ReplicaConfig``,
``DeploymentConfig`` and the module logger.

Side effects visible in this body: ``ray_actor_options`` (when provided)
has its ``runtime_env`` entry set or updated in place, and ``config``
(when it is a ``DeploymentConfig``) has its ``version`` overwritten.

Raises ``TypeError`` when ``config`` is neither a dict nor a
``DeploymentConfig``.
"""

# Replace missing optional inputs with fresh empty dicts.
if config is None:
config = {}
if ray_actor_options is None:
ray_actor_options = {}

# Runtime env reported by the current Ray runtime context.
curr_job_env = ray.get_runtime_context().runtime_env
if "runtime_env" in ray_actor_options:
# It is illegal to set field working_dir to None.
# Only fill in working_dir when the caller's runtime_env lacks one.
if curr_job_env.get("working_dir") is not None:
ray_actor_options["runtime_env"].setdefault(
"working_dir", curr_job_env.get("working_dir")
)
else:
# No runtime_env supplied: use the current one wholesale.
ray_actor_options["runtime_env"] = curr_job_env

replica_config = ReplicaConfig.create(
deployment_def,
init_args=init_args,
init_kwargs=init_kwargs,
ray_actor_options=ray_actor_options,
)

# Accept either a plain dict (parsed into a DeploymentConfig) or an
# already-built DeploymentConfig; anything else is rejected.
if isinstance(config, dict):
deployment_config = DeploymentConfig.parse_obj(config)
elif isinstance(config, DeploymentConfig):
deployment_config = config
else:
raise TypeError("config must be a DeploymentConfig or a dictionary.")

# The explicit ``version`` argument always wins over whatever the config held.
deployment_config.version = version

# Warn (but do not fail) when the per-replica query cap is below the
# autoscaling target configured for the deployment.
if (
deployment_config.autoscaling_config is not None
and deployment_config.max_concurrent_queries
< deployment_config.autoscaling_config.target_num_ongoing_requests_per_replica # noqa: E501
):
logger.warning(
"Autoscaling will never happen, "
"because 'max_concurrent_queries' is less than "
"'target_num_ongoing_requests_per_replica' now."
)

# Both configs are passed on as serialized proto bytes.
controller_deploy_args = {
"name": name,
"deployment_config_proto_bytes": deployment_config.to_proto_bytes(),
"replica_config_proto_bytes": replica_config.to_proto_bytes(),
"route_prefix": route_prefix,
"deployer_job_id": ray.get_runtime_context().get_job_id(),
"is_driver_deployment": is_driver_deployment,
"docs_path": docs_path,
}

return controller_deploy_args

@_ensure_connected
def log_deployment_update_status(
self, name: str, version: str, updating: bool
Expand Down
137 changes: 137 additions & 0 deletions python/ray/serve/_private/deploy_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
from typing import Dict, Tuple, Union, Callable, Type, Optional, Any
import logging
import time

from ray.serve.config import ReplicaConfig, DeploymentConfig
from ray.serve._private.constants import SERVE_LOGGER_NAME
from ray.serve._private.autoscaling_policy import BasicAutoscalingPolicy
from ray.serve._private.common import DeploymentInfo

import ray
import ray.util.serialization_addons

logger = logging.getLogger(SERVE_LOGGER_NAME)


def get_deploy_args(
    name: str,
    deployment_def: Union[Callable, Type[Callable], str],
    init_args: Tuple[Any],
    init_kwargs: Dict[Any, Any],
    ray_actor_options: Optional[Dict] = None,
    config: Optional[Union[DeploymentConfig, Dict[str, Any]]] = None,
    version: Optional[str] = None,
    route_prefix: Optional[str] = None,
    is_driver_deployment: Optional[bool] = None,
    docs_path: Optional[str] = None,
) -> Dict:
    """Build the keyword arguments the controller needs to deploy a deployment.

    Args:
        name: Name of the deployment.
        deployment_def: The deployment body, forwarded to
            ``ReplicaConfig.create``.
        init_args: Positional arguments forwarded to ``ReplicaConfig.create``.
        init_kwargs: Keyword arguments forwarded to ``ReplicaConfig.create``.
        ray_actor_options: Actor options for the replicas. The caller's dict
            is never modified; a copy is completed with a ``runtime_env``.
        config: Either a ``DeploymentConfig`` or a dict parsed into one.
            Defaults to an empty dict. Note that a ``DeploymentConfig``
            instance has its ``version`` attribute overwritten in place.
        version: Version stored on the deployment config.
        route_prefix: Passed through unchanged.
        is_driver_deployment: Passed through unchanged.
        docs_path: Passed through unchanged.

    Returns:
        A dict with the keys ``name``, ``deployment_config_proto_bytes``,
        ``replica_config_proto_bytes``, ``route_prefix``,
        ``deployer_job_id``, ``is_driver_deployment`` and ``docs_path``.

    Raises:
        TypeError: If ``config`` is neither a dict nor a ``DeploymentConfig``.
    """

    if config is None:
        config = {}

    # Work on a shallow copy so the caller's options dict is left untouched.
    ray_actor_options = {} if ray_actor_options is None else dict(ray_actor_options)

    curr_job_env = ray.get_runtime_context().runtime_env
    if "runtime_env" in ray_actor_options:
        # It is illegal to set field working_dir to None, so the current
        # working_dir is only inherited when it actually has a value.
        working_dir = curr_job_env.get("working_dir")
        if working_dir is not None:
            # Copy the nested runtime_env as well: the shallow copy above
            # still shares it with the caller.
            runtime_env = dict(ray_actor_options["runtime_env"])
            runtime_env.setdefault("working_dir", working_dir)
            ray_actor_options["runtime_env"] = runtime_env
    else:
        # No runtime_env supplied: use the current one wholesale.
        ray_actor_options["runtime_env"] = curr_job_env

    replica_config = ReplicaConfig.create(
        deployment_def,
        init_args=init_args,
        init_kwargs=init_kwargs,
        ray_actor_options=ray_actor_options,
    )

    if isinstance(config, dict):
        deployment_config = DeploymentConfig.parse_obj(config)
    elif isinstance(config, DeploymentConfig):
        deployment_config = config
    else:
        raise TypeError("config must be a DeploymentConfig or a dictionary.")

    # The explicit ``version`` argument always wins over the config's value.
    deployment_config.version = version

    # Warn (without failing) when the per-replica query cap is below the
    # autoscaling target.
    autoscaling_config = deployment_config.autoscaling_config
    if (
        autoscaling_config is not None
        and deployment_config.max_concurrent_queries
        < autoscaling_config.target_num_ongoing_requests_per_replica
    ):
        logger.warning(
            "Autoscaling will never happen, "
            "because 'max_concurrent_queries' is less than "
            "'target_num_ongoing_requests_per_replica' now."
        )

    # Both configs are handed over as serialized proto bytes.
    return {
        "name": name,
        "deployment_config_proto_bytes": deployment_config.to_proto_bytes(),
        "replica_config_proto_bytes": replica_config.to_proto_bytes(),
        "route_prefix": route_prefix,
        "deployer_job_id": ray.get_runtime_context().get_job_id(),
        "is_driver_deployment": is_driver_deployment,
        "docs_path": docs_path,
    }


def deploy_args_to_deployment_info(
    deployment_name: str,
    deployment_config_proto_bytes: bytes,
    replica_config_proto_bytes: bytes,
    deployer_job_id: Union[str, bytes],
    previous_deployment: Optional[DeploymentInfo],
    is_driver_deployment: Optional[bool] = False,
) -> DeploymentInfo:
    """Construct a DeploymentInfo from the deploy args passed to the controller.

    Args:
        deployment_name: Stored as ``actor_name`` on the resulting info.
        deployment_config_proto_bytes: Serialized ``DeploymentConfig``.
        replica_config_proto_bytes: Serialized ``ReplicaConfig``.
        deployer_job_id: Job id as a string, or as raw bytes, in which case
            it is converted to a hex string.
        previous_deployment: Info of the deployment being replaced, or
            ``None`` when there is none. Only consulted when autoscaling is
            configured without ``initial_replicas``.
        is_driver_deployment: Passed through unchanged.

    Returns:
        A ``DeploymentInfo`` whose ``start_time_ms`` is the current wall-clock
        time in milliseconds.
    """

    deployment_config = DeploymentConfig.from_proto_bytes(deployment_config_proto_bytes)
    version = deployment_config.version
    replica_config = ReplicaConfig.from_proto_bytes(
        replica_config_proto_bytes, deployment_config.needs_pickle()
    )

    autoscaling_config = deployment_config.autoscaling_config
    if autoscaling_config is not None:
        # With autoscaling, the starting replica count is chosen in this
        # order: explicit initial_replicas, then min_replicas for a new
        # deployment, otherwise the previous deployment's replica count.
        if autoscaling_config.initial_replicas is not None:
            deployment_config.num_replicas = autoscaling_config.initial_replicas
        elif previous_deployment is None:
            deployment_config.num_replicas = autoscaling_config.min_replicas
        else:
            deployment_config.num_replicas = (
                previous_deployment.deployment_config.num_replicas
            )

        autoscaling_policy = BasicAutoscalingPolicy(autoscaling_config)
    else:
        autoscaling_policy = None

    # Java API passes in JobID as bytes
    if isinstance(deployer_job_id, bytes):
        deployer_job_id = ray.JobID.from_int(
            int.from_bytes(deployer_job_id, "little")
        ).hex()

    return DeploymentInfo(
        actor_name=deployment_name,
        version=version,
        deployment_config=deployment_config,
        replica_config=replica_config,
        deployer_job_id=deployer_job_id,
        start_time_ms=int(time.time() * 1000),
        autoscaling_policy=autoscaling_policy,
        is_driver_deployment=is_driver_deployment,
    )
2 changes: 1 addition & 1 deletion python/ray/serve/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ def run(
"docs_path": deployment._docs_path,
}
parameter_group.append(deployment_parameters)
client.deploy_group(
client.deploy_application(
name,
parameter_group,
_blocking=_blocking,
Expand Down
54 changes: 13 additions & 41 deletions python/ray/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
from ray.actor import ActorHandle
from ray._raylet import GcsClient
from ray.serve._private.autoscaling_policy import BasicAutoscalingPolicy
from ray.serve._private.common import (
DeploymentInfo,
EndpointInfo,
Expand All @@ -25,7 +24,7 @@
StatusOverview,
ServeDeployMode,
)
from ray.serve.config import DeploymentConfig, HTTPOptions, ReplicaConfig
from ray.serve.config import HTTPOptions
from ray.serve._private.constants import (
CONTROL_LOOP_PERIOD_S,
SERVE_LOGGER_NAME,
Expand All @@ -37,6 +36,7 @@
DEPLOYMENT_NAME_PREFIX_SEPARATOR,
MULTI_APP_MIGRATION_MESSAGE,
)
from ray.serve._private.deploy_utils import deploy_args_to_deployment_info
from ray.serve._private.deployment_state import DeploymentStateManager, ReplicaState
from ray.serve._private.endpoint_state import EndpointState
from ray.serve._private.http_state import HTTPState
Expand Down Expand Up @@ -384,52 +384,22 @@ def deploy(
docs_path: Optional[str] = None,
is_driver_deployment: Optional[bool] = False,
) -> bool:
"""Deploys a deployment."""

if route_prefix is not None:
assert route_prefix.startswith("/")
if docs_path is not None:
assert docs_path.startswith("/")

deployment_config = DeploymentConfig.from_proto_bytes(
deployment_config_proto_bytes
)
version = deployment_config.version
replica_config = ReplicaConfig.from_proto_bytes(
replica_config_proto_bytes, deployment_config.needs_pickle()
)

autoscaling_config = deployment_config.autoscaling_config
if autoscaling_config is not None:
if autoscaling_config.initial_replicas is not None:
deployment_config.num_replicas = autoscaling_config.initial_replicas
else:
previous_deployment = self.deployment_state_manager.get_deployment(name)
if previous_deployment is None:
deployment_config.num_replicas = autoscaling_config.min_replicas
else:
deployment_config.num_replicas = (
previous_deployment.deployment_config.num_replicas
)

autoscaling_policy = BasicAutoscalingPolicy(autoscaling_config)
else:
autoscaling_policy = None

# Java API passes in JobID as bytes
if isinstance(deployer_job_id, bytes):
deployer_job_id = ray.JobID.from_int(
int.from_bytes(deployer_job_id, "little")
).hex()

deployment_info = DeploymentInfo(
actor_name=name,
version=version,
deployment_config=deployment_config,
replica_config=replica_config,
deployment_info = deploy_args_to_deployment_info(
deployment_name=name,
deployment_config_proto_bytes=deployment_config_proto_bytes,
replica_config_proto_bytes=replica_config_proto_bytes,
deployer_job_id=deployer_job_id,
start_time_ms=int(time.time() * 1000),
autoscaling_policy=autoscaling_policy,
previous_deployment=self.deployment_state_manager.get_deployment(name),
is_driver_deployment=is_driver_deployment,
)

# TODO(architkulkarni): When a deployment is redeployed, even if
# the only change was num_replicas, the start_time_ms is refreshed.
# Is this the desired behaviour?
Expand All @@ -443,7 +413,9 @@ def deploy(

return updating

def deploy_group(self, name: str, deployment_args_list: List[Dict]) -> List[bool]:
def deploy_application(
self, name: str, deployment_args_list: List[Dict]
) -> List[bool]:
"""
Takes in a list of dictionaries that contain keyword arguments for the
controller's deploy() function. Calls deploy on all the argument
Expand Down
Loading

0 comments on commit 825d9e3

Please sign in to comment.