Commit
Add integration test for kuberay ray service and improve ray service operator (ray-project#415)

* Fix slow start issue for ray serve

* update

* update

* update

* update

* update

* update

* Add virtual start cpus

* rm kustomization.yaml

* update

* update

* update

* update
brucez-anyscale authored Jul 26, 2022
1 parent a802a50 commit 35f7839
Showing 7 changed files with 667 additions and 17 deletions.
2 changes: 1 addition & 1 deletion ray-operator/apis/ray/v1alpha1/rayservice_types.go
@@ -12,7 +12,7 @@ type ServiceStatus string
const (
FailedToGetOrCreateRayCluster ServiceStatus = "FailedToGetOrCreateRayCluster"
WaitForDashboard ServiceStatus = "WaitForDashboard"
-FailedServeDeploy ServiceStatus = "FailedServeDeploy"
+WaitForServeDeploymentReady ServiceStatus = "WaitForServeDeploymentReady"
FailedToGetServeDeploymentStatus ServiceStatus = "FailedToGetServeDeploymentStatus"
Running ServiceStatus = "Running"
Restarting ServiceStatus = "Restarting"
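The only change in this file renames the FailedServeDeploy status to WaitForServeDeploymentReady, recasting a pending Serve deployment as a normal waiting state rather than a failure. A standalone sketch (not the actual KubeRay package) of the string-typed status enum pattern this file uses:

package main

import "fmt"

// ServiceStatus mirrors the string-typed enum style of rayservice_types.go.
type ServiceStatus string

const (
    WaitForDashboard            ServiceStatus = "WaitForDashboard"
    WaitForServeDeploymentReady ServiceStatus = "WaitForServeDeploymentReady" // previously FailedServeDeploy
    Running                     ServiceStatus = "Running"
)

func main() {
    status := WaitForServeDeploymentReady
    // String-typed statuses serialize cleanly into the CR's status subresource.
    fmt.Printf("status=%s running=%v\n", status, status == Running)
}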
2 changes: 2 additions & 0 deletions ray-operator/config/samples/ray_v1alpha1_rayservice.yaml
@@ -3,6 +3,8 @@ kind: RayService
metadata:
name: rayservice-sample
spec:
+serviceUnhealthySecondThreshold: 300 # Health-check failure threshold for the service, in seconds (default: 60).
+deploymentUnhealthySecondThreshold: 300 # Health-check failure threshold for Serve deployments, in seconds (default: 60).
serveDeploymentGraphConfig:
importPath: fruit.deployment_graph
runtimeEnv: |
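The two new fields control how long the operator tolerates failing health checks, for the service as a whole and for individual Serve deployments, before it acts (the controller passes deploymentUnhealthySecondThreshold into updateAndCheckDashboardStatus below). A minimal standalone sketch of how such a seconds-based threshold is typically evaluated; the helper is hypothetical, not the operator's actual code:

package main

import (
    "fmt"
    "time"
)

// isPastUnhealthyThreshold is a hypothetical helper: a component only counts as
// unhealthy once it has been failing for longer than the configured number of seconds.
func isPastUnhealthyThreshold(lastHealthy time.Time, thresholdSeconds int32) bool {
    return time.Since(lastHealthy) > time.Duration(thresholdSeconds)*time.Second
}

func main() {
    lastHealthy := time.Now().Add(-90 * time.Second) // failing for 90s
    fmt.Println(isPastUnhealthyThreshold(lastHealthy, 60))  // true: past the 60s default
    fmt.Println(isPastUnhealthyThreshold(lastHealthy, 300)) // false: within the 300s sample value
}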
35 changes: 19 additions & 16 deletions ray-operator/controllers/ray/rayservice_controller.go
@@ -116,10 +116,10 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
// Update RayService Status since reconcileRayCluster may mark RayCluster restart.
if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil {
logger.Error(errStatus, "Failed to update status of RayService after RayCluster changes", "rayServiceInstance", rayServiceInstance)
-return ctrl.Result{}, err
+return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil
}
logger.Info("Done reconcileRayCluster update status, enter next loop to create new ray cluster.")
return ctrl.Result{}, nil
return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil
}

logger.Info("Reconciling the Serve component.")
@@ -132,20 +132,23 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
if activeRayClusterInstance != nil && pendingRayClusterInstance == nil {
rayServiceInstance.Status.PendingServiceStatus = rayv1alpha1.RayServiceStatus{}
if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, activeRayClusterInstance, true, logger); err != nil {
-return ctrlResult, err
+logger.Error(err, "Failed to reconcileServe.")
+return ctrlResult, nil
}
} else if activeRayClusterInstance != nil && pendingRayClusterInstance != nil {
if err = r.updateStatusForActiveCluster(ctx, rayServiceInstance, activeRayClusterInstance, logger); err != nil {
logger.Error(err, "The updating of the status for active ray cluster while we have pending cluster failed")
}

if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, pendingRayClusterInstance, false, logger); err != nil {
-return ctrlResult, err
+logger.Error(err, "Failed to reconcileServe.")
+return ctrlResult, nil
}
} else if activeRayClusterInstance == nil && pendingRayClusterInstance != nil {
rayServiceInstance.Status.ActiveServiceStatus = rayv1alpha1.RayServiceStatus{}
if ctrlResult, isHealthy, err = r.reconcileServe(ctx, rayServiceInstance, pendingRayClusterInstance, false, logger); err != nil {
-return ctrlResult, err
+logger.Error(err, "Failed to reconcileServe.")
+return ctrlResult, nil
}
} else {
rayServiceInstance.Status.ActiveServiceStatus = rayv1alpha1.RayServiceStatus{}
@@ -168,26 +171,26 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
if rayClusterInstance != nil {
if err := r.reconcileIngress(ctx, rayServiceInstance, rayClusterInstance); err != nil {
err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToUpdateIngress, err)
-return ctrl.Result{}, err
+return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
if err := r.reconcileServices(ctx, rayServiceInstance, rayClusterInstance, common.HeadService); err != nil {
err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToUpdateService, err)
-return ctrl.Result{}, err
+return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
if err := r.labelHealthyServePods(ctx, rayClusterInstance); err != nil {
err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToUpdateServingPodLabel, err)
-return ctrl.Result{}, err
+return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
if err := r.reconcileServices(ctx, rayServiceInstance, rayClusterInstance, common.ServingService); err != nil {
err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToUpdateService, err)
-return ctrl.Result{}, err
+return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}
}

// Final status update for any CR modification.
if errStatus := r.Status().Update(ctx, rayServiceInstance); errStatus != nil {
logger.Error(errStatus, "Failed to update status of RayService", "rayServiceInstance", rayServiceInstance)
-return ctrl.Result{}, err
+return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err
}

return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil
@@ -735,7 +738,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
r.markRestart(rayServiceInstance)
}
err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.WaitForDashboard, err)
-return ctrl.Result{}, false, err
+return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
}

rayDashboardClient := utils.GetRayDashboardClientFunc()
@@ -749,8 +752,8 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
logger.Info("Dashboard is unhealthy, restart the cluster.")
r.markRestart(rayServiceInstance)
}
-err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedServeDeploy, err)
-return ctrl.Result{}, false, err
+err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.WaitForServeDeploymentReady, err)
+return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
}

r.Recorder.Eventf(rayServiceInstance, "Normal", "SubmittedServeDeployment",
@@ -764,7 +767,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
r.markRestart(rayServiceInstance)
}
err = r.updateState(ctx, rayServiceInstance, rayv1alpha1.FailedToGetServeDeploymentStatus, err)
-return ctrl.Result{}, false, err
+return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
}

r.updateAndCheckDashboardStatus(rayServiceStatus, true, rayServiceInstance.Spec.DeploymentUnhealthySecondThreshold)
@@ -782,7 +785,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
r.markRestart(rayServiceInstance)
rayServiceInstance.Status.ServiceStatus = rayv1alpha1.Restarting
if err := r.Status().Update(ctx, rayServiceInstance); err != nil {
-return ctrl.Result{}, false, err
+return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, false, err
}

logger.Info("Mark cluster as unhealthy", "rayCluster", rayClusterInstance)
@@ -791,7 +794,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
return ctrl.Result{RequeueAfter: ServiceRestartRequeueDuration}, false, nil
}

-return ctrl.Result{}, true, nil
+return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, true, nil
}

func (r *RayServiceReconciler) labelHealthyServePods(ctx context.Context, rayClusterInstance *rayv1alpha1.RayCluster) error {
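Most of the changes above replace a bare ctrl.Result{} with a fixed RequeueAfter, and several call sites stop propagating transient errors. In controller-runtime, returning a non-nil error requeues the request with exponential backoff, while returning a Result with RequeueAfter set and a nil error re-runs the reconciler at a steady interval, a better fit for a controller that polls dashboard and Serve status. A sketch of the pattern, with the helper and the constant's value assumed for illustration:

package ray

import (
    "time"

    "github.com/go-logr/logr"
    ctrl "sigs.k8s.io/controller-runtime"
)

// Assumed value for illustration; the real constant is defined elsewhere in the operator.
const ServiceDefaultRequeueDuration = 2 * time.Second

// requeueAfterDefault is a hypothetical helper capturing the commit's pattern:
// log the transient error, swallow it, and schedule the next reconcile at a
// fixed interval instead of entering controller-runtime's error backoff.
func requeueAfterDefault(logger logr.Logger, err error, msg string) (ctrl.Result, error) {
    if err != nil {
        logger.Error(err, msg)
    }
    return ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil
}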
120 changes: 120 additions & 0 deletions tests/compatibility-test.py
@@ -3,6 +3,7 @@
import os
import tempfile
import unittest
import subprocess
from string import Template

import docker
@@ -101,6 +102,37 @@ def create_kuberay_cluster(template_name):
shell_assert_success('kubectl apply -f {}'.format(raycluster_service_file))


def create_kuberay_service(template_name):
template = None
with open(template_name, mode='r') as f:
template = Template(f.read())

rayservice_spec_buf = template.substitute(
{'ray_image': ray_image, 'ray_version': ray_version})

service_config_file = None
with tempfile.NamedTemporaryFile('w', delete=False) as f:
f.write(rayservice_spec_buf)
service_config_file = f.name

# Wait for the KubeRay operator pods in the ray-system namespace to be ready.
rtn = shell_run('kubectl wait --for=condition=ready pod -n ray-system --all --timeout=900s')
if rtn != 0:
shell_run('kubectl get pods -A')
assert rtn == 0
assert service_config_file is not None
shell_assert_success('kubectl apply -f {}'.format(service_config_file))

shell_run('kubectl get pods -A')

time.sleep(20)  # give the operator time to create the Serve service before polling it below

wait_for_condition(
lambda: shell_run('kubectl get service rayservice-sample-serve-svc -o jsonpath="{.status}"') == 0,
timeout=900,
retry_interval_ms=5000,
)


def delete_cluster():
shell_run('kind delete cluster')

@@ -210,6 +242,14 @@ def ray_ha_supported():
return False
return True

def ray_service_supported():
if ray_version == "nightly":
return True
major, minor, patch = parse_ray_version(ray_version)
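# e.g. Ray 1.13 -> 1 * 100 + 13 = 113, the minimum release these tests support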
if major * 100 + minor < 113:
return False
return True


class RayHATestCase(unittest.TestCase):
cluster_template_file = 'tests/config/ray-cluster.ray-ha.yaml.template'
@@ -438,6 +478,58 @@ def get_new_value():
client.close()


class RayServiceTestCase(unittest.TestCase):
service_template_file = 'tests/config/ray-service.yaml.template'
service_serve_update_template_file = 'tests/config/ray-service-serve-update.yaml.template'
service_cluster_update_template_file = 'tests/config/ray-service-cluster-update.yaml.template'

@classmethod
def setUpClass(cls):
if not ray_service_supported():
return
# The RayService test runs in a local Kind cluster, currently with the Ray
# nightly image. setUpClass brings everything up and waits for the Serve
# service to be created; the tests then expect successful responses from it.
delete_cluster()
create_cluster()
apply_kuberay_resources()
download_images()
create_kuberay_service(RayServiceTestCase.service_template_file)

def setUp(self):
if not ray_service_supported():
raise unittest.SkipTest("ray service is not supported")

def test_ray_serve_work(self):
port_forwarding_proc = subprocess.Popen('kubectl port-forward service/rayservice-sample-serve-svc 8000', shell=True)
time.sleep(5)  # give kubectl port-forward a moment to establish the tunnel
curl_cmd = 'curl -X POST -H \'Content-Type: application/json\' localhost:8000 -d \'["MANGO", 2]\''
wait_for_condition(
lambda: shell_run(curl_cmd) == 0,
timeout=5,
)
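# Apply an updated Serve config; the service should keep answering after the in-place update.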
create_kuberay_service(RayServiceTestCase.service_serve_update_template_file)
curl_cmd = 'curl -X POST -H \'Content-Type: application/json\' localhost:8000 -d \'["MANGO", 2]\''
time.sleep(5)
wait_for_condition(
lambda: shell_run(curl_cmd) == 0,
timeout=60,
)
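# Updating the cluster config replaces the underlying RayCluster, so tear down
# the old port-forward and re-establish it against the new pods.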
create_kuberay_service(RayServiceTestCase.service_cluster_update_template_file)
time.sleep(5)
port_forwarding_proc.kill()
time.sleep(5)
port_forwarding_proc = subprocess.Popen('kubectl port-forward service/rayservice-sample-serve-svc 8000', shell=True)
time.sleep(5)
curl_cmd = 'curl -X POST -H \'Content-Type: application/json\' localhost:8000 -d \'["MANGO", 2]\''

wait_for_condition(
lambda: shell_run(curl_cmd) == 0,
timeout=180,
)
port_forwarding_proc.kill()

def parse_environment():
global ray_version, ray_image, kuberay_sha
for k, v in os.environ.items():
@@ -450,6 +542,34 @@ def parse_environment():
kuberay_sha = v


def wait_for_condition(
condition_predictor, timeout=10, retry_interval_ms=100, **kwargs
):
"""Wait until a condition is met or time out with an exception.
Args:
condition_predictor: A function that returns True once the condition is met.
timeout: Maximum timeout in seconds.
retry_interval_ms: Retry interval in milliseconds.
Raises:
RuntimeError: If the condition is not met before the timeout expires.
"""
start = time.time()
last_ex = None
while time.time() - start <= timeout:
try:
if condition_predictor(**kwargs):
return
except Exception as ex:
last_ex = ex
time.sleep(retry_interval_ms / 1000.0)
message = "The condition wasn't met before the timeout expired."
if last_ex is not None:
message += f" Last exception: {last_ex}"
raise RuntimeError(message)


if __name__ == '__main__':
parse_environment()
unittest.main(verbosity=2)
