diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go
index 830f4a1a93..4cc1ed5924 100644
--- a/ray-operator/controllers/ray/rayservice_controller.go
+++ b/ray-operator/controllers/ray/rayservice_controller.go
@@ -531,7 +531,14 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(ctx context.Context, r
         return RolloutNew
     }
 
-    // Case 1: If everything is identical except for the Replicas and WorkersToDelete of
+    // Case 1: If the KubeRay version has changed, update the RayCluster to get the cluster hash and new KubeRay version.
+    activeKubeRayVersion := activeRayCluster.ObjectMeta.Annotations[utils.KubeRayVersion]
+    if activeKubeRayVersion != utils.KUBERAY_VERSION {
+        logger.Info("Active RayCluster config doesn't match goal config due to mismatched KubeRay versions. Updating RayCluster.")
+        return Update
+    }
+
+    // Case 2: If everything is identical except for the Replicas and WorkersToDelete of
     // each WorkerGroup, then do nothing.
     activeClusterHash := activeRayCluster.ObjectMeta.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
     goalClusterHash, err := generateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec)
@@ -548,7 +555,7 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(ctx context.Context, r
         return DoNothing
     }
 
-    // Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of
+    // Case 3: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of
     // the existing workergroups, and one or more new workergroups are added at the end, then update the cluster.
     activeClusterNumWorkerGroups, err := strconv.Atoi(activeRayCluster.ObjectMeta.Annotations[utils.NumWorkerGroupsKey])
     if err != nil {
@@ -576,7 +583,7 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(ctx context.Context, r
         }
     }
 
-    // Case 3: Otherwise, rollout a new cluster.
+    // Case 4: Otherwise, rollout a new cluster.
     logger.Info("Active RayCluster config doesn't match goal config. " +
         "RayService operator should prepare a new Ray cluster.\n" +
         "* Active RayCluster config hash: " + activeClusterHash + "\n" +
@@ -733,6 +740,9 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(ctx context.Cont
     }
     rayClusterAnnotations[utils.NumWorkerGroupsKey] = strconv.Itoa(len(rayService.Spec.RayClusterSpec.WorkerGroupSpecs))
 
+    // set the KubeRay version used to create the RayCluster
+    rayClusterAnnotations[utils.KubeRayVersion] = utils.KUBERAY_VERSION
+
     rayCluster := &rayv1.RayCluster{
         ObjectMeta: metav1.ObjectMeta{
             Labels:      rayClusterLabel,
diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go
index 3ca8cf88de..277725610f 100644
--- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go
+++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go
@@ -734,15 +734,18 @@ func TestReconcileRayCluster(t *testing.T) {
             Annotations: map[string]string{
                 utils.HashWithoutReplicasAndWorkersToDeleteKey: hash,
                 utils.NumWorkerGroupsKey:                       strconv.Itoa(len(rayService.Spec.RayClusterSpec.WorkerGroupSpecs)),
+                utils.KubeRayVersion:                           utils.KUBERAY_VERSION,
             },
         },
     }
 
     tests := map[string]struct {
         activeCluster           *rayv1.RayCluster
+        kubeRayVersion          string
         updateRayClusterSpec    bool
         enableZeroDowntime      bool
         shouldPrepareNewCluster bool
+        updateKubeRayVersion    bool
     }{
         // Test 1: Neither active nor pending clusters exist. The `markRestart` function will be called, so the `PendingServiceStatus.RayClusterName` should be set.
         "Zero-downtime upgrade is enabled. Neither active nor pending clusters exist.": {
@@ -779,6 +782,17 @@ func TestReconcileRayCluster(t *testing.T) {
             enableZeroDowntime:      false,
             shouldPrepareNewCluster: true,
         },
+        // Test 6: If the active KubeRay version doesn't match the KubeRay version annotation on the RayCluster, update the RayCluster's hash and KubeRay version
+        // annotations first before checking whether to trigger a zero downtime upgrade. This behavior occurs because when we upgrade the KubeRay CRD, the hash
+        // generated by different KubeRay versions may differ, which can accidentally trigger a zero downtime upgrade.
+        "Active RayCluster exists. KubeRay version is mismatched. Update the RayCluster.": {
+            activeCluster:           activeCluster.DeepCopy(),
+            updateRayClusterSpec:    true,
+            enableZeroDowntime:      true,
+            shouldPrepareNewCluster: false,
+            updateKubeRayVersion:    true,
+            kubeRayVersion:          "new-version",
+        },
     }
 
     for name, tc := range tests {
@@ -790,11 +804,16 @@ func TestReconcileRayCluster(t *testing.T) {
         }
         runtimeObjects := []runtime.Object{}
         if tc.activeCluster != nil {
+            // Update 'ray.io/kuberay-version' to a new version if kubeRayVersion is set.
+            if tc.updateKubeRayVersion {
+                tc.activeCluster.Annotations[utils.KubeRayVersion] = tc.kubeRayVersion
+            }
             runtimeObjects = append(runtimeObjects, tc.activeCluster.DeepCopy())
         }
         fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
         r := RayServiceReconciler{
             Client: fakeClient,
+            Scheme: newScheme,
         }
         service := rayService.DeepCopy()
         if tc.updateRayClusterSpec {
@@ -804,9 +823,14 @@ func TestReconcileRayCluster(t *testing.T) {
             service.Status.ActiveServiceStatus.RayClusterName = tc.activeCluster.Name
         }
         assert.Equal(t, "", service.Status.PendingServiceStatus.RayClusterName)
-        _, _, err = r.reconcileRayCluster(ctx, service)
+        activeRayCluster, _, err := r.reconcileRayCluster(ctx, service)
         assert.Nil(t, err)
 
+        // If the KubeRay version has changed, check that the RayCluster annotations have been updated to the correct version.
+        if tc.updateKubeRayVersion && activeRayCluster != nil {
+            assert.Equal(t, utils.KUBERAY_VERSION, activeRayCluster.Annotations[utils.KubeRayVersion])
+        }
+
         // If KubeRay operator is preparing a new cluster, the `PendingServiceStatus.RayClusterName` should be set by calling the function `markRestart`.
         if tc.shouldPrepareNewCluster {
             assert.NotEqual(t, "", service.Status.PendingServiceStatus.RayClusterName)
diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go
index 7fa2299e50..e642c2f5ed 100644
--- a/ray-operator/controllers/ray/utils/constant.go
+++ b/ray-operator/controllers/ray/utils/constant.go
@@ -25,6 +25,7 @@ const (
     RayClusterHeadlessServiceLabelKey         = "ray.io/headless-worker-svc"
     HashWithoutReplicasAndWorkersToDeleteKey  = "ray.io/hash-without-replicas-and-workers-to-delete"
     NumWorkerGroupsKey                        = "ray.io/num-worker-groups"
+    KubeRayVersion                            = "ray.io/kuberay-version"
 
     // In KubeRay, the Ray container must be the first application container in a head or worker Pod.
     RayContainerIndex = 0
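
The crux of the controller change is the ordering of the decision logic in shouldPrepareNewRayCluster: the KubeRay version gate now runs before the hash comparison, so an annotation hash written by a different operator build leads to an in-place Update of the active RayCluster instead of an accidental zero-downtime rollout. The standalone Go sketch below illustrates that ordering only; the ClusterAction type, the decide function, and the reduced two-check body are illustrative assumptions, not the operator's real implementation (which also handles the appended-worker-group case shown as Case 3 in the diff).

package main

import "fmt"

// ClusterAction is an illustrative stand-in for the operator's decision values.
type ClusterAction string

const (
	DoNothing  ClusterAction = "DoNothing"
	Update     ClusterAction = "Update"
	RolloutNew ClusterAction = "RolloutNew"
)

// Annotation keys taken from the diff (utils.KubeRayVersion and
// utils.HashWithoutReplicasAndWorkersToDeleteKey).
const (
	kubeRayVersionKey = "ray.io/kuberay-version"
	hashKey           = "ray.io/hash-without-replicas-and-workers-to-delete"
)

// decide mirrors the ordering introduced by the patch: check the KubeRay
// version annotation first, and only compare cluster hashes if the versions match.
func decide(annotations map[string]string, operatorVersion, goalHash string) ClusterAction {
	if annotations[kubeRayVersionKey] != operatorVersion {
		// Mismatched KubeRay versions: refresh the annotations in place
		// rather than trusting a hash produced by a different version.
		return Update
	}
	if annotations[hashKey] == goalHash {
		// Specs identical apart from Replicas/WorkersToDelete: do nothing.
		return DoNothing
	}
	// Real goal config change: prepare a new cluster.
	return RolloutNew
}

func main() {
	anns := map[string]string{
		kubeRayVersionKey: "v1.1.0",
		hashKey:           "abc123",
	}
	fmt.Println(decide(anns, "v1.2.0", "abc123")) // Update: version mismatch wins even though hashes agree
	fmt.Println(decide(anns, "v1.1.0", "abc123")) // DoNothing
	fmt.Println(decide(anns, "v1.1.0", "zzz999")) // RolloutNew
}

The unit-test addition (Test 6) exercises exactly the first branch: the active cluster's ray.io/kuberay-version annotation is set to "new-version", and the test asserts that reconciliation rewrites it to utils.KUBERAY_VERSION without setting PendingServiceStatus.RayClusterName, i.e. without starting a zero-downtime upgrade.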