Skip to content

Commit

Permalink
[serve] Avoid deleting dynamically-deployed apps when applying config…
Browse files Browse the repository at this point in the history
… via REST API (#44476)

See #44226 for more details, but TL;DR:

- The REST API is declarative, meaning if you deploy a config that does not contain some running applications, they will be deleted.
- However, some users mix and match this declarative API with the imperative `serve.run` API. In that case, applying a config should not delete the dynamically created apps.
- This PR addresses the issue by tracking each app's `APIType` and only deleting apps that were applied via the declarative API. There is no public API change. In the future, we may want to consider exposing this metadata so that external systems like KubeRay can treat the two types of apps differently (e.g., when determining high-level status).

---------

Signed-off-by: Edward Oakes <[email protected]>
  • Loading branch information
edoakes authored Apr 4, 2024
1 parent 028de8b commit f09d01e
Show file tree
Hide file tree
Showing 7 changed files with 372 additions and 85 deletions.
139 changes: 99 additions & 40 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ class BuildAppStatus(Enum):
FAILED = 4


class APIType(Enum):
    """Tracks the type of API that an application originates from.

    The value is stored on the application's target state and is consulted
    when a declarative config is applied: only apps whose type is
    `DECLARATIVE` are candidates for deletion when they are missing from
    the applied config.
    """

    # Initial value assigned when an `ApplicationState` is constructed,
    # before any deploy/apply call has recorded where the app came from.
    UNKNOWN = 0
    # Recorded by `ApplicationState.deploy_app`, i.e. apps deployed through
    # an imperative API such as `serve.run`.
    IMPERATIVE = 1
    # Recorded by `ApplicationState.apply_app_config` and
    # `_clear_target_state_and_store_config`, i.e. apps managed through the
    # declarative (REST/config) API.
    DECLARATIVE = 2


@dataclass
class BuildAppTaskInfo:
"""Stores info on the current in-progress build app task.
Expand Down Expand Up @@ -107,6 +115,7 @@ class ApplicationTargetState:
target_capacity: Optional[float]
target_capacity_direction: Optional[TargetCapacityDirection]
deleting: bool
api_type: APIType


class ApplicationState:
Expand Down Expand Up @@ -153,6 +162,7 @@ def __init__(
target_capacity=None,
target_capacity_direction=None,
deleting=False,
api_type=APIType.UNKNOWN,
)
self._save_checkpoint_func = save_checkpoint_func

Expand Down Expand Up @@ -194,6 +204,10 @@ def target_deployments(self) -> List[str]:
def ingress_deployment(self) -> Optional[str]:
return self._ingress_deployment_name

@property
def api_type(self) -> APIType:
    """Return the API type recorded in this application's target state."""
    target_state = self._target_state
    return target_state.api_type

def recover_target_state_from_checkpoint(
self, checkpoint_data: ApplicationTargetState
):
Expand All @@ -202,16 +216,19 @@ def recover_target_state_from_checkpoint(
)
self._set_target_state(
checkpoint_data.deployment_infos,
checkpoint_data.code_version,
checkpoint_data.config,
checkpoint_data.target_capacity,
checkpoint_data.target_capacity_direction,
checkpoint_data.deleting,
api_type=checkpoint_data.api_type,
code_version=checkpoint_data.code_version,
target_config=checkpoint_data.config,
target_capacity=checkpoint_data.target_capacity,
target_capacity_direction=checkpoint_data.target_capacity_direction,
deleting=checkpoint_data.deleting,
)

def _set_target_state(
self,
deployment_infos: Optional[Dict[str, DeploymentInfo]],
*,
api_type: APIType,
code_version: str,
target_config: Optional[ServeApplicationSchema],
target_capacity: Optional[float] = None,
Expand All @@ -227,7 +244,6 @@ def _set_target_state(
When a request to delete the application has been received, this should be
({}, True)
"""

if deleting:
self._update_status(ApplicationStatus.DELETING)
else:
Expand All @@ -247,6 +263,7 @@ def _set_target_state(
target_capacity,
target_capacity_direction,
deleting,
api_type=api_type,
)

# Checkpoint ahead, so that if the controller crashes before we
Expand All @@ -261,14 +278,30 @@ def _set_target_state_deleting(self):
Wipes the target deployment infos, code version, and config.
"""
self._set_target_state(dict(), None, None, None, None, True)
self._set_target_state(
deployment_infos=dict(),
api_type=self._target_state.api_type,
code_version=None,
target_config=None,
deleting=True,
)

def _clear_target_state_and_store_config(
self, target_config: Optional[ServeApplicationSchema]
self,
target_config: Optional[ServeApplicationSchema],
):
"""Clears the target state and stores the config."""
"""Clears the target state and stores the config.
self._set_target_state(None, None, target_config, None, None, False)
NOTE: this currently assumes that this method is *only* called when managing
apps deployed with the declarative API.
"""
self._set_target_state(
deployment_infos=None,
api_type=APIType.DECLARATIVE,
code_version=None,
target_config=target_config,
deleting=False,
)

def _delete_deployment(self, name):
id = DeploymentID(name=name, app_name=self._name)
Expand All @@ -279,7 +312,7 @@ def delete(self):
"""Delete the application"""
if self._status != ApplicationStatus.DELETING:
logger.info(
f"Deleting application '{self._name}'",
f"Deleting app '{self._name}'.",
extra={"log_to_stderr": False},
)
self._set_target_state_deleting()
Expand Down Expand Up @@ -327,11 +360,11 @@ def apply_deployment_info(
else:
self._endpoint_state.delete_endpoint(deployment_id)

def deploy(self, deployment_infos: Dict[str, DeploymentInfo]):
"""Deploy application from list of deployment infos.
def deploy_app(self, deployment_infos: Dict[str, DeploymentInfo]):
"""(Re-)deploy the application from list of deployment infos.
This function should only be called if the app is being deployed
through serve.run instead of from a config.
This function should only be called to deploy an app from an
imperative API (i.e., `serve.run` or Java API).
Raises: RayServeException if there is more than one route prefix
or docs path.
Expand All @@ -342,25 +375,29 @@ def deploy(self, deployment_infos: Dict[str, DeploymentInfo]):

self._set_target_state(
deployment_infos=deployment_infos,
api_type=APIType.IMPERATIVE,
code_version=None,
target_config=None,
target_capacity=None,
target_capacity_direction=None,
)

def deploy_config(
def apply_app_config(
self,
config: ServeApplicationSchema,
target_capacity: Optional[float],
target_capacity_direction: Optional[TargetCapacityDirection],
deployment_time: int,
) -> None:
"""Deploys an application config.
"""Apply the config to the application.
If the code version matches that of the current live deployments
then it only applies the updated config to the deployment state
manager. If the code version doesn't match, this will re-build
the application.
This function should only be called to (re-)deploy an app from
the declarative API (i.e., through the REST API).
"""

self._deployment_timestamp = deployment_time
Expand All @@ -377,6 +414,7 @@ def deploy_config(
self._set_target_state(
# Code version doesn't change.
code_version=self._target_state.code_version,
api_type=APIType.DECLARATIVE,
# Everything else must reflect the new config.
deployment_infos=overrided_infos,
target_config=config,
Expand Down Expand Up @@ -415,7 +453,7 @@ def deploy_config(
ServeUsageTag.APP_CONTAINER_RUNTIME_ENV_USED.record("1")

# Kick off new build app task
logger.info(f"Building application '{self._name}'.")
logger.info(f"Importing and building app '{self._name}'.")
build_app_obj_ref = build_serve_application.options(
runtime_env=config.runtime_env,
enable_task_events=RAY_SERVE_ENABLE_TASK_EVENTS,
Expand Down Expand Up @@ -517,7 +555,7 @@ def _reconcile_build_app_task(self) -> Tuple[Tuple, BuildAppStatus, str]:
try:
args, err = ray.get(self._build_app_task_info.obj_ref)
if err is None:
logger.info(f"Built application '{self._name}' successfully.")
logger.info(f"Imported and built app '{self._name}' successfully.")
else:
return (
None,
Expand Down Expand Up @@ -658,6 +696,7 @@ def update(self) -> bool:
self._set_target_state(
deployment_infos=infos,
code_version=self._build_app_task_info.code_version,
api_type=self._target_state.api_type,
target_config=self._build_app_task_info.config,
target_capacity=self._build_app_task_info.target_capacity,
target_capacity_direction=(
Expand Down Expand Up @@ -762,14 +801,14 @@ def _recover_from_checkpoint(self):
app_state.recover_target_state_from_checkpoint(checkpoint_data)
self._application_states[app_name] = app_state

def delete_application(self, name: str) -> None:
def delete_app(self, name: str) -> None:
    """Delete the application registered under `name`.

    A name with no tracked application state is ignored. The actual
    teardown is delegated to that application's own `delete()` method;
    the entry itself is not removed from the registry here.
    """
    known_apps = self._application_states
    if name in known_apps:
        known_apps[name].delete()

def apply_deployment_args(self, name: str, deployment_args: List[Dict]) -> None:
"""Apply list of deployment arguments to application target state.
def deploy_app(self, name: str, deployment_args: List[Dict]) -> None:
"""Deploy the specified app to the list of deployment arguments.
This function should only be called if the app is being deployed
through serve.run instead of from a config.
Expand Down Expand Up @@ -816,32 +855,52 @@ def apply_deployment_args(self, name: str, deployment_args: List[Dict]) -> None:
)
for params in deployment_args
}
self._application_states[name].deploy(deployment_infos)
self._application_states[name].deploy_app(deployment_infos)

def deploy_config(
def apply_app_configs(
self,
name: str,
app_config: ServeApplicationSchema,
app_configs: List[ServeApplicationSchema],
*,
deployment_time: float = 0,
target_capacity: Optional[float] = None,
target_capacity_direction: Optional[TargetCapacityDirection] = None,
) -> None:
"""Deploy application from config."""
):
"""Declaratively apply the list of application configs.
if name not in self._application_states:
self._application_states[name] = ApplicationState(
name,
self._deployment_state_manager,
endpoint_state=self._endpoint_state,
save_checkpoint_func=self._save_checkpoint_func,
The applications will be reconciled to match the target state of the config.
Any applications previously deployed declaratively that are *not* present in
the list will be deleted.
"""
for app_config in app_configs:
if app_config.name not in self._application_states:
logger.info(f"Deploying new app '{app_config.name}'.")
self._application_states[app_config.name] = ApplicationState(
app_config.name,
self._deployment_state_manager,
endpoint_state=self._endpoint_state,
save_checkpoint_func=self._save_checkpoint_func,
)

self._application_states[app_config.name].apply_app_config(
app_config,
target_capacity,
target_capacity_direction,
deployment_time=deployment_time,
)

# Delete all apps that were previously deployed via the declarative API
# but are not in the config being applied.
existing_apps = {
name
for name, app_state in self._application_states.items()
if app_state.api_type == APIType.DECLARATIVE
}
apps_in_config = {app_config.name for app_config in app_configs}
for app_to_delete in existing_apps - apps_in_config:
self.delete_app(app_to_delete)

ServeUsageTag.NUM_APPS.record(str(len(self._application_states)))
self._application_states[name].deploy_config(
app_config,
target_capacity,
target_capacity_direction,
deployment_time=deployment_time,
)

def get_deployments(self, app_name: str) -> List[str]:
"""Return all deployment names by app name"""
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/_private/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def deploy_apps(
because a single-app config was deployed after deploying a multi-app
config, or vice versa.
"""
ray.get(self._controller.deploy_config.remote(config))
ray.get(self._controller.apply_config.remote(config))

if _blocking:
timeout_s = 60
Expand Down
31 changes: 12 additions & 19 deletions python/ray/serve/_private/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ def _recover_state_from_checkpoint(self):
logger.info(
"Recovered config from checkpoint.", extra={"log_to_stderr": False}
)
self.deploy_config(serve_config, deployment_time=deployment_time)
self.apply_config(serve_config, deployment_time=deployment_time)

def _read_config_checkpoint(
self,
Expand Down Expand Up @@ -726,11 +726,9 @@ def deploy_application(self, name: str, deployment_args_list: List[bytes]) -> No
else None,
}
)
self.application_state_manager.apply_deployment_args(
name, deployment_args_deserialized
)
self.application_state_manager.deploy_app(name, deployment_args_deserialized)

def deploy_config(
def apply_config(
self,
config: ServeDeploySchema,
deployment_time: float = 0.0,
Expand Down Expand Up @@ -779,14 +777,6 @@ def deploy_config(
app_config_dict = app_config.dict(exclude_unset=True)
new_config_checkpoint[app_config.name] = app_config_dict

self.application_state_manager.deploy_config(
app_config.name,
app_config,
deployment_time=deployment_time,
target_capacity=self._target_capacity,
target_capacity_direction=self._target_capacity_direction,
)

self.kv_store.put(
CONFIG_CHECKPOINT_KEY,
pickle.dumps(
Expand All @@ -799,12 +789,15 @@ def deploy_config(
),
)

# Delete live applications not listed in the config.
existing_applications = set(
self.application_state_manager._application_states.keys()
# Declaratively apply the new set of applications.
# This will delete any applications no longer in the config that were
# previously deployed via the REST API.
self.application_state_manager.apply_app_configs(
config.applications,
deployment_time=deployment_time,
target_capacity=self._target_capacity,
target_capacity_direction=self._target_capacity_direction,
)
new_applications = {app_config.name for app_config in config.applications}
self.delete_apps(existing_applications.difference(new_applications))

def get_deployment_info(self, name: str, app_name: str = "") -> bytes:
"""Get the current information about a deployment.
Expand Down Expand Up @@ -983,7 +976,7 @@ def delete_apps(self, names: Iterable[str]):
During deletion, the application status is DELETING
"""
for name in names:
self.application_state_manager.delete_application(name)
self.application_state_manager.delete_app(name)

def record_multiplexed_replica_info(self, info: MultiplexedReplicaInfo):
"""Record multiplexed model ids for a replica of deployment
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_deploy_app_custom_exception(serve_instance):
]
}

ray.get(controller.deploy_config.remote(config=ServeDeploySchema.parse_obj(config)))
ray.get(controller.apply_config.remote(config=ServeDeploySchema.parse_obj(config)))

def check_custom_exception() -> bool:
status = serve.status().applications["broken_app"]
Expand Down
Loading

0 comments on commit f09d01e

Please sign in to comment.