Skip to content

Commit

Permalink
RayCluster created by RayService set death info env for ray container (
Browse files Browse the repository at this point in the history
…#419)

* RayCluster created by RayService set death info env for ray container

* update
  • Loading branch information
brucez-anyscale authored Jul 26, 2022
1 parent 0ff8ff5 commit ce84f04
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 16 deletions.
18 changes: 11 additions & 7 deletions ray-operator/controllers/ray/common/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,21 @@ const (
// The default name for kuberay operator
ComponentName = "kuberay-operator"

// The defaule RayService Identifier.
RayServiceCreatorLabelValue = "rayservice"

// Check node if ready by checking the path exists or not
PodReadyFilepath = "POD_READY_FILEPATH"

// Use as container env variable
NAMESPACE = "NAMESPACE"
CLUSTER_NAME = "CLUSTER_NAME"
RAY_IP = "RAY_IP"
RAY_PORT = "RAY_PORT"
RAY_ADDRESS = "RAY_ADDRESS"
REDIS_PASSWORD = "REDIS_PASSWORD"
RAY_EXTERNAL_STORAGE_NS = "RAY_external_storage_namespace"
NAMESPACE = "NAMESPACE"
CLUSTER_NAME = "CLUSTER_NAME"
RAY_IP = "RAY_IP"
RAY_PORT = "RAY_PORT"
RAY_ADDRESS = "RAY_ADDRESS"
REDIS_PASSWORD = "REDIS_PASSWORD"
RAY_EXTERNAL_STORAGE_NS = "RAY_external_storage_namespace"
RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO = "RAY_timeout_ms_task_wait_for_death_info"

// Ray core default configurations
DefaultRedisPassword = "5241590000000000"
Expand Down
13 changes: 10 additions & 3 deletions ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func initReadinessProbeHandler(probe *v1.Probe, rayNodeType rayiov1alpha1.RayNod
}

// BuildPod a pod config
func BuildPod(podTemplateSpec v1.PodTemplateSpec, rayNodeType rayiov1alpha1.RayNodeType, rayStartParams map[string]string, svcName string, headPort string, enableRayAutoscaler *bool) (aPod v1.Pod) {
func BuildPod(podTemplateSpec v1.PodTemplateSpec, rayNodeType rayiov1alpha1.RayNodeType, rayStartParams map[string]string, svcName string, headPort string, enableRayAutoscaler *bool, creator string) (aPod v1.Pod) {
pod := v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Expand Down Expand Up @@ -283,7 +283,7 @@ func BuildPod(podTemplateSpec v1.PodTemplateSpec, rayNodeType rayiov1alpha1.RayN
setInitContainerEnvVars(&pod.Spec.InitContainers[index], svcName)
}

setContainerEnvVars(&pod, rayContainerIndex, rayNodeType, rayStartParams, svcName, headPort)
setContainerEnvVars(&pod, rayContainerIndex, rayNodeType, rayStartParams, svcName, headPort, creator)

// health check only if HA enabled
if podTemplateSpec.Annotations != nil {
Expand Down Expand Up @@ -492,7 +492,7 @@ func setInitContainerEnvVars(container *v1.Container, svcName string) {
}
}

func setContainerEnvVars(pod *v1.Pod, rayContainerIndex int, rayNodeType rayiov1alpha1.RayNodeType, rayStartParams map[string]string, svcName string, headPort string) {
func setContainerEnvVars(pod *v1.Pod, rayContainerIndex int, rayNodeType rayiov1alpha1.RayNodeType, rayStartParams map[string]string, svcName string, headPort string, creator string) {
// set IP to local host if head, or the the svc otherwise RAY_IP
// set the port RAY_PORT
// set the password?
Expand All @@ -518,6 +518,13 @@ func setContainerEnvVars(pod *v1.Pod, rayContainerIndex int, rayNodeType rayiov1
portEnv := v1.EnvVar{Name: RAY_PORT, Value: headPort}
container.Env = append(container.Env, portEnv)
}
if strings.ToLower(creator) == RayServiceCreatorLabelValue {
// Only add this env for Ray Service cluster to improve service SLA.
if !envVarExists(RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO, container.Env) {
deathEnv := v1.EnvVar{Name: RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO, Value: "0"}
container.Env = append(container.Env, deathEnv)
}
}
// Setting the RAY_ADDRESS env allows connecting to Ray using ray.init() when connecting
// from within the cluster.
if !envVarExists(RAY_ADDRESS, container.Env) {
Expand Down
35 changes: 31 additions & 4 deletions ray-operator/controllers/ray/common/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func TestBuildPod(t *testing.T) {
podName := strings.ToLower(cluster.Name + DashSymbol + string(rayiov1alpha1.HeadNode) + DashSymbol + utils.FormatInt32(0))
svcName := utils.GenerateServiceName(cluster.Name)
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, svcName, "6379")
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", nil)
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", nil, "")

// Check RAY_ADDRESS env.
checkRayAddress(t, pod, "127.0.0.1:6379")
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestBuildPod(t *testing.T) {
worker := cluster.Spec.WorkerGroupSpecs[0]
podName = cluster.Name + DashSymbol + string(rayiov1alpha1.WorkerNode) + DashSymbol + worker.GroupName + DashSymbol + utils.FormatInt32(0)
podTemplateSpec = DefaultWorkerPodTemplate(*cluster, worker, podName, svcName, "6379")
pod = BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, svcName, "6379", nil)
pod = BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, svcName, "6379", nil, "")

// Check RAY_ADDRESS env
checkRayAddress(t, pod, "raycluster-sample-head-svc:6379")
Expand All @@ -335,7 +335,7 @@ func TestBuildPod_WithAutoscalerEnabled(t *testing.T) {
podName := strings.ToLower(cluster.Name + DashSymbol + string(rayiov1alpha1.HeadNode) + DashSymbol + utils.FormatInt32(0))
svcName := utils.GenerateServiceName(cluster.Name)
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, svcName, "6379")
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag)
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag, "")

actualResult := pod.Labels[RayClusterLabelKey]
expectedResult := cluster.Name
Expand Down Expand Up @@ -385,6 +385,33 @@ func TestBuildPod_WithAutoscalerEnabled(t *testing.T) {
}
}

func TestBuildPod_WithCreatedByRayService(t *testing.T) {
cluster := instance.DeepCopy()
cluster.Spec.EnableInTreeAutoscaling = &trueFlag
podName := strings.ToLower(cluster.Name + DashSymbol + string(rayiov1alpha1.HeadNode) + DashSymbol + utils.FormatInt32(0))
svcName := utils.GenerateServiceName(cluster.Name)
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, svcName, "6379")
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag, RayServiceCreatorLabelValue)

hasCorrectDeathEnv := false
for _, container := range pod.Spec.Containers {
if container.Name != "ray-head" {
continue
}
if container.Env == nil || len(container.Env) == 0 {
t.Fatalf("Expected death env `%v`", container)
}
for _, env := range container.Env {
if env.Name == RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO {
assert.Equal(t, "0", env.Value)
hasCorrectDeathEnv = true
break
}
}
}
assert.True(t, hasCorrectDeathEnv)
}

// Check that autoscaler container overrides work as expected.
func TestBuildPodWithAutoscalerOptions(t *testing.T) {
cluster := instance.DeepCopy()
Expand Down Expand Up @@ -419,7 +446,7 @@ func TestBuildPodWithAutoscalerOptions(t *testing.T) {
EnvFrom: customEnvFrom,
}
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, svcName, "6379")
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag)
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag, "")
expectedContainer := *autoscalerContainer.DeepCopy()
expectedContainer.Image = customAutoscalerImage
expectedContainer.ImagePullPolicy = customPullPolicy
Expand Down
19 changes: 17 additions & 2 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,8 @@ func (r *RayClusterReconciler) buildHeadPod(instance rayiov1alpha1.RayCluster) c
headPort := common.GetHeadPort(instance.Spec.HeadGroupSpec.RayStartParams)
autoscalingEnabled := instance.Spec.EnableInTreeAutoscaling
podConf := common.DefaultHeadPodTemplate(instance, instance.Spec.HeadGroupSpec, podName, svcName, headPort)
pod := common.BuildPod(podConf, rayiov1alpha1.HeadNode, instance.Spec.HeadGroupSpec.RayStartParams, svcName, headPort, autoscalingEnabled)
creatorName := getCreator(instance)
pod := common.BuildPod(podConf, rayiov1alpha1.HeadNode, instance.Spec.HeadGroupSpec.RayStartParams, svcName, headPort, autoscalingEnabled, creatorName)
// Set raycluster instance as the owner and controller
if err := controllerutil.SetControllerReference(&instance, &pod, r.Scheme); err != nil {
r.Log.Error(err, "Failed to set controller reference for raycluster pod")
Expand All @@ -698,6 +699,19 @@ func (r *RayClusterReconciler) buildHeadPod(instance rayiov1alpha1.RayCluster) c
return pod
}

