Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RayCluster created by RayService set death info env for ray container #419

Merged
merged 3 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions ray-operator/controllers/ray/common/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,21 @@ const (
// The default name for kuberay operator
ComponentName = "kuberay-operator"

// The default RayService identifier.
RayServiceCreatorLabelValue = "rayservice"

// Check whether the node is ready by checking whether the path exists
PodReadyFilepath = "POD_READY_FILEPATH"

// Use as container env variable
NAMESPACE = "NAMESPACE"
CLUSTER_NAME = "CLUSTER_NAME"
RAY_IP = "RAY_IP"
RAY_PORT = "RAY_PORT"
RAY_ADDRESS = "RAY_ADDRESS"
REDIS_PASSWORD = "REDIS_PASSWORD"
RAY_EXTERNAL_STORAGE_NS = "RAY_external_storage_namespace"
NAMESPACE = "NAMESPACE"
CLUSTER_NAME = "CLUSTER_NAME"
RAY_IP = "RAY_IP"
RAY_PORT = "RAY_PORT"
RAY_ADDRESS = "RAY_ADDRESS"
REDIS_PASSWORD = "REDIS_PASSWORD"
RAY_EXTERNAL_STORAGE_NS = "RAY_external_storage_namespace"
RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO = "RAY_timeout_ms_task_wait_for_death_info"

// Ray core default configurations
DefaultRedisPassword = "5241590000000000"
Expand Down
13 changes: 10 additions & 3 deletions ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func initReadinessProbeHandler(probe *v1.Probe, rayNodeType rayiov1alpha1.RayNod
}

// BuildPod a pod config
func BuildPod(podTemplateSpec v1.PodTemplateSpec, rayNodeType rayiov1alpha1.RayNodeType, rayStartParams map[string]string, svcName string, headPort string, enableRayAutoscaler *bool) (aPod v1.Pod) {
func BuildPod(podTemplateSpec v1.PodTemplateSpec, rayNodeType rayiov1alpha1.RayNodeType, rayStartParams map[string]string, svcName string, headPort string, enableRayAutoscaler *bool, creator string) (aPod v1.Pod) {
pod := v1.Pod{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Expand Down Expand Up @@ -283,7 +283,7 @@ func BuildPod(podTemplateSpec v1.PodTemplateSpec, rayNodeType rayiov1alpha1.RayN
setInitContainerEnvVars(&pod.Spec.InitContainers[index], svcName)
}

setContainerEnvVars(&pod, rayContainerIndex, rayNodeType, rayStartParams, svcName, headPort)
setContainerEnvVars(&pod, rayContainerIndex, rayNodeType, rayStartParams, svcName, headPort, creator)

// health check only if HA enabled
if podTemplateSpec.Annotations != nil {
Expand Down Expand Up @@ -487,7 +487,7 @@ func setInitContainerEnvVars(container *v1.Container, svcName string) {
}
}

func setContainerEnvVars(pod *v1.Pod, rayContainerIndex int, rayNodeType rayiov1alpha1.RayNodeType, rayStartParams map[string]string, svcName string, headPort string) {
func setContainerEnvVars(pod *v1.Pod, rayContainerIndex int, rayNodeType rayiov1alpha1.RayNodeType, rayStartParams map[string]string, svcName string, headPort string, creator string) {
// set IP to local host if head, or the svc otherwise RAY_IP
// set the port RAY_PORT
// set the password?
Expand All @@ -513,6 +513,13 @@ func setContainerEnvVars(pod *v1.Pod, rayContainerIndex int, rayNodeType rayiov1
portEnv := v1.EnvVar{Name: RAY_PORT, Value: headPort}
container.Env = append(container.Env, portEnv)
}
if strings.ToLower(creator) == RayServiceCreatorLabelValue {
// Only add this env for Ray Service cluster to improve service SLA.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we only need this in Ray Service? what about other scenarios?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iycheng mentioned this will increase SLA for ray service.
But for ray job, it will ignore the error info which is not good for observability. So for now, we keep it only for RayService.

if !envVarExists(RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO, container.Env) {
deathEnv := v1.EnvVar{Name: RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO, Value: "0"}
container.Env = append(container.Env, deathEnv)
}
}
// Setting the RAY_ADDRESS env allows connecting to Ray using ray.init() when connecting
// from within the cluster.
if !envVarExists(RAY_ADDRESS, container.Env) {
Expand Down
35 changes: 31 additions & 4 deletions ray-operator/controllers/ray/common/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func TestBuildPod(t *testing.T) {
podName := strings.ToLower(cluster.Name + DashSymbol + string(rayiov1alpha1.HeadNode) + DashSymbol + utils.FormatInt32(0))
svcName := utils.GenerateServiceName(cluster.Name)
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, svcName, "6379")
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", nil)
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", nil, "")

// Check RAY_ADDRESS env.
checkRayAddress(t, pod, "127.0.0.1:6379")
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestBuildPod(t *testing.T) {
worker := cluster.Spec.WorkerGroupSpecs[0]
podName = cluster.Name + DashSymbol + string(rayiov1alpha1.WorkerNode) + DashSymbol + worker.GroupName + DashSymbol + utils.FormatInt32(0)
podTemplateSpec = DefaultWorkerPodTemplate(*cluster, worker, podName, svcName, "6379")
pod = BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, svcName, "6379", nil)
pod = BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, svcName, "6379", nil, "")

// Check RAY_ADDRESS env
checkRayAddress(t, pod, "raycluster-sample-head-svc:6379")
Expand All @@ -335,7 +335,7 @@ func TestBuildPod_WithAutoscalerEnabled(t *testing.T) {
podName := strings.ToLower(cluster.Name + DashSymbol + string(rayiov1alpha1.HeadNode) + DashSymbol + utils.FormatInt32(0))
svcName := utils.GenerateServiceName(cluster.Name)
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, svcName, "6379")
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag)
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag, "")

actualResult := pod.Labels[RayClusterLabelKey]
expectedResult := cluster.Name
Expand Down Expand Up @@ -385,6 +385,33 @@ func TestBuildPod_WithAutoscalerEnabled(t *testing.T) {
}
}

// TestBuildPod_WithCreatedByRayService checks that a head pod built with
// RayServiceCreatorLabelValue as the creator carries the
// RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO env var with value "0" on its
// "ray-head" container.
func TestBuildPod_WithCreatedByRayService(t *testing.T) {
	cluster := instance.DeepCopy()
	cluster.Spec.EnableInTreeAutoscaling = &trueFlag

	headPodName := strings.ToLower(cluster.Name + DashSymbol + string(rayiov1alpha1.HeadNode) + DashSymbol + utils.FormatInt32(0))
	headSvcName := utils.GenerateServiceName(cluster.Name)
	template := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, headPodName, headSvcName, "6379")
	pod := BuildPod(template, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, headSvcName, "6379", &trueFlag, RayServiceCreatorLabelValue)

	deathEnvFound := false
	for _, c := range pod.Spec.Containers {
		// Only the Ray head container is expected to carry the env var.
		if c.Name == "ray-head" {
			// len of a nil slice is 0, so this covers both nil and empty env lists.
			if len(c.Env) == 0 {
				t.Fatalf("Expected death env `%v`", c)
			}
			for _, envVar := range c.Env {
				if envVar.Name != RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO {
					continue
				}
				assert.Equal(t, "0", envVar.Value)
				deathEnvFound = true
				// Stop at the first matching entry.
				break
			}
		}
	}
	assert.True(t, deathEnvFound)
}

// Check that autoscaler container overrides work as expected.
func TestBuildPodWithAutoscalerOptions(t *testing.T) {
cluster := instance.DeepCopy()
Expand Down Expand Up @@ -415,7 +442,7 @@ func TestBuildPodWithAutoscalerOptions(t *testing.T) {
Resources: &customResources,
}
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, svcName, "6379")
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag)
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, svcName, "6379", &trueFlag, "")
expectedContainer := *autoscalerContainer.DeepCopy()
expectedContainer.Image = customAutoscalerImage
expectedContainer.ImagePullPolicy = customPullPolicy
Expand Down
19 changes: 17 additions & 2 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,8 @@ func (r *RayClusterReconciler) buildHeadPod(instance rayiov1alpha1.RayCluster) c
headPort := common.GetHeadPort(instance.Spec.HeadGroupSpec.RayStartParams)
autoscalingEnabled := instance.Spec.EnableInTreeAutoscaling
podConf := common.DefaultHeadPodTemplate(instance, instance.Spec.HeadGroupSpec, podName, svcName, headPort)
pod := common.BuildPod(podConf, rayiov1alpha1.HeadNode, instance.Spec.HeadGroupSpec.RayStartParams, svcName, headPort, autoscalingEnabled)
creatorName := getCreator(instance)
pod := common.BuildPod(podConf, rayiov1alpha1.HeadNode, instance.Spec.HeadGroupSpec.RayStartParams, svcName, headPort, autoscalingEnabled, creatorName)
// Set raycluster instance as the owner and controller
if err := controllerutil.SetControllerReference(&instance, &pod, r.Scheme); err != nil {
r.Log.Error(err, "Failed to set controller reference for raycluster pod")
Expand All @@ -698,6 +699,19 @@ func (r *RayClusterReconciler) buildHeadPod(instance rayiov1alpha1.RayCluster) c
return pod
}

