From 2019b4b0ab0ac7b70d9c059148cc639b7dd9b556 Mon Sep 17 00:00:00 2001
From: Kai-Hsun Chen
Date: Thu, 27 Apr 2023 11:12:22 -0700
Subject: [PATCH] [Bug][GCS FT] Worker pods crash unexpectedly when gcs_server
 on head pod is killed (#1036)

Worker pods crash unexpectedly when gcs_server on head pod is killed
---
 .../ray/v1alpha1/raycluster_types_test.go     |  6 +-
 .../apis/ray/v1alpha1/rayjob_types_test.go    | 12 ++--
 .../ray/v1alpha1/rayservice_types_test.go     | 12 ++--
 .../controllers/ray/common/constant.go        |  4 +-
 ray-operator/controllers/ray/common/pod.go    | 15 ++--
 .../controllers/ray/common/pod_test.go        | 70 +++++++++++++++++++
 .../controllers/ray/common/service_test.go    |  3 +-
 .../controllers/ray/rayjob_controller_test.go |  6 +-
 .../ray/rayservice_controller_test.go         |  6 +-
 tests/compatibility-test.py                   | 56 ++++++++-------
 tests/config/ray-cluster.ray-ft.yaml.template |  5 ++
 11 files changed, 133 insertions(+), 62 deletions(-)

diff --git a/ray-operator/apis/ray/v1alpha1/raycluster_types_test.go b/ray-operator/apis/ray/v1alpha1/raycluster_types_test.go
index 8fbc7136d0..01293a4bfe 100644
--- a/ray-operator/apis/ray/v1alpha1/raycluster_types_test.go
+++ b/ray-operator/apis/ray/v1alpha1/raycluster_types_test.go
@@ -30,8 +30,7 @@ var myRayCluster = &RayCluster{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace: "default",
 			Labels: map[string]string{
-				"rayCluster": "raycluster-sample",
-				"groupName":  "headgroup",
+				"groupName": "headgroup",
 			},
 		},
 		Spec: corev1.PodSpec{
@@ -71,8 +70,7 @@ var myRayCluster = &RayCluster{
 			ObjectMeta: metav1.ObjectMeta{
 				Namespace: "default",
 				Labels: map[string]string{
-					"rayCluster": "raycluster-sample",
-					"groupName":  "small-group",
+					"groupName": "small-group",
 				},
 			},
 			Spec: corev1.PodSpec{
diff --git a/ray-operator/apis/ray/v1alpha1/rayjob_types_test.go b/ray-operator/apis/ray/v1alpha1/rayjob_types_test.go
index 47f1c3a8b7..aa3d278e29 100644
--- a/ray-operator/apis/ray/v1alpha1/rayjob_types_test.go
+++ b/ray-operator/apis/ray/v1alpha1/rayjob_types_test.go
@@ -37,8 +37,7 @@ var expectedRayJob = RayJob{
 			Template: corev1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
 					Labels: map[string]string{
-						"rayCluster": "raycluster-sample",
-						"groupName":  "headgroup",
+						"groupName": "headgroup",
 					},
 					Annotations: map[string]string{
 						"key": "value",
@@ -102,8 +101,7 @@ var expectedRayJob = RayJob{
 					ObjectMeta: metav1.ObjectMeta{
 						Namespace: "default",
 						Labels: map[string]string{
-							"rayCluster": "raycluster-sample",
-							"groupName":  "small-group",
+							"groupName": "small-group",
 						},
 					},
 					Spec: corev1.PodSpec{
@@ -162,8 +160,7 @@ var testRayJobJSON = `{
          "metadata": {
             "creationTimestamp": null,
             "labels": {
-               "groupName": "headgroup",
-               "rayCluster": "raycluster-sample"
+               "groupName": "headgroup"
             },
             "annotations": {
                "key": "value"
@@ -228,8 +225,7 @@ var testRayJobJSON = `{
             "namespace": "default",
             "creationTimestamp": null,
             "labels": {
-               "groupName": "small-group",
-               "rayCluster": "raycluster-sample"
+               "groupName": "small-group"
             }
          },
         "spec": {
diff --git a/ray-operator/apis/ray/v1alpha1/rayservice_types_test.go b/ray-operator/apis/ray/v1alpha1/rayservice_types_test.go
index bd8ae6f25b..ae2fe6267b 100644
--- a/ray-operator/apis/ray/v1alpha1/rayservice_types_test.go
+++ b/ray-operator/apis/ray/v1alpha1/rayservice_types_test.go
@@ -85,8 +85,7 @@ var myRayService = &RayService{
 			Template: corev1.PodTemplateSpec{
 				ObjectMeta: metav1.ObjectMeta{
 					Labels: map[string]string{
-						"rayCluster": "raycluster-sample",
-						"groupName":  "headgroup",
+						"groupName": "headgroup",
 					},
 					Annotations: map[string]string{
 						"key": "value",
@@ -155,8 +154,7 @@ var myRayService = &RayService{
 					ObjectMeta: metav1.ObjectMeta{
 						Namespace: "default",
 						Labels: map[string]string{
-							"rayCluster": "raycluster-sample",
-							"groupName":  "small-group",
+							"groupName": "small-group",
 						},
 					},
 					Spec: corev1.PodSpec{
@@ -254,8 +252,7 @@ var expected = `{
            "metadata":{
               "creationTimestamp":null,
               "labels":{
-                 "groupName":"headgroup",
-                 "rayCluster":"raycluster-sample"
+                 "groupName": "headgroup"
               },
               "annotations":{
                  "key":"value"
@@ -325,8 +322,7 @@ var expected = `{
                  "namespace":"default",
                  "creationTimestamp":null,
                  "labels":{
-                    "groupName":"small-group",
-                    "rayCluster":"raycluster-sample"
+                    "groupName":"small-group"
                  }
               },
               "spec":{
diff --git a/ray-operator/controllers/ray/common/constant.go b/ray-operator/controllers/ray/common/constant.go
index a41ecbc0e5..205878055c 100644
--- a/ray-operator/controllers/ray/common/constant.go
+++ b/ray-operator/controllers/ray/common/constant.go
@@ -79,6 +79,7 @@ const (
 	RAY_ADDRESS                             = "RAY_ADDRESS"
 	REDIS_PASSWORD                          = "REDIS_PASSWORD"
 	RAY_EXTERNAL_STORAGE_NS                 = "RAY_external_storage_namespace"
+	RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S  = "RAY_gcs_rpc_server_reconnect_timeout_s"
 	RAY_TIMEOUT_MS_TASK_WAIT_FOR_DEATH_INFO = "RAY_timeout_ms_task_wait_for_death_info"
 	RAY_GCS_SERVER_REQUEST_TIMEOUT_SECONDS  = "RAY_gcs_server_request_timeout_seconds"
 	RAY_SERVE_KV_TIMEOUT_S                  = "RAY_SERVE_KV_TIMEOUT_S"
@@ -88,7 +89,8 @@ const (
 	RAYCLUSTER_DEFAULT_REQUEUE_SECONDS = 300

 	// Ray core default configurations
-	DefaultRedisPassword = "5241590000000000"
+	DefaultRedisPassword                 = "5241590000000000"
+	DefaultWorkerRayGcsReconnectTimeoutS = "600"

 	LOCAL_HOST = "127.0.0.1"

 	// Ray FT default readiness probe values
diff --git a/ray-operator/controllers/ray/common/pod.go b/ray-operator/controllers/ray/common/pod.go
index 9d1310bdd3..a36e9420fb 100644
--- a/ray-operator/controllers/ray/common/pod.go
+++ b/ray-operator/controllers/ray/common/pod.go
@@ -650,13 +650,20 @@ func setContainerEnvVars(pod *v1.Pod, rayContainerIndex int, rayNodeType rayiov1
 			}
 		}
 	}
+	if !envVarExists(RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, container.Env) && rayNodeType == rayiov1alpha1.WorkerNode {
+		// If GCS FT is enabled and RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S is not set, set the worker's
+		// RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S to 600s. If the worker cannot reconnect to GCS within
+		// 600s, the Raylet will exit the process. By default, the value is 60s, so the worker node would
+		// crash if the GCS server stayed down for more than 60s. Typically, the new GCS server becomes
+		// available within 120 seconds, so we set the timeout to 600s to avoid the worker nodes crashing.
+		if ftEnabled := pod.Annotations[RayFTEnabledAnnotationKey] == "true"; ftEnabled {
+			gcsTimeout := v1.EnvVar{Name: RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, Value: DefaultWorkerRayGcsReconnectTimeoutS}
+			container.Env = append(container.Env, gcsTimeout)
+		}
+	}
 }

 func envVarExists(envName string, envVars []v1.EnvVar) bool {
-	if len(envVars) == 0 {
-		return false
-	}
-
 	for _, env := range envVars {
 		if env.Name == envName {
 			return true
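The hunk above is the heart of the fix, and the guard ordering is the part worth reading twice: the timeout is injected only for worker containers, only when the GCS FT annotation is set, and only when the user has not already supplied a value, so an explicit setting always wins. For readers outside the KubeRay tree, the following self-contained Go sketch distills the same pattern. It is not part of the patch: applyWorkerGcsReconnectTimeout is an illustrative stand-in for setContainerEnvVars, and the hard-coded "ray.io/ft-enabled" and "600" strings stand in for the constants in constant.go (the annotation key value is an assumption about the KubeRay codebase).

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

const (
	// Assumed values, standing in for RayFTEnabledAnnotationKey,
	// RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, and
	// DefaultWorkerRayGcsReconnectTimeoutS in constant.go.
	rayFTEnabledAnnotationKey = "ray.io/ft-enabled"
	gcsReconnectTimeoutEnv    = "RAY_gcs_rpc_server_reconnect_timeout_s"
	defaultGcsReconnectSec    = "600"
)

// envVarExists mirrors the helper in pod.go after this patch.
func envVarExists(name string, envVars []corev1.EnvVar) bool {
	for _, env := range envVars {
		if env.Name == name {
			return true
		}
	}
	return false
}

// applyWorkerGcsReconnectTimeout injects the 600s default only when the
// container belongs to a worker, GCS FT is enabled on the Pod, and the
// user has not set the variable explicitly.
func applyWorkerGcsReconnectTimeout(pod *corev1.Pod, rayContainerIndex int, isWorker bool) {
	container := &pod.Spec.Containers[rayContainerIndex]
	if !isWorker || envVarExists(gcsReconnectTimeoutEnv, container.Env) {
		return
	}
	if pod.Annotations[rayFTEnabledAnnotationKey] == "true" {
		container.Env = append(container.Env, corev1.EnvVar{
			Name:  gcsReconnectTimeoutEnv,
			Value: defaultGcsReconnectSec,
		})
	}
}

func main() {
	pod := &corev1.Pod{}
	pod.Annotations = map[string]string{rayFTEnabledAnnotationKey: "true"}
	pod.Spec.Containers = []corev1.Container{{Name: "ray-worker"}}

	// No user value: the 600s default is injected.
	applyWorkerGcsReconnectTimeout(pod, 0, true)
	fmt.Println(pod.Spec.Containers[0].Env)

	// A user-specified value is left untouched.
	pod.Spec.Containers[0].Env = []corev1.EnvVar{{Name: gcsReconnectTimeoutEnv, Value: "120"}}
	applyWorkerGcsReconnectTimeout(pod, 0, true)
	fmt.Println(pod.Spec.Containers[0].Env)
}

Checking envVarExists before appending is what lets the user-supplied "60" and "120" cases in the pod_test.go hunk below pass through unchanged.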
diff --git a/ray-operator/controllers/ray/common/pod_test.go b/ray-operator/controllers/ray/common/pod_test.go
index a12dc5365b..0aba209193 100644
--- a/ray-operator/controllers/ray/common/pod_test.go
+++ b/ray-operator/controllers/ray/common/pod_test.go
@@ -526,6 +526,76 @@ func TestBuildPod_WithCreatedByRayService(t *testing.T) {
 	assert.True(t, hasCorrectDeathEnv)
 }

+func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
+	// Test 1
+	cluster := instance.DeepCopy()
+	cluster.Annotations = map[string]string{
+		RayFTEnabledAnnotationKey: "true",
+	}
+
+	// Build a head Pod.
+	podName := strings.ToLower(cluster.Name + DashSymbol + string(rayiov1alpha1.HeadNode) + DashSymbol + utils.FormatInt32(0))
+	podTemplateSpec := DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
+	pod := BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, "", "")
+
+	// Check environment variable "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
+	rayContainerIndex := getRayContainerIndex(pod.Spec)
+	rayContainer := pod.Spec.Containers[rayContainerIndex]
+
+	// "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S" should not be set on the head Pod by default
+	assert.True(t, !envVarExists(RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, rayContainer.Env))
+
+	// Test 2
+	cluster = instance.DeepCopy()
+	cluster.Annotations = map[string]string{
+		RayFTEnabledAnnotationKey: "true",
+	}
+
+	// Add "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S" env var in the head group spec.
+	cluster.Spec.HeadGroupSpec.Template.Spec.Containers[rayContainerIndex].Env = append(cluster.Spec.HeadGroupSpec.Template.Spec.Containers[rayContainerIndex].Env,
+		v1.EnvVar{Name: RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, Value: "60"})
+	podTemplateSpec = DefaultHeadPodTemplate(*cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
+	pod = BuildPod(podTemplateSpec, rayiov1alpha1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, "", "")
+	rayContainer = pod.Spec.Containers[rayContainerIndex]
+
+	// Check environment variable "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
+	checkContainerEnv(t, rayContainer, RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, "60")
+
+	// Test 3
+	cluster = instance.DeepCopy()
+	cluster.Annotations = map[string]string{
+		RayFTEnabledAnnotationKey: "true",
+	}
+
+	// Build a worker pod
+	worker := cluster.Spec.WorkerGroupSpecs[0]
+	podName = cluster.Name + DashSymbol + string(rayiov1alpha1.WorkerNode) + DashSymbol + worker.GroupName + DashSymbol + utils.FormatInt32(0)
+	fqdnRayIP := utils.GenerateFQDNServiceName(cluster.Name, cluster.Namespace)
+	podTemplateSpec = DefaultWorkerPodTemplate(*cluster, worker, podName, fqdnRayIP, "6379")
+	pod = BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, "6379", nil, "", fqdnRayIP)
+
+	// Check the default value of "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
+	rayContainer = pod.Spec.Containers[rayContainerIndex]
+	checkContainerEnv(t, rayContainer, RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, DefaultWorkerRayGcsReconnectTimeoutS)
+
+	// Test 4
+	cluster = instance.DeepCopy()
+	cluster.Annotations = map[string]string{
+		RayFTEnabledAnnotationKey: "true",
+	}
+
+	// Add "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S" env var in the worker group spec.
+	cluster.Spec.WorkerGroupSpecs[0].Template.Spec.Containers[rayContainerIndex].Env = append(cluster.Spec.WorkerGroupSpecs[0].Template.Spec.Containers[rayContainerIndex].Env,
+		v1.EnvVar{Name: RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, Value: "120"})
+	worker = cluster.Spec.WorkerGroupSpecs[0]
+	podTemplateSpec = DefaultWorkerPodTemplate(*cluster, worker, podName, fqdnRayIP, "6379")
+	pod = BuildPod(podTemplateSpec, rayiov1alpha1.WorkerNode, worker.RayStartParams, "6379", nil, "", fqdnRayIP)
+
+	// Check the user-specified value of "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
+	rayContainer = pod.Spec.Containers[rayContainerIndex]
+	checkContainerEnv(t, rayContainer, RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, "120")
+}
+
 // Check that autoscaler container overrides work as expected.
 func TestBuildPodWithAutoscalerOptions(t *testing.T) {
 	cluster := instance.DeepCopy()
diff --git a/ray-operator/controllers/ray/common/service_test.go b/ray-operator/controllers/ray/common/service_test.go
index 424fb9be0e..1cf93ec4c3 100644
--- a/ray-operator/controllers/ray/common/service_test.go
+++ b/ray-operator/controllers/ray/common/service_test.go
@@ -35,8 +35,7 @@ var instanceWithWrongSvc = &rayiov1alpha1.RayCluster{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace: "default",
 			Labels: map[string]string{
-				"rayCluster": "raycluster-sample",
-				"groupName":  "headgroup",
+				"groupName": "headgroup",
 			},
 		},
 		Spec: corev1.PodSpec{
diff --git a/ray-operator/controllers/ray/rayjob_controller_test.go b/ray-operator/controllers/ray/rayjob_controller_test.go
index 9ea5b83a87..3433b50146 100644
--- a/ray-operator/controllers/ray/rayjob_controller_test.go
+++ b/ray-operator/controllers/ray/rayjob_controller_test.go
@@ -62,8 +62,7 @@ var _ = Context("Inside the default namespace", func() {
 				Template: corev1.PodTemplateSpec{
 					ObjectMeta: metav1.ObjectMeta{
 						Labels: map[string]string{
-							"rayCluster": "raycluster-sample",
-							"groupName":  "headgroup",
+							"groupName": "headgroup",
 						},
 						Annotations: map[string]string{
 							"key": "value",
@@ -132,8 +131,7 @@ var _ = Context("Inside the default namespace", func() {
 					ObjectMeta: metav1.ObjectMeta{
 						Namespace: "default",
 						Labels: map[string]string{
-							"rayCluster": "raycluster-sample",
-							"groupName":  "small-group",
+							"groupName": "small-group",
 						},
 					},
 					Spec: corev1.PodSpec{
diff --git a/ray-operator/controllers/ray/rayservice_controller_test.go b/ray-operator/controllers/ray/rayservice_controller_test.go
index 80365b6222..0da0921e96 100644
--- a/ray-operator/controllers/ray/rayservice_controller_test.go
+++ b/ray-operator/controllers/ray/rayservice_controller_test.go
@@ -100,8 +100,7 @@ var _ = Context("Inside the default namespace", func() {
 				Template: corev1.PodTemplateSpec{
 					ObjectMeta: metav1.ObjectMeta{
 						Labels: map[string]string{
-							"rayCluster": "raycluster-sample",
-							"groupName":  "headgroup",
+							"groupName": "headgroup",
 						},
 						Annotations: map[string]string{
 							"key": "value",
@@ -178,8 +177,7 @@ var _ = Context("Inside the default namespace", func() {
 					ObjectMeta: metav1.ObjectMeta{
 						Namespace: "default",
 						Labels: map[string]string{
-							"rayCluster": "raycluster-sample",
-							"groupName":  "small-group",
+							"groupName": "small-group",
 						},
 					},
 					Spec: corev1.PodSpec{
diff --git a/tests/compatibility-test.py b/tests/compatibility-test.py
index bb69d155fc..1fdb7aff31 100755
--- a/tests/compatibility-test.py
+++ b/tests/compatibility-test.py
@@ -41,6 +41,7 @@
 class BasicRayTestCase(unittest.TestCase):
     """Test the basic functionalities of RayCluster by executing simple jobs."""
     cluster_template = CONST.REPO_ROOT.joinpath("tests/config/ray-cluster.mini.yaml.template")
+    ray_cluster_ns = "default"

     @classmethod
     def setUpClass(cls):
@@ -60,10 +61,10 @@ def test_simple_code(self):
         Run a simple example in the head Pod to test the basic functionality of the Ray
         cluster. The example is from https://docs.ray.io/en/latest/ray-core/walkthrough.html#running-a-task.
""" - cluster_namespace = "default" - headpod = get_head_pod(cluster_namespace) + headpod = get_head_pod(BasicRayTestCase.ray_cluster_ns) headpod_name = headpod.metadata.name - pod_exec_command(headpod_name, cluster_namespace, "python samples/simple_code.py") + pod_exec_command( + headpod_name, BasicRayTestCase.ray_cluster_ns, "python samples/simple_code.py") def test_cluster_info(self): """Execute "print(ray.cluster_resources())" in the head Pod.""" @@ -73,6 +74,7 @@ def test_cluster_info(self): class RayFTTestCase(unittest.TestCase): """Test Ray GCS Fault Tolerance""" cluster_template = CONST.REPO_ROOT.joinpath("tests/config/ray-cluster.ray-ft.yaml.template") + ray_cluster_ns = "default" @classmethod def setUpClass(cls): @@ -110,8 +112,7 @@ def test_kill_head(self): def test_ray_serve(self): """Kill GCS process on the head Pod and then test a deployed Ray Serve model.""" - cluster_namespace = "default" - headpod = get_head_pod(cluster_namespace) + headpod = get_head_pod(RayFTTestCase.ray_cluster_ns) headpod_name = headpod.metadata.name # RAY_NAMESPACE is an abstraction in Ray. It is not a Kubernetes namespace. @@ -119,47 +120,47 @@ def test_ray_serve(self): logger.info('Ray namespace: %s', ray_namespace) # Deploy a Ray Serve model. - exit_code = pod_exec_command(headpod_name, cluster_namespace, + exit_code = pod_exec_command(headpod_name, RayFTTestCase.ray_cluster_ns, f" python samples/test_ray_serve_1.py {ray_namespace}", check = False ) if exit_code != 0: - show_cluster_info(cluster_namespace) + show_cluster_info(RayFTTestCase.ray_cluster_ns) raise Exception( f"Fail to execute test_ray_serve_1.py. The exit code is {exit_code}." ) - old_head_pod = get_head_pod(cluster_namespace) + old_head_pod = get_head_pod(RayFTTestCase.ray_cluster_ns) old_head_pod_name = old_head_pod.metadata.name restart_count = old_head_pod.status.container_statuses[0].restart_count - # Kill the gcs_server process on head node. If fate sharing is enabled, the whole head - # node pod will be terminated. - pod_exec_command(old_head_pod_name, cluster_namespace, "pkill gcs_server") + # Kill the gcs_server process on head node. The head node will crash after 20 seconds + # because the value of `RAY_gcs_rpc_server_reconnect_timeout_s` is "20" in the + # `ray-cluster.ray-ft.yaml.template` file. + pod_exec_command(old_head_pod_name, RayFTTestCase.ray_cluster_ns, "pkill gcs_server") # Waiting for all pods become ready and running. utils.wait_for_new_head(old_head_pod_name, restart_count, - cluster_namespace, timeout=300, retry_interval_ms=1000) + RayFTTestCase.ray_cluster_ns, timeout=300, retry_interval_ms=1000) # Try to connect to the deployed model again - headpod = get_head_pod(cluster_namespace) + headpod = get_head_pod(RayFTTestCase.ray_cluster_ns) headpod_name = headpod.metadata.name - exit_code = pod_exec_command(headpod_name, cluster_namespace, + exit_code = pod_exec_command(headpod_name, RayFTTestCase.ray_cluster_ns, f" python samples/test_ray_serve_2.py {ray_namespace}", check = False ) if exit_code != 0: - show_cluster_info(cluster_namespace) + show_cluster_info(RayFTTestCase.ray_cluster_ns) raise Exception( f"Fail to execute test_ray_serve_2.py. The exit code is {exit_code}." ) def test_detached_actor(self): """Kill GCS process on the head Pod and then test a detached actor.""" - cluster_namespace = "default" - headpod = get_head_pod(cluster_namespace) + headpod = get_head_pod(RayFTTestCase.ray_cluster_ns) headpod_name = headpod.metadata.name # RAY_NAMESPACE is an abstraction in Ray. 
@@ -167,42 +168,43 @@ def test_detached_actor(self):
         logger.info('Ray namespace: %s', ray_namespace)

         # Register a detached actor
-        exit_code = pod_exec_command(headpod_name, cluster_namespace,
+        exit_code = pod_exec_command(headpod_name, RayFTTestCase.ray_cluster_ns,
             f" python samples/test_detached_actor_1.py {ray_namespace}",
             check = False
         )

         if exit_code != 0:
-            show_cluster_info(cluster_namespace)
+            show_cluster_info(RayFTTestCase.ray_cluster_ns)
             raise Exception(
                 f"Fail to execute test_detached_actor_1.py. The exit code is {exit_code}."
             )

-        old_head_pod = get_head_pod(cluster_namespace)
+        old_head_pod = get_head_pod(RayFTTestCase.ray_cluster_ns)
         old_head_pod_name = old_head_pod.metadata.name
         restart_count = old_head_pod.status.container_statuses[0].restart_count

-        # Kill the gcs_server process on head node. If fate sharing is enabled, the whole head
-        # node pod will be terminated.
-        pod_exec_command(old_head_pod_name, cluster_namespace, "pkill gcs_server")
+        # Kill the gcs_server process on the head node. The head node will crash after 20 seconds
+        # because the value of `RAY_gcs_rpc_server_reconnect_timeout_s` is "20" in the
+        # `ray-cluster.ray-ft.yaml.template` file.
+        pod_exec_command(old_head_pod_name, RayFTTestCase.ray_cluster_ns, "pkill gcs_server")

         # Waiting for all pods become ready and running.
         utils.wait_for_new_head(old_head_pod_name, restart_count,
-            cluster_namespace, timeout=300, retry_interval_ms=1000)
+            RayFTTestCase.ray_cluster_ns, timeout=300, retry_interval_ms=1000)

         # Try to connect to the detached actor again.
         # [Note] When all pods become running and ready, the RayCluster still needs tens of seconds
         # to relaunch actors. Hence, `test_detached_actor_2.py` will retry until a Ray client
         # connection succeeds.
-        headpod = get_head_pod(cluster_namespace)
+        headpod = get_head_pod(RayFTTestCase.ray_cluster_ns)
         headpod_name = headpod.metadata.name
-        exit_code = pod_exec_command(headpod_name, cluster_namespace,
+        exit_code = pod_exec_command(headpod_name, RayFTTestCase.ray_cluster_ns,
             f" python samples/test_detached_actor_2.py {ray_namespace}",
             check = False
         )

         if exit_code != 0:
-            show_cluster_info(cluster_namespace)
+            show_cluster_info(RayFTTestCase.ray_cluster_ns)
             raise Exception(
                 f"Fail to execute test_detached_actor_2.py. The exit code is {exit_code}."
             )
diff --git a/tests/config/ray-cluster.ray-ft.yaml.template b/tests/config/ray-cluster.ray-ft.yaml.template
index 1325cc5200..d41ae6177e 100644
--- a/tests/config/ray-cluster.ray-ft.yaml.template
+++ b/tests/config/ray-cluster.ray-ft.yaml.template
@@ -92,6 +92,8 @@ spec:
           # RAY_REDIS_ADDRESS can force ray to use external redis
           - name: RAY_REDIS_ADDRESS
             value: redis:6379
+          - name: RAY_gcs_rpc_server_reconnect_timeout_s
+            value: "20"
           ports:
           - containerPort: 6379
             name: redis
@@ -140,6 +142,9 @@ spec:
       containers:
         - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc'
           image: $ray_image
+          env:
+            - name: RAY_gcs_rpc_server_reconnect_timeout_s
+              value: "120"
           resources:
             limits:
               cpu: "1"
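A closing note on the test flow: both FT tests follow the same shape. Kill gcs_server, block in utils.wait_for_new_head until the restarted head is ready, then retry the workload. That helper lives in the repo's Python test utilities; as a rough illustration of the contract it implements (poll a caller-supplied readiness check until it holds or a deadline passes), here is a standalone Go sketch. waitUntil and the toy condition are invented for illustration and appear nowhere in the repo.

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitUntil polls cond every interval until it returns true or timeout
// elapses, mirroring the shape of utils.wait_for_new_head(..., timeout=300,
// retry_interval_ms=1000): the caller supplies the "old head pod is gone and
// the new one reports Ready" check.
func waitUntil(cond func() (bool, error), timeout, interval time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		ok, err := cond()
		if err != nil {
			return err
		}
		if ok {
			return nil
		}
		if time.Now().After(deadline) {
			return errors.New("timed out waiting for condition")
		}
		time.Sleep(interval)
	}
}

func main() {
	start := time.Now()
	// Toy condition: becomes true after 3 seconds, standing in for a real
	// readiness check against the Kubernetes API.
	err := waitUntil(func() (bool, error) {
		return time.Since(start) > 3*time.Second, nil
	}, 300*time.Second, time.Second)
	fmt.Println("condition met, err =", err)
}

The 300-second ceiling in the tests comfortably covers the 20-second head reconnect timeout plus pod rescheduling, while the workers' 120-second timeout keeps them alive across the whole head restart, which is exactly the behavior this patch is asserting.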