diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml
index 2266bbe27f..ce2e7fce39 100644
--- a/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml
+++ b/helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml
@@ -11277,6 +11277,11 @@ spec:
                 each node group.
               format: int32
               type: integer
+            observedGeneration:
+              description: observedGeneration is the most recent generation observed
+                for this RayCluster.
+              format: int64
+              type: integer
             reason:
               description: Reason provides more information about current State
               type: string
diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
index 27306a9b7c..88d724463a 100644
--- a/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
+++ b/helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
@@ -11772,6 +11772,11 @@ spec:
               type: string
             message:
               type: string
+            observedGeneration:
+              description: observedGeneration is the most recent generation observed
+                for this RayJob.
+              format: int64
+              type: integer
             rayClusterName:
               type: string
             rayClusterStatus:
@@ -11816,6 +11821,11 @@ spec:
                     of each node group.
                   format: int32
                   type: integer
+                observedGeneration:
+                  description: observedGeneration is the most recent generation
+                    observed for this RayCluster.
+                  format: int64
+                  type: integer
                 reason:
                   description: Reason provides more information about current State
                   type: string
diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml
index 5feae3f23d..5756e547d2 100644
--- a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml
+++ b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml
@@ -11872,6 +11872,11 @@ spec:
                         of each node group.
                       format: int32
                       type: integer
+                    observedGeneration:
+                      description: observedGeneration is the most recent generation
+                        observed for this RayCluster.
+                      format: int64
+                      type: integer
                     reason:
                       description: Reason provides more information about current
                         State
@@ -11905,6 +11910,11 @@ spec:
                   type: object
                 type: array
              type: object
+            observedGeneration:
+              description: observedGeneration is the most recent generation observed
+                for this RayService.
+              format: int64
+              type: integer
             pendingServiceStatus:
               description: Pending Service Status indicates a RayCluster will be
                 created or is being created.
@@ -11983,6 +11993,11 @@ spec:
                         of each node group.
                       format: int32
                       type: integer
+                    observedGeneration:
+                      description: observedGeneration is the most recent generation
+                        observed for this RayCluster.
+                      format: int64
+                      type: integer
                     reason:
                       description: Reason provides more information about current
                         State
diff --git a/ray-operator/apis/ray/v1alpha1/raycluster_types.go b/ray-operator/apis/ray/v1alpha1/raycluster_types.go
index ee05c02e90..a3c96c6df1 100644
--- a/ray-operator/apis/ray/v1alpha1/raycluster_types.go
+++ b/ray-operator/apis/ray/v1alpha1/raycluster_types.go
@@ -126,6 +126,10 @@ type RayClusterStatus struct {
 	Head HeadInfo `json:"head,omitempty"`
 	// Reason provides more information about current State
 	Reason string `json:"reason,omitempty"`
+	// observedGeneration is the most recent generation observed for this RayCluster. It corresponds to the
+	// RayCluster's generation, which is updated on mutation by the API Server.
+	// +optional
+	ObservedGeneration int64 `json:"observedGeneration,omitempty"`
 }
 
 // HeadInfo gives info about head
diff --git a/ray-operator/apis/ray/v1alpha1/rayjob_types.go b/ray-operator/apis/ray/v1alpha1/rayjob_types.go
index d03bdf8d77..fe6c9cef01 100644
--- a/ray-operator/apis/ray/v1alpha1/rayjob_types.go
+++ b/ray-operator/apis/ray/v1alpha1/rayjob_types.go
@@ -80,6 +80,10 @@ type RayJobStatus struct {
 	// Represents time when the job was ended.
 	EndTime *metav1.Time `json:"endTime,omitempty"`
 	RayClusterStatus RayClusterStatus `json:"rayClusterStatus,omitempty"`
+	// observedGeneration is the most recent generation observed for this RayJob. It corresponds to the
+	// RayJob's generation, which is updated on mutation by the API Server.
+	// +optional
+	ObservedGeneration int64 `json:"observedGeneration,omitempty"`
 }
 
 //+kubebuilder:object:root=true
