Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Avoid deleting dynamically-deployed apps when applying config via REST API #44476

Merged
merged 7 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 98 additions & 40 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ class BuildAppStatus(Enum):
FAILED = 4


class APIType(Enum):
"""Tracks the type of API that an application originates from."""

UNKNOWN = 0
IMPERATIVE = 1
DECLARATIVE = 2


@dataclass
class BuildAppTaskInfo:
"""Stores info on the current in-progress build app task.
Expand Down Expand Up @@ -107,6 +115,7 @@ class ApplicationTargetState:
target_capacity: Optional[float]
target_capacity_direction: Optional[TargetCapacityDirection]
deleting: bool
api_type: APIType


class ApplicationState:
Expand Down Expand Up @@ -153,6 +162,7 @@ def __init__(
target_capacity=None,
target_capacity_direction=None,
deleting=False,
api_type=APIType.UNKNOWN,
)
self._save_checkpoint_func = save_checkpoint_func

Expand Down Expand Up @@ -194,6 +204,10 @@ def target_deployments(self) -> List[str]:
def ingress_deployment(self) -> Optional[str]:
return self._ingress_deployment_name

@property
def api_type(self) -> APIType:
return self._target_state.api_type

def recover_target_state_from_checkpoint(
self, checkpoint_data: ApplicationTargetState
):
Expand All @@ -202,16 +216,19 @@ def recover_target_state_from_checkpoint(
)
self._set_target_state(
checkpoint_data.deployment_infos,
checkpoint_data.code_version,
checkpoint_data.config,
checkpoint_data.target_capacity,
checkpoint_data.target_capacity_direction,
checkpoint_data.deleting,
api_type=checkpoint_data.api_type,
code_version=checkpoint_data.code_version,
target_config=checkpoint_data.config,
target_capacity=checkpoint_data.target_capacity,
target_capacity_direction=checkpoint_data.target_capacity_direction,
deleting=checkpoint_data.deleting,
)

def _set_target_state(
self,
deployment_infos: Optional[Dict[str, DeploymentInfo]],
*,
api_type: APIType,
code_version: str,
target_config: Optional[ServeApplicationSchema],
target_capacity: Optional[float] = None,
Expand All @@ -227,7 +244,6 @@ def _set_target_state(
When a request to delete the application has been received, this should be
({}, True)
"""

if deleting:
self._update_status(ApplicationStatus.DELETING)
else:
Expand All @@ -247,6 +263,7 @@ def _set_target_state(
target_capacity,
target_capacity_direction,
deleting,
api_type=api_type,
)

# Checkpoint ahead, so that if the controller crashes before we
Expand All @@ -261,14 +278,26 @@ def _set_target_state_deleting(self):

Wipes the target deployment infos, code version, and config.
"""
self._set_target_state(dict(), None, None, None, None, True)
self._set_target_state(
deployment_infos=dict(),
api_type=self._target_state.api_type,
code_version=None,
target_config=None,
deleting=True,
)

def _clear_target_state_and_store_config(
self, target_config: Optional[ServeApplicationSchema]
self,
target_config: Optional[ServeApplicationSchema],
):
"""Clears the target state and stores the config."""
edoakes marked this conversation as resolved.
Show resolved Hide resolved

self._set_target_state(None, None, target_config, None, None, False)
self._set_target_state(
deployment_infos=None,
api_type=APIType.DECLARATIVE,
code_version=None,
target_config=target_config,
deleting=False,
)

def _delete_deployment(self, name):
id = DeploymentID(name=name, app_name=self._name)
Expand All @@ -279,7 +308,7 @@ def delete(self):
"""Delete the application"""
if self._status != ApplicationStatus.DELETING:
logger.info(
f"Deleting application '{self._name}'",
f"Deleting app '{self._name}'.",
extra={"log_to_stderr": False},
)
self._set_target_state_deleting()
Expand Down Expand Up @@ -327,11 +356,11 @@ def apply_deployment_info(
else:
self._endpoint_state.delete_endpoint(deployment_id)

def deploy(self, deployment_infos: Dict[str, DeploymentInfo]):
"""Deploy application from list of deployment infos.
def deploy_app(self, deployment_infos: Dict[str, DeploymentInfo]):
"""(Re-)deploy the application from list of deployment infos.

This function should only be called if the app is being deployed
through serve.run instead of from a config.
This function should only be called to deploy an app from an
imperative API (i.e., `serve.run` or Java API).

Raises: RayServeException if there is more than one route prefix
or docs path.
Expand All @@ -342,25 +371,29 @@ def deploy(self, deployment_infos: Dict[str, DeploymentInfo]):

self._set_target_state(
deployment_infos=deployment_infos,
api_type=APIType.IMPERATIVE,
code_version=None,
target_config=None,
target_capacity=None,
target_capacity_direction=None,
)

def deploy_config(
def apply_app_config(
self,
config: ServeApplicationSchema,
target_capacity: Optional[float],
target_capacity_direction: Optional[TargetCapacityDirection],
deployment_time: int,
) -> None:
"""Deploys an application config.
"""Apply the config to the application.

If the code version matches that of the current live deployments
then it only applies the updated config to the deployment state
manager. If the code version doesn't match, this will re-build
the application.

This function should only be called to (re-)deploy an app from
the declarative API (i.e., through the REST API).
"""

