Skip to content

Commit

Permalink
[serve][deploy refactor][2/X] Move lightweight update logic to Deploy…
Browse files Browse the repository at this point in the history
…mentVersion (ray-project#34430)

`DeploymentVersion` already has existing support for code version + other config options that affect the version (before, only user config was taken into account). We can leverage this to do lightweight config updates, so that:
- The config's `import_path` and `runtime_env` tells us the code version for the deployments
- The logic for lightweight config updates is hidden in `DeploymentVersion`

So instead of having `DeploymentVersion` only rely on user config, we want it to rely on other info from `DeploymentInfo`, so we can decide which config options should trigger a redeployment and which shouldn't. I've added annotations `DeploymentOptionUpdateType` to fields in `DeploymentConfig` to indicate which options fall into which category as described below.

* Heavyweight options that will force replicas to restart: `version` and `ray_actor_options` (which is part of replica config)
* Lightweight options that need to call reconfigure on the replica actor: `user_config` and `graceful_shutdown_wait_loop_s`
* Lightweight options (that need to update replicas in replica state container, but won't need to call into the actual actor): `max_concurrent_queries`, `graceful_shutdown_timeout_s`, `health_check_period_s`, `health_check_timeout_s`
* Purely lightweight options: `num_replicas` and `autoscaling_config`
<img width="799" alt="Screen Shot 2023-04-20 at 6 50 13 PM" src="https://user-images.githubusercontent.com/15851518/233521791-537bf512-3f7a-4add-8a9a-701fde8ebeb1.png">
<img width="800" alt="Screen Shot 2023-04-20 at 6 50 31 PM" src="https://user-images.githubusercontent.com/15851518/233521825-b3a74e76-d863-4715-b66f-fa128319a461.png">

This fixes the issue of deployments not listed in config being redeployed. See the newly added test `test_deployments_not_listed_in_config`.
  • Loading branch information
zcin authored and architkulkarni committed May 16, 2023
1 parent ab0d028 commit 247a38f
Show file tree
Hide file tree
Showing 19 changed files with 1,028 additions and 584 deletions.
20 changes: 20 additions & 0 deletions java/serve/src/main/java/io/ray/serve/config/DeploymentConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,26 @@ public byte[] toProtoBytes() {
return builder.build().toByteArray();
}

public io.ray.serve.generated.DeploymentConfig toProto() {
io.ray.serve.generated.DeploymentConfig.Builder builder =
io.ray.serve.generated.DeploymentConfig.newBuilder()
.setNumReplicas(numReplicas)
.setMaxConcurrentQueries(maxConcurrentQueries)
.setGracefulShutdownWaitLoopS(gracefulShutdownWaitLoopS)
.setGracefulShutdownTimeoutS(gracefulShutdownTimeoutS)
.setHealthCheckPeriodS(healthCheckPeriodS)
.setHealthCheckTimeoutS(healthCheckTimeoutS)
.setIsCrossLanguage(isCrossLanguage)
.setDeploymentLanguage(deploymentLanguage);
if (null != userConfig) {
builder.setUserConfig(ByteString.copyFrom(MessagePackSerializer.encode(userConfig).getKey()));
}
if (null != autoscalingConfig) {
builder.setAutoscalingConfig(autoscalingConfig.toProto());
}
return builder.build();
}

public static DeploymentConfig fromProto(io.ray.serve.generated.DeploymentConfig proto) {

DeploymentConfig deploymentConfig = new DeploymentConfig();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,52 @@
package io.ray.serve.deployment;

import com.google.protobuf.ByteString;
import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import io.ray.runtime.serializer.MessagePackSerializer;
import io.ray.serve.config.DeploymentConfig;
import io.ray.serve.exception.RayServeException;
import java.io.Serializable;
import java.util.Map;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;

public class DeploymentVersion implements Serializable {

private static Gson gson = new Gson();

private static final long serialVersionUID = 3400261981775851058L;

private String codeVersion;

private Object userConfig;

private DeploymentConfig deploymentConfig;

private Map<String, Object> rayActorOptions;

private boolean unversioned;

public DeploymentVersion() {
this(null, null);
this(null, new DeploymentConfig(), null);
}

public DeploymentVersion(String codeVersion) {
this(codeVersion, null);
this(codeVersion, new DeploymentConfig(), null);
}

public DeploymentVersion(String codeVersion, Object userConfig) {
public DeploymentVersion(
String codeVersion, DeploymentConfig deploymentConfig, Map<String, Object> rayActorOptions) {
if (StringUtils.isBlank(codeVersion)) {
this.unversioned = true;
this.codeVersion = RandomStringUtils.randomAlphabetic(6);
} else {
this.codeVersion = codeVersion;
}
this.userConfig = userConfig;
if (deploymentConfig == null) {
deploymentConfig = new DeploymentConfig();
}
this.deploymentConfig = deploymentConfig;
this.rayActorOptions = rayActorOptions;
this.userConfig = deploymentConfig.getUserConfig();
}

public String getCodeVersion() {
Expand All @@ -44,6 +57,14 @@ public Object getUserConfig() {
return userConfig;
}

public DeploymentConfig getDeploymentConfig() {
return deploymentConfig;
}

public Map<String, Object> getRayActorOptions() {
return rayActorOptions;
}

public boolean isUnversioned() {
return unversioned;
}
Expand All @@ -64,12 +85,8 @@ public static DeploymentVersion fromProtoBytes(byte[] bytes) {
}
return new DeploymentVersion(
proto.getCodeVersion(),
proto.getUserConfig() != null && proto.getUserConfig().size() != 0
? new Object[] {
MessagePackSerializer.decode(
proto.getUserConfig().toByteArray(), Object.class) // TODO-xlang
}
: null);
DeploymentConfig.fromProto(proto.getDeploymentConfig()),
gson.fromJson(proto.getRayActorOptions(), Map.class));
}

public byte[] toProtoBytes() {
Expand All @@ -79,9 +96,9 @@ public byte[] toProtoBytes() {
if (StringUtils.isNotBlank(codeVersion)) {
proto.setCodeVersion(codeVersion);
}
if (userConfig != null) {
proto.setUserConfig(
ByteString.copyFrom(MessagePackSerializer.encode(userConfig).getLeft())); // TODO-xlang
proto.setDeploymentConfig(deploymentConfig.toProto());
if (rayActorOptions != null && !rayActorOptions.isEmpty()) {
proto.setRayActorOptions(gson.toJson(rayActorOptions));
}
return proto.build().toByteArray();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package io.ray.serve.replica;

import io.ray.serve.deployment.DeploymentVersion;

public interface RayServeReplica {

Object handleRequest(Object requestMetadata, Object requestArgs);

default Object reconfigure(Object userConfig) {
return new DeploymentVersion(null, userConfig);
default Object reconfigure(byte[] deploymentConfigBytes) {
return null;
}

default boolean checkHealth() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,11 @@ public synchronized boolean prepareForShutdown() {
}

@Override
public DeploymentVersion reconfigure(Object userConfig) {
public DeploymentVersion reconfigure(byte[] deploymentConfigBytes) {
config = DeploymentConfig.fromProtoBytes(deploymentConfigBytes);
Object userConfig = config.getUserConfig();
DeploymentVersion deploymentVersion =
new DeploymentVersion(version.getCodeVersion(), userConfig);
new DeploymentVersion(version.getCodeVersion(), config, version.getRayActorOptions());
version = deploymentVersion;
if (userConfig == null) {
return deploymentVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ public boolean isAllocated() {
*
* @return
*/
public Object isInitialized(Object userConfig) {
Object deploymentVersion = reconfigure(userConfig);
public Object isInitialized(byte[] deploymentConfigBytes) {
Object deploymentVersion = reconfigure(deploymentConfigBytes);
checkHealth();
return deploymentVersion;
}
Expand All @@ -213,13 +213,8 @@ public boolean prepareForShutdown() {
* DeploymentVersion is serialized to protobuf byte[].
*/
@Override
public Object reconfigure(Object userConfig) {
boolean isCrossLanguage = userConfig instanceof byte[];
DeploymentVersion deploymentVersion =
replica.reconfigure(
isCrossLanguage && userConfig != null
? MessagePackSerializer.decode((byte[]) userConfig, Object.class)
: userConfig);
public Object reconfigure(byte[] deploymentConfigBytes) {
DeploymentVersion deploymentVersion = replica.reconfigure(deploymentConfigBytes);
return deploymentVersion.toProtoBytes();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,17 @@ public void test() throws IOException {

// reconfigure
ObjectRef<Object> versionRef =
replicHandle.task(RayServeWrappedReplica::reconfigure, (Object) null).remote();
replicHandle
.task(RayServeWrappedReplica::reconfigure, (new DeploymentConfig()).toProtoBytes())
.remote();
Assert.assertEquals(
DeploymentVersion.fromProtoBytes((byte[]) (versionRef.get())).getCodeVersion(), version);

replicHandle.task(RayServeWrappedReplica::reconfigure, new Object()).remote().get();
deploymentConfig = deploymentConfig.setUserConfig(new Object());
replicHandle
.task(RayServeWrappedReplica::reconfigure, deploymentConfig.toProtoBytes())
.remote()
.get();
resultRef =
replicHandle
.task(
Expand All @@ -84,8 +90,9 @@ public void test() throws IOException {
.remote();
Assert.assertEquals((String) resultRef.get(), "1");

deploymentConfig = deploymentConfig.setUserConfig(ImmutableMap.of("value", "100"));
replicHandle
.task(RayServeWrappedReplica::reconfigure, ImmutableMap.of("value", "100"))
.task(RayServeWrappedReplica::reconfigure, deploymentConfig.toProtoBytes())
.remote()
.get();
resultRef =
Expand Down
22 changes: 22 additions & 0 deletions python/ray/serve/_private/deploy_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from typing import Dict, Tuple, Union, Callable, Type, Optional, Any
import hashlib
import json
import logging
import time

from ray.serve.config import ReplicaConfig, DeploymentConfig
from ray.serve.schema import ServeApplicationSchema
from ray.serve._private.constants import SERVE_LOGGER_NAME
from ray.serve._private.autoscaling_policy import BasicAutoscalingPolicy
from ray.serve._private.common import DeploymentInfo
Expand Down Expand Up @@ -135,3 +138,22 @@ def deploy_args_to_deployment_info(
autoscaling_policy=autoscaling_policy,
is_driver_deployment=is_driver_deployment,
)


def get_app_code_version(app_config: ServeApplicationSchema) -> str:
"""Returns the code version of an application.
Args:
app_config: The application config.
Returns: a hash of the import path and (application level) runtime env representing
the code version of the application.
"""
encoded = json.dumps(
{
"import_path": app_config.import_path,
"runtime_env": app_config.runtime_env,
},
sort_keys=True,
).encode("utf-8")
return hashlib.md5(encoded).hexdigest()
Loading

0 comments on commit 247a38f

Please sign in to comment.