diff --git a/ray-operator/apis/ray/v1alpha1/rayservice_types.go b/ray-operator/apis/ray/v1alpha1/rayservice_types.go
index 7477b0b917..4d4f03a8c9 100644
--- a/ray-operator/apis/ray/v1alpha1/rayservice_types.go
+++ b/ray-operator/apis/ray/v1alpha1/rayservice_types.go
@@ -95,6 +95,10 @@ type RayServiceStatuses struct {
 	PendingServiceStatus RayServiceStatus `json:"pendingServiceStatus,omitempty"`
 	// ServiceStatus indicates the current RayService status.
 	ServiceStatus ServiceStatus `json:"serviceStatus,omitempty"`
+	// observedGeneration is the most recent generation observed for this RayService. It corresponds to the
+	// RayService's generation, which is updated on mutation by the API Server.
+	// +optional
+	ObservedGeneration int64 `json:"observedGeneration,omitempty"`
 }
 
 type RayServiceStatus struct {
diff --git a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml
index 2266bbe27f..ce2e7fce39 100644
--- a/ray-operator/config/crd/bases/ray.io_rayclusters.yaml
+++ b/ray-operator/config/crd/bases/ray.io_rayclusters.yaml
@@ -11277,6 +11277,11 @@ spec:
                 each node group.
               format: int32
               type: integer
+            observedGeneration:
+              description: observedGeneration is the most recent generation observed
+                for this RayCluster.
+              format: int64
+              type: integer
             reason:
               description: Reason provides more information about current State
               type: string
diff --git a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
index 27306a9b7c..88d724463a 100644
--- a/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
+++ b/ray-operator/config/crd/bases/ray.io_rayjobs.yaml
@@ -11772,6 +11772,11 @@ spec:
               type: string
             message:
               type: string
+            observedGeneration:
+              description: observedGeneration is the most recent generation observed
+                for this RayJob.
+              format: int64
+              type: integer
             rayClusterName:
               type: string
             rayClusterStatus:
@@ -11816,6 +11821,11 @@ spec:
                     of each node group.
                   format: int32
                   type: integer
+                observedGeneration:
+                  description: observedGeneration is the most recent generation
+                    observed for this RayCluster.
+                  format: int64
+                  type: integer
                 reason:
                   description: Reason provides more information about current State
                   type: string
diff --git a/ray-operator/config/crd/bases/ray.io_rayservices.yaml b/ray-operator/config/crd/bases/ray.io_rayservices.yaml
index 5feae3f23d..5756e547d2 100644
--- a/ray-operator/config/crd/bases/ray.io_rayservices.yaml
+++ b/ray-operator/config/crd/bases/ray.io_rayservices.yaml
@@ -11872,6 +11872,11 @@ spec:
                         of each node group.
                      format: int32
                      type: integer
+                    observedGeneration:
+                      description: observedGeneration is the most recent generation
+                        observed for this RayCluster.
+                      format: int64
+                      type: integer
                    reason:
                      description: Reason provides more information about current
                        State
@@ -11905,6 +11910,11 @@ spec:
                  type: object
                type: array
              type: object
+            observedGeneration:
+              description: observedGeneration is the most recent generation observed
+                for this RayService.
+              format: int64
+              type: integer
            pendingServiceStatus:
              description: Pending Service Status indicates a RayCluster will be
                created or is being created.
@@ -11983,6 +11993,11 @@ spec:
                        of each node group.
                      format: int32
                      type: integer
+                    observedGeneration:
+                      description: observedGeneration is the most recent generation
+                        observed for this RayCluster.
+                      format: int64
+                      type: integer
                    reason:
                      description: Reason provides more information about current
                        State
diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go
index e3fe1d1721..f74c3607eb 100644
--- a/ray-operator/controllers/ray/raycluster_controller.go
+++ b/ray-operator/controllers/ray/raycluster_controller.go
@@ -8,8 +8,6 @@ import (
 	"strings"
 	"time"
 
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
@@ -25,6 +23,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	networkingv1 "k8s.io/api/networking/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -828,31 +827,19 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
 }
 
 func (r *RayClusterReconciler) updateStatus(instance *rayiov1alpha1.RayCluster) error {
+	// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
+	instance.Status.ObservedGeneration = instance.ObjectMeta.Generation
+
 	runtimePods := corev1.PodList{}
 	filterLabels := client.MatchingLabels{common.RayClusterLabelKey: instance.Name}
 	if err := r.List(context.TODO(), &runtimePods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
 		return err
 	}
 
-	count := utils.CalculateAvailableReplicas(runtimePods)
-	if instance.Status.AvailableWorkerReplicas != count {
-		instance.Status.AvailableWorkerReplicas = count
-	}
-
-	count = utils.CalculateDesiredReplicas(instance)
-	if instance.Status.DesiredWorkerReplicas != count {
-		instance.Status.DesiredWorkerReplicas = count
-	}
-
-	count = utils.CalculateMinReplicas(instance)
-	if instance.Status.MinWorkerReplicas != count {
-		instance.Status.MinWorkerReplicas = count
-	}
-
-	count = utils.CalculateMaxReplicas(instance)
-	if instance.Status.MaxWorkerReplicas != count {
-		instance.Status.MaxWorkerReplicas = count
-	}
+	instance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
+	instance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(instance)
+	instance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(instance)
+	instance.Status.MaxWorkerReplicas = utils.CalculateMaxReplicas(instance)
 
 	// validation for the RayStartParam for the state.
 	isValid, err := common.ValidateHeadRayStartParams(instance.Spec.HeadGroupSpec)
diff --git a/ray-operator/controllers/ray/raycluster_controller_fake_test.go b/ray-operator/controllers/ray/raycluster_controller_fake_test.go
index da61bf09b8..208c23cdb9 100644
--- a/ray-operator/controllers/ray/raycluster_controller_fake_test.go
+++ b/ray-operator/controllers/ray/raycluster_controller_fake_test.go
@@ -966,3 +966,58 @@ func TestGetHeadServiceIP(t *testing.T) {
 		})
 	}
 }