// getCreator returns the value of the common.KubernetesCreatedByLabelKey label
// on the given RayCluster, or "" when the cluster has no labels or the label
// is not set.
func getCreator(instance rayiov1alpha1.RayCluster) string {
	// Indexing a nil map is safe in Go and yields the element type's zero
	// value, so a missing label map and a missing key both produce "".
	return instance.Labels[common.KubernetesCreatedByLabelKey]
}

// Build worker instance pods.
func (r *RayClusterReconciler) buildWorkerPod(instance rayiov1alpha1.RayCluster, worker rayiov1alpha1.WorkerGroupSpec) corev1.Pod {
podName := strings.ToLower(instance.Name + common.DashSymbol + string(rayiov1alpha1.WorkerNode) + common.DashSymbol + worker.GroupName + common.DashSymbol)
Expand All @@ -707,7 +721,8 @@ func (r *RayClusterReconciler) buildWorkerPod(instance rayiov1alpha1.RayCluster,
headPort := common.GetHeadPort(instance.Spec.HeadGroupSpec.RayStartParams)
autoscalingEnabled := instance.Spec.EnableInTreeAutoscaling
podTemplateSpec := common.DefaultWorkerPodTemplate(instance, worker, podName, svcName, headPort)
pod := common.BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, svcName, headPort, autoscalingEnabled)
creatorName := getCreator(instance)
pod := common.BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, svcName, headPort, autoscalingEnabled, creatorName)
// Set raycluster instance as the owner and controller
if err := controllerutil.SetControllerReference(&instance, &pod, r.Scheme); err != nil {
r.Log.Error(err, "Failed to set controller reference for raycluster pod")
Expand Down
1 change: 1 addition & 0 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(rayService *rayv
rayClusterLabel[k] = v
}
rayClusterLabel[common.RayServiceLabelKey] = rayService.Name
rayClusterLabel[common.KubernetesCreatedByLabelKey] = common.RayServiceCreatorLabelValue

rayClusterAnnotations := make(map[string]string)
for k, v := range rayService.Annotations {
Expand Down