self._deployment_timestamp = deployment_time
Expand All @@ -377,6 +410,7 @@ def deploy_config(
self._set_target_state(
# Code version doesn't change.
code_version=self._target_state.code_version,
api_type=APIType.DECLARATIVE,
# Everything else must reflect the new config.
deployment_infos=overrided_infos,
target_config=config,
Expand Down Expand Up @@ -415,7 +449,7 @@ def deploy_config(
ServeUsageTag.APP_CONTAINER_RUNTIME_ENV_USED.record("1")

# Kick off new build app task
logger.info(f"Building application '{self._name}'.")
logger.info(f"Importing and building app '{self._name}'.")
build_app_obj_ref = build_serve_application.options(
runtime_env=config.runtime_env,
enable_task_events=RAY_SERVE_ENABLE_TASK_EVENTS,
Expand Down Expand Up @@ -517,7 +551,7 @@ def _reconcile_build_app_task(self) -> Tuple[Tuple, BuildAppStatus, str]:
try:
args, err = ray.get(self._build_app_task_info.obj_ref)
if err is None:
logger.info(f"Built application '{self._name}' successfully.")
logger.info(f"Imported and built app '{self._name}' successfully.")
else:
return (
None,
Expand Down Expand Up @@ -658,6 +692,7 @@ def update(self) -> bool:
self._set_target_state(
deployment_infos=infos,
code_version=self._build_app_task_info.code_version,
api_type=self._target_state.api_type,
target_config=self._build_app_task_info.config,
target_capacity=self._build_app_task_info.target_capacity,
target_capacity_direction=(
Expand Down Expand Up @@ -762,14 +797,14 @@ def _recover_from_checkpoint(self):
app_state.recover_target_state_from_checkpoint(checkpoint_data)
self._application_states[app_name] = app_state

def delete_application(self, name: str) -> None:
def delete_app(self, name: str) -> None:
"""Delete application by name"""
if name not in self._application_states:
return
self._application_states[name].delete()

def apply_deployment_args(self, name: str, deployment_args: List[Dict]) -> None:
"""Apply list of deployment arguments to application target state.
def deploy_app(self, name: str, deployment_args: List[Dict]) -> None:
"""Deploy the specified app to the list of deployment arguments.
edoakes marked this conversation as resolved.
Show resolved Hide resolved

This function should only be called if the app is being deployed
through serve.run instead of from a config.
Expand Down Expand Up @@ -816,32 +851,55 @@ def apply_deployment_args(self, name: str, deployment_args: List[Dict]) -> None:
)
for params in deployment_args
}
self._application_states[name].deploy(deployment_infos)
self._application_states[name].deploy_app(deployment_infos)

def deploy_config(
def apply_app_configs(
edoakes marked this conversation as resolved.
Show resolved Hide resolved
self,
name: str,
app_config: ServeApplicationSchema,
app_configs: List[ServeApplicationSchema],
*,
deployment_time: float = 0,
target_capacity: Optional[float] = None,
target_capacity_direction: Optional[TargetCapacityDirection] = None,
) -> None:
"""Deploy application from config."""
):
"""Declaratively apply the list of application configs.

if name not in self._application_states:
self._application_states[name] = ApplicationState(
name,
self._deployment_state_manager,
endpoint_state=self._endpoint_state,
save_checkpoint_func=self._save_checkpoint_func,
The applications will be reconciled to match the target state of the config.

Any applications previously deployed through this method that are *not*
edoakes marked this conversation as resolved.
Show resolved Hide resolved
present in the list will be deleted.
"""
# Track all existing apps and pop those that are in the newly applied config.
existing_apps = {
name
for name, app_state in self._application_states.items()
if app_state.api_type == APIType.DECLARATIVE
}

for app_config in app_configs:
if app_config.name not in existing_apps:
logger.info(f"Deploying new app '{app_config.name}'.")
self._application_states[app_config.name] = ApplicationState(
app_config.name,
self._deployment_state_manager,
endpoint_state=self._endpoint_state,
save_checkpoint_func=self._save_checkpoint_func,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this makes sense/would be helpful, but maybe we can add a log statement for when an app switches from imperative to declarative or vice versa when redeployed?

else:
existing_apps.remove(app_config.name)
edoakes marked this conversation as resolved.
Show resolved Hide resolved

self._application_states[app_config.name].apply_app_config(
app_config,
target_capacity,
target_capacity_direction,
deployment_time=deployment_time,
)

# At this point, `existing_apps` contains apps that were previously deployed
# but are no longer present in the config.
for app_name in existing_apps:
self.delete_app(app_name)

ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))
self._application_states[name].deploy_config(
app_config,
target_capacity,
target_capacity_direction,
deployment_time=deployment_time,
)

def get_deployments(self, app_name: str) -> List[str]:
"""Return all deployment names by app name"""
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/_private/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def deploy_apps(
because a single-app config was deployed after deploying a multi-app
config, or vice versa.
"""
ray.get(self._controller.deploy_config.remote(config))
ray.get(self._controller.apply_config.remote(config))

if _blocking:
timeout_s = 60
Expand Down
31 changes: 12 additions & 19 deletions python/ray/serve/_private/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ def _recover_state_from_checkpoint(self):
logger.info(
"Recovered config from checkpoint.", extra={"log_to_stderr": False}
)
self.deploy_config(serve_config, deployment_time=deployment_time)
self.apply_config(serve_config, deployment_time=deployment_time)

def _read_config_checkpoint(
self,
Expand Down Expand Up @@ -726,11 +726,9 @@ def deploy_application(self, name: str, deployment_args_list: List[bytes]) -> No
else None,
}
)
self.application_state_manager.apply_deployment_args(
name, deployment_args_deserialized
)
self.application_state_manager.deploy_app(name, deployment_args_deserialized)

def deploy_config(
def apply_config(
self,
config: ServeDeploySchema,
deployment_time: float = 0.0,
Expand Down Expand Up @@ -779,14 +777,6 @@ def deploy_config(
app_config_dict = app_config.dict(exclude_unset=True)
new_config_checkpoint[app_config.name] = app_config_dict

self.application_state_manager.deploy_config(
app_config.name,
app_config,
deployment_time=deployment_time,
target_capacity=self._target_capacity,
target_capacity_direction=self._target_capacity_direction,
)

self.kv_store.put(
CONFIG_CHECKPOINT_KEY,
pickle.dumps(
Expand All @@ -799,12 +789,15 @@ def deploy_config(
),
)

# Delete live applications not listed in the config.
existing_applications = set(
self.application_state_manager._application_states.keys()
edoakes marked this conversation as resolved.
Show resolved Hide resolved
# Declaratively apply the new set of applications.
# This will delete any applications no longer in the config that were
# previously deployed via the REST API.
self.application_state_manager.apply_app_configs(
config.applications,
deployment_time=deployment_time,
target_capacity=self._target_capacity,
target_capacity_direction=self._target_capacity_direction,
)
new_applications = {app_config.name for app_config in config.applications}
self.delete_apps(existing_applications.difference(new_applications))

def get_deployment_info(self, name: str, app_name: str = "") -> bytes:
"""Get the current information about a deployment.
Expand Down Expand Up @@ -983,7 +976,7 @@ def delete_apps(self, names: Iterable[str]):
During deletion, the application status is DELETING
"""
for name in names:
self.application_state_manager.delete_application(name)
self.application_state_manager.delete_app(name)

def record_multiplexed_replica_info(self, info: MultiplexedReplicaInfo):
"""Record multiplexed model ids for a replica of deployment
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_deploy_app_custom_exception(serve_instance):
]
}

ray.get(controller.deploy_config.remote(config=ServeDeploySchema.parse_obj(config)))
ray.get(controller.apply_config.remote(config=ServeDeploySchema.parse_obj(config)))

def check_custom_exception() -> bool:
status = serve.status().applications["broken_app"]
Expand Down
Loading
Loading