+
+func TestUpdateStatusObservedGeneration(t *testing.T) {
+	setupTest(t)
+	defer tearDown(t)
+
+	// Create a new scheme with CRDs, Pod, Service schemes.
+	newScheme := runtime.NewScheme()
+	_ = rayiov1alpha1.AddToScheme(newScheme)
+	_ = corev1.AddToScheme(newScheme)
+
+	// To update the status of RayCluster with `r.Status().Update()`,
+	// initialize the runtimeObjects with appropriate context. In KubeRay, the `ClusterIP`
+	// and `TargetPort` fields are typically set by the cluster's control plane.
+	headService, err := common.BuildServiceForHeadPod(*testRayCluster, nil, nil)
+	assert.Nil(t, err, "Failed to build head service.")
+	headService.Spec.ClusterIP = headNodeIP
+	for i, port := range headService.Spec.Ports {
+		headService.Spec.Ports[i].TargetPort = intstr.IntOrString{IntVal: port.Port}
+	}
+	runtimeObjects := append(testPods, headService, testRayCluster)
+
+	// To facilitate testing, we set an impossible value for ObservedGeneration.
+	// Note that ObjectMeta's `Generation` and `ResourceVersion` don't behave properly in the fake client.
+	// [Ref] https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.14.5/pkg/client/fake
+	testRayCluster.Status.ObservedGeneration = -1
+
+	// Initialize a fake client with newScheme and runtimeObjects.
+	fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
+
+	// Verify the initial values of `Generation` and `ObservedGeneration`.
+	namespacedName := types.NamespacedName{
+		Name:      instanceName,
+		Namespace: namespaceStr,
+	}
+	cluster := rayiov1alpha1.RayCluster{}
+	err = fakeClient.Get(context.Background(), namespacedName, &cluster)
+	assert.Nil(t, err, "Fail to get RayCluster")
+	assert.Equal(t, int64(-1), cluster.Status.ObservedGeneration)
+	assert.Equal(t, int64(0), cluster.ObjectMeta.Generation)
+
+	// Initialize RayCluster reconciler.
+	testRayClusterReconciler := &RayClusterReconciler{
+		Client:   fakeClient,
+		Recorder: &record.FakeRecorder{},
+		Scheme:   scheme.Scheme,
+		Log:      ctrl.Log.WithName("controllers").WithName("RayCluster"),
+	}
+
+	// Compare the values of `Generation` and `ObservedGeneration` to check if they match.
+	err = testRayClusterReconciler.updateStatus(testRayCluster)
+	assert.Nil(t, err)
+	err = fakeClient.Get(context.Background(), namespacedName, &cluster)
+	assert.Nil(t, err)
+	assert.Equal(t, cluster.ObjectMeta.Generation, cluster.Status.ObservedGeneration)
+}
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index b51d8e77a8..a6f1cbc863 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -346,6 +346,9 @@ func (r *RayJobReconciler) updateState(ctx context.Context, rayJob *rayv1alpha1.
 		rayJob.Status.EndTime = utils.ConvertUnixTimeToMetav1Time(jobInfo.EndTime)
 	}
 
+	// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
+	rayJob.Status.ObservedGeneration = rayJob.ObjectMeta.Generation
+
 	if errStatus := r.Status().Update(ctx, rayJob); errStatus != nil {
 		return fmtErrors.Errorf("combined error: %v %v", err, errStatus)
 	}
