Skip to content

Commit

Permalink
[Bug][GCS FT] Worker pods crash unexpectedly when gcs_server on head …
Browse files Browse the repository at this point in the history
…pod is killed (#1036)

Worker pods crash unexpectedly when gcs_server on head pod is killed
  • Loading branch information
kevin85421 authored Apr 27, 2023
1 parent 473dfdb commit 2019b4b
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 62 deletions.
6 changes: 2 additions & 4 deletions ray-operator/apis/ray/v1alpha1/raycluster_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ var myRayCluster = &RayCluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Labels: map[string]string{
"rayCluster": "raycluster-sample",
"groupName": "headgroup",
"groupName": "headgroup",
},
},
Spec: corev1.PodSpec{
Expand Down Expand Up @@ -71,8 +70,7 @@ var myRayCluster = &RayCluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Labels: map[string]string{
"rayCluster": "raycluster-sample",
"groupName": "small-group",
"groupName": "small-group",
},
},
Spec: corev1.PodSpec{
Expand Down
12 changes: 4 additions & 8 deletions ray-operator/apis/ray/v1alpha1/rayjob_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ var expectedRayJob = RayJob{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"rayCluster": "raycluster-sample",
"groupName": "headgroup",
"groupName": "headgroup",
},
Annotations: map[string]string{
"key": "value",
Expand Down Expand Up @@ -102,8 +101,7 @@ var expectedRayJob = RayJob{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Labels: map[string]string{
"rayCluster": "raycluster-sample",
"groupName": "small-group",
"groupName": "small-group",
},
},
Spec: corev1.PodSpec{
Expand Down Expand Up @@ -162,8 +160,7 @@ var testRayJobJSON = `{
"metadata": {
"creationTimestamp": null,
"labels": {
"groupName": "headgroup",
"rayCluster": "raycluster-sample"
"groupName": "headgroup"
},
"annotations": {
"key": "value"
Expand Down Expand Up @@ -228,8 +225,7 @@ var testRayJobJSON = `{
"namespace": "default",
"creationTimestamp": null,
"labels": {
"groupName": "small-group",
"rayCluster": "raycluster-sample"
"groupName": "small-group"
}
},
"spec": {
Expand Down
12 changes: 4 additions & 8 deletions ray-operator/apis/ray/v1alpha1/rayservice_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ var myRayService = &RayService{
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"rayCluster": "raycluster-sample",
"groupName": "headgroup",
"groupName": "headgroup",
},
Annotations: map[string]string{
"key": "value",
Expand Down Expand Up @@ -155,8 +154,7 @@ var myRayService = &RayService{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Labels: map[string]string{
"rayCluster": "raycluster-sample",
"groupName": "small-group",
"groupName": "small-group",
},
},
Spec: corev1.PodSpec{
Expand Down Expand Up @@ -254,8 +252,7 @@ var expected = `{
"metadata":{
"creationTimestamp":null,
"labels":{
"groupName":"headgroup",
"rayCluster":"raycluster-sample"
"groupName": "headgroup"
},
"annotations":{
"key":"value"
Expand Down Expand Up @@ -325,8 +322,7 @@ var expected = `{
"namespace":"default",
"creationTimestamp":null,
"labels":{
"groupName":"small-group",
"rayCluster":"raycluster-sample"
"groupName":"small-group"
}
},
"spec":{
Expand Down
4 changes: 3 additions & 1 deletion ray-operator/controllers/ray/common/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const (
RAY_ADDRESS = "RAY_ADDRESS"
REDIS_PASSWORD = "REDIS_PASSWORD"
RAY_EXTERNAL_STORAGE_NS = "RAY_external_storage_namespace"
RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S = "RAY_gcs_rpc_server_reconnect_timeout_s"
RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO = "RAY_timeout_ms_task_wait_for_death_info"
RAY_GCS_SERVER_REQUEST_TIMEOUT_SECONDS = "RAY_gcs_server_request_timeout_seconds"
RAY_SERVE_KV_TIMEOUT_S = "RAY_SERVE_KV_TIMEOUT_S"
Expand All @@ -88,7 +89,8 @@ const (
RAYCLUSTER_DEFAULT_REQUEUE_SECONDS = 300

// Ray core default configurations
DefaultRedisPassword = "5241590000000000"
DefaultRedisPassword = "5241590000000000"
DefaultWorkerRayGcsReconnectTimeoutS = "600"

LOCAL_HOST = "127.0.0.1"
// Ray FT default readiness probe values
Expand Down
15 changes: 11 additions & 4 deletions ray-operator/controllers/ray/common/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,13 +650,20 @@ func setContainerEnvVars(pod *v1.Pod, rayContainerIndex int, rayNodeType rayiov1
}
}
}
if !envVarExists(RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, container.Env) && rayNodeType == rayiov1alpha1.WorkerNode {
// If GCS FT is enabled and RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S is not set, set the worker's
// RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S to 600s. If the worker cannot reconnect to GCS within
// 600s, the Raylet will exit the process. By default, the value is 60s, so the head node will
// crash if the GCS server is down for more than 60s. Typically, the new GCS server will be available
// in 120 seconds, so we set the timeout to 600s to avoid the worker nodes crashing.
if ftEnabled := pod.Annotations[RayFTEnabledAnnotationKey] == "true"; ftEnabled {
gcsTimeout := v1.EnvVar{Name: RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, Value: DefaultWorkerRayGcsReconnectTimeoutS}
container.Env = append(container.Env, gcsTimeout)
}
}
}

func envVarExists(envName string, envVars []v1.EnvVar) bool {
if len(envVars) == 0 {
return false
}

for _, env := range envVars {
if env.Name == envName {
return true
Expand Down
70 changes: 70 additions & 0 deletions ray-operator/controllers/ray/common/pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,76 @@ func TestBuildPod_WithCreatedByRayService(t *testing.T) {
assert.True(t, hasCorrectDeathEnv)
}

func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
// Test 1
cluster := instance.DeepCopy()
cluster.Annotations = map[string]string{
RayFTEnabledAnnotationKey: "true",
}

// Build a head Pod.
podName := strings.ToLower(cluster.Name + DashSymbol + string(rayiov1alpha1.HeadNode) + DashSymbol + utils.FormatInt32(0))
podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, "", "")

// Check environment variable "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
rayContainerIndex := getRayContainerIndex(pod.Spec)
rayContainer := pod.Spec.Containers[rayContainerIndex]

// "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S" should not be set on the head Pod by default
assert.True(t, !envVarExists(RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, rayContainer.Env))

// Test 2
cluster = instance.DeepCopy()
cluster.Annotations = map[string]string{
RayFTEnabledAnnotationKey: "true",
}

// Add "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S" env var in the head group spec.
cluster.Spec.HeadGroupSpec.Template.Spec.Containers[rayContainerIndex].Env = append(cluster.Spec.HeadGroupSpec.Template.Spec.Containers[rayContainerIndex].Env,
v1.EnvVar{Name: RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, Value: "60"})
podTemplateSpec = DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
pod = BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, "", "")
rayContainer = pod.Spec.Containers[rayContainerIndex]

// Check environment variable "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
checkContainerEnv(t, rayContainer, RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, "60")

// Test 3
cluster = instance.DeepCopy()
cluster.Annotations = map[string]string{
RayFTEnabledAnnotationKey: "true",
}

// Build a worker pod
worker := cluster.Spec.WorkerGroupSpecs[0]
podName = cluster.Name + DashSymbol + string(rayiov1alpha1.WorkerNode) + DashSymbol + worker.GroupName + DashSymbol + utils.FormatInt32(0)
fqdnRayIP := utils.GenerateFQDNServiceName(cluster.Name, cluster.Namespace)
podTemplateSpec = DefaultWorkerPodTemplate(*cluster, worker, podName, fqdnRayIP, "6379")
pod = BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, "6379", nil, "", fqdnRayIP)

// Check the default value of "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
rayContainer = pod.Spec.Containers[rayContainerIndex]
checkContainerEnv(t, rayContainer, RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, DefaultWorkerRayGcsReconnectTimeoutS)

// Test 4
cluster = instance.DeepCopy()
cluster.Annotations = map[string]string{
RayFTEnabledAnnotationKey: "true",
}

// Add "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S" env var in the worker group spec.
cluster.Spec.WorkerGroupSpecs[0].Template.Spec.Containers[rayContainerIndex].Env = append(cluster.Spec.WorkerGroupSpecs[0].Template.Spec.Containers[rayContainerIndex].Env,
v1.EnvVar{Name: RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, Value: "120"})
worker = cluster.Spec.WorkerGroupSpecs[0]
podTemplateSpec = DefaultWorkerPodTemplate(*cluster, worker, podName, fqdnRayIP, "6379")
pod = BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, "6379", nil, "", fqdnRayIP)

// Check the default value of "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
rayContainer = pod.Spec.Containers[rayContainerIndex]
checkContainerEnv(t, rayContainer, RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, "120")
}

// Check that autoscaler container overrides work as expected.
func TestBuildPodWithAutoscalerOptions(t *testing.T) {
cluster := instance.DeepCopy()
Expand Down
3 changes: 1 addition & 2 deletions ray-operator/controllers/ray/common/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ var instanceWithWrongSvc = &rayiov1alpha1.RayCluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Labels: map[string]string{
"rayCluster": "raycluster-sample",
"groupName": "headgroup",
"groupName": "headgroup",
},
},
Spec: corev1.PodSpec{
Expand Down
6 changes: 2 additions & 4 deletions ray-operator/controllers/ray/rayjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ var _ = Context("Inside the default namespace", func() {
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"rayCluster": "raycluster-sample",
"groupName": "headgroup",
"groupName": "headgroup",
},
Annotations: map[string]string{
"key": "value",
Expand Down Expand Up @@ -132,8 +131,7 @@ var _ = Context("Inside the default namespace", func() {
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Labels: map[string]string{
"rayCluster": "raycluster-sample",
"groupName": "small-group",
"groupName": "small-group",
},
},
Spec: corev1.PodSpec{
Expand Down
6 changes: 2 additions & 4 deletions ray-operator/controllers/ray/rayservice_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ var _ = Context("Inside the default namespace", func() {
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"rayCluster": "raycluster-sample",
"groupName": "headgroup",
"groupName": "headgroup",
},
Annotations: map[string]string{
"key": "value",
Expand Down Expand Up @@ -178,8 +177,7 @@ var _ = Context("Inside the default namespace", func() {
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Labels: map[string]string{
"rayCluster": "raycluster-sample",
"groupName": "small-group",
"groupName": "small-group",
},
},
Spec: corev1.PodSpec{
Expand Down
Loading

0 comments on commit 2019b4b

Please sign in to comment.