diff --git a/ray-operator/controllers/ray/common/constant.go b/ray-operator/controllers/ray/common/constant.go index 96f90a7e15..a5d457988c 100644 --- a/ray-operator/controllers/ray/common/constant.go +++ b/ray-operator/controllers/ray/common/constant.go @@ -55,17 +55,21 @@ const ( // The default name for kuberay operator ComponentName = "kuberay-operator" + // The default RayService Identifier. + RayServiceCreatorLabelValue = "rayservice" + // Check node if ready by checking the path exists or not PodReadyFilepath = "POD_READY_FILEPATH" // Use as container env variable - NAMESPACE = "NAMESPACE" - CLUSTER_NAME = "CLUSTER_NAME" - RAY_IP = "RAY_IP" - RAY_PORT = "RAY_PORT" - RAY_ADDRESS = "RAY_ADDRESS" - REDIS_PASSWORD = "REDIS_PASSWORD" - RAY_EXTERNAL_STORAGE_NS = "RAY_external_storage_namespace" + NAMESPACE = "NAMESPACE" + CLUSTER_NAME = "CLUSTER_NAME" + RAY_IP = "RAY_IP" + RAY_PORT = "RAY_PORT" + RAY_ADDRESS = "RAY_ADDRESS" + REDIS_PASSWORD = "REDIS_PASSWORD" + RAY_EXTERNAL_STORAGE_NS = "RAY_external_storage_namespace" + RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO = "RAY_timeout_ms_task_wait_for_death_info" // Ray core default configurations DefaultRedisPassword = "5241590000000000" diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go index a5d975ad4f..609b9c623f 100644 --- a/ray-operator/controllers/ray/common/pod.go +++ b/ray-operator/controllers/ray/common/pod.go @@ -225,7 +225,7 @@ func initReadinessProbeHandler(probe *v1.Probe, rayNodeType rayiov1alpha1.RayNod } // BuildPod a pod config -func BuildPod(podTemplateSpec v1.PodTemplateSpec, rayNodeType rayiov1alpha1.RayNodeType, rayStartParams map[string]string, svcName string, headPort string, enableRayAutoscaler *bool) (aPod v1.Pod) { +func BuildPod(podTemplateSpec v1.PodTemplateSpec, rayNodeType rayiov1alpha1.RayNodeType, rayStartParams map[string]string, svcName string, headPort string, enableRayAutoscaler *bool, creator string) (aPod v1.Pod) { pod := v1.Pod{ 
TypeMeta: metav1.TypeMeta{ APIVersion: "v1", @@ -283,7 +283,7 @@ func BuildPod(podTemplateSpec v1.PodTemplateSpec, rayNodeType rayiov1alpha1.RayN setInitContainerEnvVars(&pod.Spec.InitContainers[index], svcName) } - setContainerEnvVars(&pod, rayContainerIndex, rayNodeType, rayStartParams, svcName, headPort) + setContainerEnvVars(&pod, rayContainerIndex, rayNodeType, rayStartParams, svcName, headPort, creator) // health check only if HA enabled if podTemplateSpec.Annotations != nil { @@ -492,7 +492,7 @@ func setInitContainerEnvVars(container *v1.Container, svcName string) { } } -func setContainerEnvVars(pod *v1.Pod, rayContainerIndex int, rayNodeType rayiov1alpha1.RayNodeType, rayStartParams map[string]string, svcName string, headPort string) { +func setContainerEnvVars(pod *v1.Pod, rayContainerIndex int, rayNodeType rayiov1alpha1.RayNodeType, rayStartParams map[string]string, svcName string, headPort string, creator string) { // set IP to local host if head, or the the svc otherwise RAY_IP // set the port RAY_PORT // set the password? @@ -518,6 +518,13 @@ func setContainerEnvVars(pod *v1.Pod, rayContainerIndex int, rayNodeType rayiov1 portEnv := v1.EnvVar{Name: RAY_PORT, Value: headPort} container.Env = append(container.Env, portEnv) } + if strings.ToLower(creator) == RayServiceCreatorLabelValue { + // Only add this env for Ray Service cluster to improve service SLA. + if !envVarExists(RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO, container.Env) { + deathEnv := v1.EnvVar{Name: RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO, Value: "0"} + container.Env = append(container.Env, deathEnv) + } + } // Setting the RAY_ADDRESS env allows connecting to Ray using ray.init() when connecting // from within the cluster. 
if !envVarExists(RAY_ADDRESS, container.Env) { diff --git a/ray-operator/controllers/ray/common/pod_test.go b/ray-operator/controllers/ray/common/pod_test.go index 606c3c165a..e8e9724156 100644 --- a/ray-operator/controllers/ray/common/pod_test.go +++ b/ray-operator/controllers/ray/common/pod_test.go @@ -270,7 +270,7 @@ func TestBuildPod(t *testing.T) { podName := strings.ToLower(cluster.Name + DashSymbol + string(rayiov1alpha1.HeadNode) + DashSymbol + utils.FormatInt32(0)) svcName := utils.GenerateServiceName(cluster.Name) podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, svcName, "6379") - pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", nil) + pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", nil, "") // Check RAY_ADDRESS env. checkRayAddress(t, pod, "127.0.0.1:6379") @@ -310,7 +310,7 @@ func TestBuildPod(t *testing.T) { worker := cluster.Spec.WorkerGroupSpecs[0] podName = cluster.Name + DashSymbol + string(rayiov1alpha1.WorkerNode) + DashSymbol + worker.GroupName + DashSymbol + utils.FormatInt32(0) podTemplateSpec = DefaultWorkerPodTemplate(*cluster, worker, podName, svcName, "6379") - pod = BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, svcName, "6379", nil) + pod = BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, svcName, "6379", nil, "") // Check RAY_ADDRESS env checkRayAddress(t, pod, "raycluster-sample-head-svc:6379") @@ -335,7 +335,7 @@ func TestBuildPod_WithAutoscalerEnabled(t *testing.T) { podName := strings.ToLower(cluster.Name + DashSymbol + string(rayiov1alpha1.HeadNode) + DashSymbol + utils.FormatInt32(0)) svcName := utils.GenerateServiceName(cluster.Name) podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, svcName, "6379") - pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, 
cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag) + pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag, "") actualResult := pod.Labels[RayClusterLabelKey] expectedResult := cluster.Name @@ -385,6 +385,33 @@ func TestBuildPod_WithAutoscalerEnabled(t *testing.T) { } } +func TestBuildPod_WithCreatedByRayService(t *testing.T) { + cluster := instance.DeepCopy() + cluster.Spec.EnableInTreeAutoscaling = &trueFlag + podName := strings.ToLower(cluster.Name + DashSymbol + string(rayiov1alpha1.HeadNode) + DashSymbol + utils.FormatInt32(0)) + svcName := utils.GenerateServiceName(cluster.Name) + podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, svcName, "6379") + pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag, RayServiceCreatorLabelValue) + + hasCorrectDeathEnv := false + for _, container := range pod.Spec.Containers { + if container.Name != "ray-head" { + continue + } + if container.Env == nil || len(container.Env) == 0 { + t.Fatalf("Expected death env `%v`", container) + } + for _, env := range container.Env { + if env.Name == RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO { + assert.Equal(t, "0", env.Value) + hasCorrectDeathEnv = true + break + } + } + } + assert.True(t, hasCorrectDeathEnv) +} + // Check that autoscaler container overrides work as expected. 
func TestBuildPodWithAutoscalerOptions(t *testing.T) { cluster := instance.DeepCopy() @@ -419,7 +446,7 @@ func TestBuildPodWithAutoscalerOptions(t *testing.T) { EnvFrom: customEnvFrom, } podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, svcName, "6379") - pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag) + pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag, "") expectedContainer := *autoscalerContainer.DeepCopy() expectedContainer.Image = customAutoscalerImage expectedContainer.ImagePullPolicy = customPullPolicy diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index cccf25d428..013f6e47ec 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -689,7 +689,8 @@ func (r *RayClusterReconciler) buildHeadPod(instance rayiov1alpha1.RayCluster) c headPort := common.GetHeadPort(instance.Spec.HeadGroupSpec.RayStartParams) autoscalingEnabled := instance.Spec.EnableInTreeAutoscaling podConf := common.DefaultHeadPodTemplate(instance, instance.Spec.HeadGroupSpec, podName, svcName, headPort) - pod := common.BuildPod(podConf, rayiov1alpha1.HeadNode, instance.Spec.HeadGroupSpec.RayStartParams, svcName, headPort, autoscalingEnabled) + creatorName := getCreator(instance) + pod := common.BuildPod(podConf, rayiov1alpha1.HeadNode, instance.Spec.HeadGroupSpec.RayStartParams, svcName, headPort, autoscalingEnabled, creatorName) // Set raycluster instance as the owner and controller if err := controllerutil.SetControllerReference(&instance, &pod, r.Scheme); err != nil { r.Log.Error(err, "Failed to set controller reference for raycluster pod") @@ -698,6 +699,19 @@ func (r *RayClusterReconciler) buildHeadPod(instance rayiov1alpha1.RayCluster) c return pod } 
+func getCreator(instance rayiov1alpha1.RayCluster) string { + if instance.Labels == nil { + return "" + } + creatorName, exist := instance.Labels[common.KubernetesCreatedByLabelKey] + + if !exist { + return "" + } + + return creatorName +} + // Build worker instance pods. func (r *RayClusterReconciler) buildWorkerPod(instance rayiov1alpha1.RayCluster, worker rayiov1alpha1.WorkerGroupSpec) corev1.Pod { podName := strings.ToLower(instance.Name + common.DashSymbol + string(rayiov1alpha1.WorkerNode) + common.DashSymbol + worker.GroupName + common.DashSymbol) @@ -707,7 +721,8 @@ func (r *RayClusterReconciler) buildWorkerPod(instance rayiov1alpha1.RayCluster, headPort := common.GetHeadPort(instance.Spec.HeadGroupSpec.RayStartParams) autoscalingEnabled := instance.Spec.EnableInTreeAutoscaling podTemplateSpec := common.DefaultWorkerPodTemplate(instance, worker, podName, svcName, headPort) - pod := common.BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, svcName, headPort, autoscalingEnabled) + creatorName := getCreator(instance) + pod := common.BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, svcName, headPort, autoscalingEnabled, creatorName) // Set raycluster instance as the owner and controller if err := controllerutil.SetControllerReference(&instance, &pod, r.Scheme); err != nil { r.Log.Error(err, "Failed to set controller reference for raycluster pod") diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index b7b168e981..5d6b914b4b 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -411,6 +411,7 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(rayService *rayv rayClusterLabel[k] = v } rayClusterLabel[common.RayServiceLabelKey] = rayService.Name + rayClusterLabel[common.KubernetesCreatedByLabelKey] = common.RayServiceCreatorLabelValue 
rayClusterAnnotations := make(map[string]string) for k, v := range rayService.Annotations {