Generate RayCluster Hash on KubeRay Version Change (ray-project#2320)
* Re-generate hash when KubeRay version changes

Signed-off-by: Ryan O'Leary <[email protected]>

* Change logic to DoNothing on KubeRay version mismatch

Signed-off-by: Ryan O'Leary <[email protected]>

* Add KubeRay version annotation to test

Signed-off-by: Ryan O'Leary <[email protected]>

* Move update logic

Signed-off-by: Ryan O'Leary <[email protected]>

* Update rayservice_controller.go

Signed-off-by: ryanaoleary <[email protected]>

* Add unit test

Signed-off-by: Ryan O'Leary <[email protected]>

* Add period

Signed-off-by: Ryan O'Leary <[email protected]>

* Go vet changes

Signed-off-by: Ryan O'Leary <[email protected]>

* Update rayservice_controller_unit_test.go

Signed-off-by: ryanaoleary <[email protected]>

* Address test comments

Signed-off-by: Ryan O'Leary <[email protected]>

---------

Signed-off-by: Ryan O'Leary <[email protected]>
Signed-off-by: ryanaoleary <[email protected]>
ryanaoleary authored Aug 23, 2024
1 parent 587c6ff commit e7f0c2c
Showing 3 changed files with 39 additions and 4 deletions.
16 changes: 13 additions & 3 deletions ray-operator/controllers/ray/rayservice_controller.go
@@ -531,7 +531,14 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(ctx context.Context, r
 return RolloutNew
 }
 
-// Case 1: If everything is identical except for the Replicas and WorkersToDelete of
+// Case 1: If the KubeRay version has changed, update the RayCluster to get the cluster hash and new KubeRay version.
+activeKubeRayVersion := activeRayCluster.ObjectMeta.Annotations[utils.KubeRayVersion]
+if activeKubeRayVersion != utils.KUBERAY_VERSION {
+logger.Info("Active RayCluster config doesn't match goal config due to mismatched KubeRay versions. Updating RayCluster.")
+return Update
+}
+
+// Case 2: If everything is identical except for the Replicas and WorkersToDelete of
 // each WorkerGroup, then do nothing.
 activeClusterHash := activeRayCluster.ObjectMeta.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey]
 goalClusterHash, err := generateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec)
@@ -548,7 +555,7 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(ctx context.Context, r
 return DoNothing
 }
 
-// Case 2: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of
+// Case 3: Otherwise, if everything is identical except for the Replicas and WorkersToDelete of
 // the existing workergroups, and one or more new workergroups are added at the end, then update the cluster.
 activeClusterNumWorkerGroups, err := strconv.Atoi(activeRayCluster.ObjectMeta.Annotations[utils.NumWorkerGroupsKey])
 if err != nil {
@@ -576,7 +583,7 @@ func (r *RayServiceReconciler) shouldPrepareNewRayCluster(ctx context.Context, r
 }
 }
 
-// Case 3: Otherwise, rollout a new cluster.
+// Case 4: Otherwise, rollout a new cluster.
 logger.Info("Active RayCluster config doesn't match goal config. " +
 "RayService operator should prepare a new Ray cluster.\n" +
 "* Active RayCluster config hash: " + activeClusterHash + "\n" +
@@ -733,6 +740,9 @@ func (r *RayServiceReconciler) constructRayClusterForRayService(ctx context.Cont
 }
 rayClusterAnnotations[utils.NumWorkerGroupsKey] = strconv.Itoa(len(rayService.Spec.RayClusterSpec.WorkerGroupSpecs))
 
+// set the KubeRay version used to create the RayCluster
+rayClusterAnnotations[utils.KubeRayVersion] = utils.KUBERAY_VERSION
+
 rayCluster := &rayv1.RayCluster{
 ObjectMeta: metav1.ObjectMeta{
 Labels: rayClusterLabel,
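Taken together, the new ordering in shouldPrepareNewRayCluster checks the KubeRay version annotation before comparing spec hashes, so a hash difference caused solely by a KubeRay upgrade leads to an in-place annotation update rather than a new cluster. The following self-contained sketch illustrates that ordering only; the function and type names (decide, clusterAction) and the example version strings are inventions for illustration, not the operator's actual API, and the "new worker groups appended" branch is omitted.

package main

import "fmt"

type clusterAction string

const (
	doNothing  clusterAction = "DoNothing"
	update     clusterAction = "Update"
	rolloutNew clusterAction = "RolloutNew"
)

// decide mirrors the decision order added above: a KubeRay version mismatch is
// handled first with an in-place update, then the spec hash is compared, and
// only a genuine spec change falls through to a new cluster rollout.
func decide(annotations map[string]string, operatorVersion, goalHash string) clusterAction {
	if annotations["ray.io/kuberay-version"] != operatorVersion {
		// Re-stamp the hash and version annotations with the running
		// operator's values instead of rolling out a new cluster.
		return update
	}
	if annotations["ray.io/hash-without-replicas-and-workers-to-delete"] == goalHash {
		return doNothing
	}
	return rolloutNew
}

func main() {
	anns := map[string]string{
		"ray.io/kuberay-version":                             "v1.1.0",
		"ray.io/hash-without-replicas-and-workers-to-delete": "abc123",
	}
	fmt.Println(decide(anns, "v1.2.0", "abc123")) // Update: version mismatch handled first
	fmt.Println(decide(anns, "v1.1.0", "abc123")) // DoNothing: version and hash both match
	fmt.Println(decide(anns, "v1.1.0", "def456")) // RolloutNew: hashes differ
}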
26 changes: 25 additions & 1 deletion ray-operator/controllers/ray/rayservice_controller_unit_test.go
@@ -734,15 +734,18 @@ func TestReconcileRayCluster(t *testing.T) {
 Annotations: map[string]string{
 utils.HashWithoutReplicasAndWorkersToDeleteKey: hash,
 utils.NumWorkerGroupsKey: strconv.Itoa(len(rayService.Spec.RayClusterSpec.WorkerGroupSpecs)),
+utils.KubeRayVersion: utils.KUBERAY_VERSION,
 },
 },
 }
 
 tests := map[string]struct {
 activeCluster *rayv1.RayCluster
+kubeRayVersion string
 updateRayClusterSpec bool
 enableZeroDowntime bool
 shouldPrepareNewCluster bool
+updateKubeRayVersion bool
 }{
 // Test 1: Neither active nor pending clusters exist. The `markRestart` function will be called, so the `PendingServiceStatus.RayClusterName` should be set.
 "Zero-downtime upgrade is enabled. Neither active nor pending clusters exist.": {
@@ -779,6 +782,17 @@ func TestReconcileRayCluster(t *testing.T) {
 enableZeroDowntime: false,
 shouldPrepareNewCluster: true,
 },
+// Test 6: If the active KubeRay version doesn't match the KubeRay version annotation on the RayCluster, update the RayCluster's hash and KubeRay version
+// annotations first before checking whether to trigger a zero downtime upgrade. This behavior occurs because when we upgrade the KubeRay CRD, the hash
+// generated by different KubeRay versions may differ, which can accidentally trigger a zero downtime upgrade.
+"Active RayCluster exists. KubeRay version is mismatched. Update the RayCluster.": {
+activeCluster: activeCluster.DeepCopy(),
+updateRayClusterSpec: true,
+enableZeroDowntime: true,
+shouldPrepareNewCluster: false,
+updateKubeRayVersion: true,
+kubeRayVersion: "new-version",
+},
 }
 
 for name, tc := range tests {
@@ -790,11 +804,16 @@ func TestReconcileRayCluster(t *testing.T) {
 }
 runtimeObjects := []runtime.Object{}
 if tc.activeCluster != nil {
+// Update 'ray.io/kuberay-version' to a new version if kubeRayVersion is set.
+if tc.updateKubeRayVersion {
+tc.activeCluster.Annotations[utils.KubeRayVersion] = tc.kubeRayVersion
+}
 runtimeObjects = append(runtimeObjects, tc.activeCluster.DeepCopy())
 }
 fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
 r := RayServiceReconciler{
 Client: fakeClient,
+Scheme: newScheme,
 }
 service := rayService.DeepCopy()
 if tc.updateRayClusterSpec {
@@ -804,9 +823,14 @@ func TestReconcileRayCluster(t *testing.T) {
 service.Status.ActiveServiceStatus.RayClusterName = tc.activeCluster.Name
 }
 assert.Equal(t, "", service.Status.PendingServiceStatus.RayClusterName)
-_, _, err = r.reconcileRayCluster(ctx, service)
+activeRayCluster, _, err := r.reconcileRayCluster(ctx, service)
 assert.Nil(t, err)
 
+// If the KubeRay version has changed, check that the RayCluster annotations have been updated to the correct version.
+if tc.updateKubeRayVersion && activeRayCluster != nil {
+assert.Equal(t, utils.KUBERAY_VERSION, activeRayCluster.Annotations[utils.KubeRayVersion])
+}
+
 // If KubeRay operator is preparing a new cluster, the `PendingServiceStatus.RayClusterName` should be set by calling the function `markRestart`.
 if tc.shouldPrepareNewCluster {
 assert.NotEqual(t, "", service.Status.PendingServiceStatus.RayClusterName)
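The new case follows the file's existing table-driven style: the shared activeCluster fixture is deep-copied and its annotation overridden per case, so cases do not leak state into one another. Below is a minimal, self-contained sketch of that pattern under stated assumptions; the fixture type, field names, and version strings are illustrative stand-ins, not KubeRay's types, and the file would live in any *_test.go.

package sketch

import "testing"

// fixture stands in for the shared RayCluster test object; only annotations matter here.
type fixture struct{ Annotations map[string]string }

func (f *fixture) DeepCopy() *fixture {
	c := &fixture{Annotations: map[string]string{}}
	for k, v := range f.Annotations {
		c.Annotations[k] = v
	}
	return c
}

func TestKubeRayVersionMismatchPattern(t *testing.T) {
	const operatorVersion = "current-version"
	shared := &fixture{Annotations: map[string]string{"ray.io/kuberay-version": operatorVersion}}

	tests := map[string]struct {
		updateKubeRayVersion bool
		kubeRayVersion       string
		wantInPlaceUpdate    bool
	}{
		"matching version: no in-place update": {},
		"mismatched version: in-place update expected": {
			updateKubeRayVersion: true,
			kubeRayVersion:       "new-version",
			wantInPlaceUpdate:    true,
		},
	}

	for name, tc := range tests {
		t.Run(name, func(t *testing.T) {
			active := shared.DeepCopy() // per-case copy, mirroring activeCluster.DeepCopy() above
			if tc.updateKubeRayVersion {
				active.Annotations["ray.io/kuberay-version"] = tc.kubeRayVersion
			}
			got := active.Annotations["ray.io/kuberay-version"] != operatorVersion
			if got != tc.wantInPlaceUpdate {
				t.Fatalf("in-place update = %v, want %v", got, tc.wantInPlaceUpdate)
			}
		})
	}
}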
1 change: 1 addition & 0 deletions ray-operator/controllers/ray/utils/constant.go
@@ -25,6 +25,7 @@ const (
 RayClusterHeadlessServiceLabelKey = "ray.io/headless-worker-svc"
 HashWithoutReplicasAndWorkersToDeleteKey = "ray.io/hash-without-replicas-and-workers-to-delete"
 NumWorkerGroupsKey = "ray.io/num-worker-groups"
+KubeRayVersion = "ray.io/kuberay-version"
 
 // In KubeRay, the Ray container must be the first application container in a head or worker Pod.
 RayContainerIndex = 0