diff --git a/ray-operator/controllers/ray/rayjob_controller_test.go b/ray-operator/controllers/ray/rayjob_controller_test.go
index b73dd40f23..2ec24a9161 100644
--- a/ray-operator/controllers/ray/rayjob_controller_test.go
+++ b/ray-operator/controllers/ray/rayjob_controller_test.go
@@ -39,8 +39,6 @@ import (
 var _ = Context("Inside the default namespace", func() {
 	ctx := context.TODO()
 
-	var workerPods corev1.PodList
-
 	myRayJob := &rayiov1alpha1.RayJob{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      "rayjob-test",
 			Namespace: "default",
 		},
@@ -176,19 +174,6 @@ var _ = Context("Inside the default namespace", func() {
 		},
 	}
 
-	myRayCluster := &rayiov1alpha1.RayCluster{}
-
-	myRayJobWithClusterSelector := &rayiov1alpha1.RayJob{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      "rayjob-test-2",
-			Namespace: "default",
-		},
-		Spec: rayiov1alpha1.RayJobSpec{
-			Entrypoint:      "sleep 999",
-			ClusterSelector: map[string]string{},
-		},
-	}
-
 	Describe("When creating a rayjob", func() {
 		It("should create a rayjob object", func() {
 			err := k8sClient.Create(ctx, myRayJob)
@@ -205,7 +190,7 @@ var _ = Context("Inside the default namespace", func() {
 			Eventually(
 				getRayClusterNameForRayJob(ctx, myRayJob),
 				time.Second*15, time.Millisecond*500).Should(Not(BeEmpty()), "My RayCluster name = %v", myRayJob.Status.RayClusterName)
-
+			myRayCluster := &rayiov1alpha1.RayCluster{}
 			Eventually(
 				getResourceFunc(ctx, client.ObjectKey{Name: myRayJob.Status.RayClusterName, Namespace: "default"}, myRayCluster),
 				time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayCluster = %v", myRayCluster.Name)
@@ -213,6 +198,7 @@ var _ = Context("Inside the default namespace", func() {
 
 		It("should create more than 1 worker", func() {
 			filterLabels := client.MatchingLabels{common.RayClusterLabelKey: myRayJob.Status.RayClusterName, common.RayNodeGroupLabelKey: "small-group"}
+			workerPods := corev1.PodList{}
 			Eventually(
 				listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
 				time.Second*15, time.Millisecond*500).Should(Equal(3), fmt.Sprintf("workerGroup %v", workerPods.Items))
@@ -231,7 +217,16 @@ var _ = Context("Inside the default namespace", func() {
 			Eventually(
 				getRayClusterNameForRayJob(ctx, myRayJob),
 				time.Second*15, time.Millisecond*500).Should(Not(BeEmpty()), "My RayCluster name = %v", myRayJob.Status.RayClusterName)
-
+			myRayJobWithClusterSelector := &rayiov1alpha1.RayJob{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      "rayjob-test-2",
+					Namespace: "default",
+				},
+				Spec: rayiov1alpha1.RayJobSpec{
+					Entrypoint:      "sleep 999",
+					ClusterSelector: map[string]string{},
+				},
+			}
 			myRayJobWithClusterSelector.Spec.ClusterSelector[RayJobDefaultClusterSelectorKey] = myRayJob.Status.RayClusterName
 
 			err := k8sClient.Create(ctx, myRayJobWithClusterSelector)
diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go
index 5214027ac3..6133549625 100644
--- a/ray-operator/controllers/ray/rayservice_controller.go
+++ b/ray-operator/controllers/ray/rayservice_controller.go
@@ -109,6 +109,9 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
 	}
 	r.cleanUpServeConfigCache(rayServiceInstance)
 
+	// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
+	rayServiceInstance.Status.ObservedGeneration = rayServiceInstance.ObjectMeta.Generation
+
 	logger.Info("Reconciling the cluster component.")
 	// Find active and pending ray cluster objects given current service name.
 	var activeRayClusterInstance *rayv1alpha1.RayCluster
diff --git a/ray-operator/controllers/ray/rayservice_controller_test.go b/ray-operator/controllers/ray/rayservice_controller_test.go
index f5a8debfac..f328cc0d5b 100644
--- a/ray-operator/controllers/ray/rayservice_controller_test.go
+++ b/ray-operator/controllers/ray/rayservice_controller_test.go
@@ -45,10 +45,8 @@ var _ = Context("Inside the default namespace", func() {
 	ctx := context.TODO()
 	var workerPods corev1.PodList
 
-	var numReplicas int32
-	var numCpus float64
-	numReplicas = 1
-	numCpus = 0.1
+	var numReplicas int32 = 1
+	var numCpus float64 = 0.1
 
 	myRayService := &rayiov1alpha1.RayService{
 		ObjectMeta: metav1.ObjectMeta{