Skip to content

Commit

Permalink
[Feature][Hotfix] Add observedGeneration to the status of CRDs (#979)
Browse files Browse the repository at this point in the history
Add observedGeneration to the status of CRDs
  • Loading branch information
kevin85421 authored Mar 24, 2023
1 parent a0ee1c8 commit 5ca90b3
Show file tree
Hide file tree
Showing 15 changed files with 155 additions and 42 deletions.
5 changes: 5 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11277,6 +11277,11 @@ spec:
each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation observed
for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current State
type: string
Expand Down
10 changes: 10 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11772,6 +11772,11 @@ spec:
type: string
message:
type: string
observedGeneration:
description: observedGeneration is the most recent generation observed
for this RayJob.
format: int64
type: integer
rayClusterName:
type: string
rayClusterStatus:
Expand Down Expand Up @@ -11816,6 +11821,11 @@ spec:
of each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation
observed for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current State
type: string
Expand Down
15 changes: 15 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11872,6 +11872,11 @@ spec:
of each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation
observed for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current
State
Expand Down Expand Up @@ -11905,6 +11910,11 @@ spec:
type: object
type: array
type: object
observedGeneration:
description: observedGeneration is the most recent generation observed
for this RayService.
format: int64
type: integer
pendingServiceStatus:
description: Pending Service Status indicates a RayCluster will be
created or is being created.
Expand Down Expand Up @@ -11983,6 +11993,11 @@ spec:
of each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation
observed for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current
State
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/apis/ray/v1alpha1/raycluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ type RayClusterStatus struct {
Head HeadInfo `json:"head,omitempty"`
// Reason provides more information about current State
Reason string `json:"reason,omitempty"`
// observedGeneration is the most recent generation observed for this RayCluster. It corresponds to the
// RayCluster's generation, which is updated on mutation by the API Server.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

// HeadInfo gives info about head
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/apis/ray/v1alpha1/rayjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ type RayJobStatus struct {
// Represents time when the job was ended.
EndTime *metav1.Time `json:"endTime,omitempty"`
RayClusterStatus RayClusterStatus `json:"rayClusterStatus,omitempty"`
// observedGeneration is the most recent generation observed for this RayJob. It corresponds to the
// RayJob's generation, which is updated on mutation by the API Server.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

//+kubebuilder:object:root=true
Expand Down
4 changes: 4 additions & 0 deletions ray-operator/apis/ray/v1alpha1/rayservice_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ type RayServiceStatuses struct {
PendingServiceStatus RayServiceStatus `json:"pendingServiceStatus,omitempty"`
// ServiceStatus indicates the current RayService status.
ServiceStatus ServiceStatus `json:"serviceStatus,omitempty"`
// observedGeneration is the most recent generation observed for this RayService. It corresponds to the
// RayService's generation, which is updated on mutation by the API Server.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

type RayServiceStatus struct {
Expand Down
5 changes: 5 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11277,6 +11277,11 @@ spec:
each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation observed
for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current State
type: string
Expand Down
10 changes: 10 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11772,6 +11772,11 @@ spec:
type: string
message:
type: string
observedGeneration:
description: observedGeneration is the most recent generation observed
for this RayJob.
format: int64
type: integer
rayClusterName:
type: string
rayClusterStatus:
Expand Down Expand Up @@ -11816,6 +11821,11 @@ spec:
of each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation
observed for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current State
type: string
Expand Down
15 changes: 15 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayservices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11872,6 +11872,11 @@ spec:
of each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation
observed for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current
State
Expand Down Expand Up @@ -11905,6 +11910,11 @@ spec:
type: object
type: array
type: object
observedGeneration:
description: observedGeneration is the most recent generation observed
for this RayService.
format: int64
type: integer
pendingServiceStatus:
description: Pending Service Status indicates a RayCluster will be
created or is being created.
Expand Down Expand Up @@ -11983,6 +11993,11 @@ spec:
of each node group.
format: int32
type: integer
observedGeneration:
description: observedGeneration is the most recent generation
observed for this RayCluster.
format: int64
type: integer
reason:
description: Reason provides more information about current
State
Expand Down
29 changes: 8 additions & 21 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
Expand All @@ -25,6 +23,7 @@ import (
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -828,31 +827,19 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
}

func (r *RayClusterReconciler) updateStatus(instance *rayiov1alpha1.RayCluster) error {
// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
instance.Status.ObservedGeneration = instance.ObjectMeta.Generation

runtimePods := corev1.PodList{}
filterLabels := client.MatchingLabels{common.RayClusterLabelKey: instance.Name}
if err := r.List(context.TODO(), &runtimePods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
return err
}

count := utils.CalculateAvailableReplicas(runtimePods)
if instance.Status.AvailableWorkerReplicas != count {
instance.Status.AvailableWorkerReplicas = count
}

count = utils.CalculateDesiredReplicas(instance)
if instance.Status.DesiredWorkerReplicas != count {
instance.Status.DesiredWorkerReplicas = count
}

count = utils.CalculateMinReplicas(instance)
if instance.Status.MinWorkerReplicas != count {
instance.Status.MinWorkerReplicas = count
}

count = utils.CalculateMaxReplicas(instance)
if instance.Status.MaxWorkerReplicas != count {
instance.Status.MaxWorkerReplicas = count
}
instance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
instance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(instance)
instance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(instance)
instance.Status.MaxWorkerReplicas = utils.CalculateMaxReplicas(instance)

// validation for the RayStartParam for the state.
isValid, err := common.ValidateHeadRayStartParams(instance.Spec.HeadGroupSpec)
Expand Down
55 changes: 55 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,3 +966,58 @@ func TestGetHeadServiceIP(t *testing.T) {
})
}
}

// TestUpdateStatusObservedGeneration checks that, after updateStatus runs, the
// RayCluster stored in the fake client reports a Status.ObservedGeneration equal
// to its ObjectMeta.Generation.
func TestUpdateStatusObservedGeneration(t *testing.T) {
	setupTest(t)
	defer tearDown(t)

	// Build a dedicated scheme that knows the Ray CRD types and the core/v1 types (Pod, Service).
	testScheme := runtime.NewScheme()
	_ = rayiov1alpha1.AddToScheme(testScheme)
	_ = corev1.AddToScheme(testScheme)

	// updateStatus ends up calling `r.Status().Update()`, so the fake client must be
	// seeded with a suitable set of objects. `ClusterIP` and `TargetPort` are
	// typically populated by the cluster's control plane in KubeRay, so they are
	// filled in by hand here.
	headSvc, err := common.BuildServiceForHeadPod(*testRayCluster, nil, nil)
	assert.Nil(t, err, "Failed to build head service.")
	headSvc.Spec.ClusterIP = headNodeIP
	svcPorts := headSvc.Spec.Ports
	for idx := range svcPorts {
		svcPorts[idx].TargetPort = intstr.IntOrString{IntVal: svcPorts[idx].Port}
	}
	seedObjects := append(testPods, headSvc, testRayCluster)

	// Start from an impossible ObservedGeneration so that a later match with
	// Generation can only come from updateStatus.
	// Note that ObjectMeta's `Generation` and `ResourceVersion` don't behave properly in the fake client.
	// [Ref] https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/client/fake
	testRayCluster.Status.ObservedGeneration = -1

	// Fake client backed by testScheme and pre-populated with seedObjects.
	fakeClient := clientFake.NewClientBuilder().
		WithScheme(testScheme).
		WithRuntimeObjects(seedObjects...).
		Build()

	// Sanity-check the starting point: ObservedGeneration is -1 and Generation is 0.
	ctx := context.Background()
	lookupKey := types.NamespacedName{Namespace: namespaceStr, Name: instanceName}
	cluster := rayiov1alpha1.RayCluster{}
	err = fakeClient.Get(ctx, lookupKey, &cluster)
	assert.Nil(t, err, "Fail to get RayCluster")
	assert.Equal(t, int64(-1), cluster.Status.ObservedGeneration)
	assert.Equal(t, int64(0), cluster.ObjectMeta.Generation)

	// Reconciler under test, wired to the fake client.
	reconciler := &RayClusterReconciler{
		Client:   fakeClient,
		Recorder: &record.FakeRecorder{},
		Scheme:   scheme.Scheme,
		Log:      ctrl.Log.WithName("controllers").WithName("RayCluster"),
	}

	// Run updateStatus, re-read the object, and expect ObservedGeneration to match Generation.
	err = reconciler.updateStatus(testRayCluster)
	assert.Nil(t, err)
	err = fakeClient.Get(ctx, lookupKey, &cluster)
	assert.Nil(t, err)
	assert.Equal(t, cluster.ObjectMeta.Generation, cluster.Status.ObservedGeneration)
}
3 changes: 3 additions & 0 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,9 @@ func (r *RayJobReconciler) updateState(ctx context.Context, rayJob *rayv1alpha1.
rayJob.Status.EndTime = utils.ConvertUnixTimeToMetav1Time(jobInfo.EndTime)
}

// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
rayJob.Status.ObservedGeneration = rayJob.ObjectMeta.Generation

if errStatus := r.Status().Update(ctx, rayJob); errStatus != nil {
return fmtErrors.Errorf("combined error: %v %v", err, errStatus)
}
Expand Down
29 changes: 12 additions & 17 deletions ray-operator/controllers/ray/rayjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ import (

var _ = Context("Inside the default namespace", func() {
ctx := context.TODO()
var workerPods corev1.PodList

myRayJob := &rayiov1alpha1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Name: "rayjob-test",
Expand Down Expand Up @@ -176,19 +174,6 @@ var _ = Context("Inside the default namespace", func() {
},
}

myRayCluster := &rayiov1alpha1.RayCluster{}

myRayJobWithClusterSelector := &rayiov1alpha1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Name: "rayjob-test-2",
Namespace: "default",
},
Spec: rayiov1alpha1.RayJobSpec{
Entrypoint: "sleep 999",
ClusterSelector: map[string]string{},
},
}

Describe("When creating a rayjob", func() {
It("should create a rayjob object", func() {
err := k8sClient.Create(ctx, myRayJob)
Expand All @@ -205,14 +190,15 @@ var _ = Context("Inside the default namespace", func() {
Eventually(
getRayClusterNameForRayJob(ctx, myRayJob),
time.Second*15, time.Millisecond*500).Should(Not(BeEmpty()), "My RayCluster name = %v", myRayJob.Status.RayClusterName)

myRayCluster := &rayiov1alpha1.RayCluster{}
Eventually(
getResourceFunc(ctx, client.ObjectKey{Name: myRayJob.Status.RayClusterName, Namespace: "default"}, myRayCluster),
time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayCluster = %v", myRayCluster.Name)
})

It("should create more than 1 worker", func() {
filterLabels := client.MatchingLabels{common.RayClusterLabelKey: myRayJob.Status.RayClusterName, common.RayNodeGroupLabelKey: "small-group"}
workerPods := corev1.PodList{}
Eventually(
listResourceFunc(ctx, &workerPods, filterLabels, &client.ListOptions{Namespace: "default"}),
time.Second*15, time.Millisecond*500).Should(Equal(3), fmt.Sprintf("workerGroup %v", workerPods.Items))
Expand All @@ -231,7 +217,16 @@ var _ = Context("Inside the default namespace", func() {
Eventually(
getRayClusterNameForRayJob(ctx, myRayJob),
time.Second*15, time.Millisecond*500).Should(Not(BeEmpty()), "My RayCluster name = %v", myRayJob.Status.RayClusterName)

myRayJobWithClusterSelector := &rayiov1alpha1.RayJob{
ObjectMeta: metav1.ObjectMeta{
Name: "rayjob-test-2",
Namespace: "default",
},
Spec: rayiov1alpha1.RayJobSpec{
Entrypoint: "sleep 999",
ClusterSelector: map[string]string{},
},
}
myRayJobWithClusterSelector.Spec.ClusterSelector[RayJobDefaultClusterSelectorKey] = myRayJob.Status.RayClusterName

err := k8sClient.Create(ctx, myRayJobWithClusterSelector)
Expand Down
3 changes: 3 additions & 0 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque
}
r.cleanUpServeConfigCache(rayServiceInstance)

// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
rayServiceInstance.Status.ObservedGeneration = rayServiceInstance.ObjectMeta.Generation

logger.Info("Reconciling the cluster component.")
// Find active and pending ray cluster objects given current service name.
var activeRayClusterInstance *rayv1alpha1.RayCluster
Expand Down
6 changes: 2 additions & 4 deletions ray-operator/controllers/ray/rayservice_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,8 @@ var _ = Context("Inside the default namespace", func() {
ctx := context.TODO()
var workerPods corev1.PodList

var numReplicas int32
var numCpus float64
numReplicas = 1
numCpus = 0.1
var numReplicas int32 = 1
var numCpus float64 = 0.1

myRayService := &rayiov1alpha1.RayService{
ObjectMeta: metav1.ObjectMeta{
Expand Down

0 comments on commit 5ca90b3

Please sign in to comment.