func getCreator(instance rayiov1alpha1.RayCluster) string {
if instance.Labels == nil {
return ""
}
creatorName, exist := instance.Labels[common.KubernetesCreatedByLabelKey]

if !exist {
return ""
}

return creatorName
}

// Build worker instance pods.
func (r *RayClusterReconciler) buildWorkerPod(instance rayiov1alpha1.RayCluster, worker rayiov1alpha1.WorkerGroupSpec) corev1.Pod {
podName := strings.ToLower(instance.Name + common.DashSymbol + string(rayiov1alpha1.WorkerNode) + common.DashSymbol + worker.GroupName + common.DashSymbol)
Expand All @@ -707,7 +721,8 @@ func (r *RayClusterReconciler) buildWorkerPod(instance rayiov1alpha1.RayCluster,
headPort := common.GetHeadPort(instance.Spec.HeadGroupSpec.RayStartParams)
autoscalingEnabled := instance.Spec.EnableInTreeAutoscaling
podTemplateSpec := common.DefaultWorkerPodTemplate(instance, worker, podName, svcName, headPort)
pod := common.BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, svcName, headPort, autoscalingEnabled)
creatorName := getCreator(instance)
pod := common.BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, svcName, headPort, autoscalingEnabled, creatorName)
// Set raycluster instance as the owner and controller
if err := controllerutil.SetControllerReference(&instance, &pod, r.Scheme); err != nil {
r.Log.Error(err, "Failed to set controller reference for raycluster pod")
Expand Down
1 change: 1 addition & 0 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(rayService *rayv
rayClusterLabel[k] = v
}
rayClusterLabel[common.RayServiceLabelKey] = rayService.Name
rayClusterLabel[common.KubernetesCreatedByLabelKey] = common.RayServiceCreatorLabelValue

rayClusterAnnotations := make(map[string]string)
for k, v := range rayService.Annotations {
Expand Down

0 comments on commit ce84f04

Please sign in to comment.