refactor: Fix flaky tests by using RetryOnConflict (#904)
Use RetryOnConflict to reduce test flakiness caused by 409 conflicts when updating resources.
Yicheng-Lu-llll authored Feb 14, 2023
1 parent 6490749 commit f058924
Showing 3 changed files with 48 additions and 69 deletions.
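
For background, retry.RetryOnConflict from k8s.io/client-go/util/retry re-runs a read-modify-update closure whenever the update fails with a 409 Conflict, backing off between attempts. A minimal sketch of the pattern the tests below adopt — the helper name updateWithRetry and its parameters are illustrative, not part of this commit:

    import (
        "context"

        "k8s.io/client-go/util/retry"
        "sigs.k8s.io/controller-runtime/pkg/client"
    )

    // updateWithRetry re-fetches the object on every attempt so each retry
    // mutates the latest resourceVersion before writing it back. RetryOnConflict
    // re-invokes the closure only while the update keeps failing with 409 Conflict.
    func updateWithRetry(ctx context.Context, c client.Client, key client.ObjectKey, obj client.Object, mutate func()) error {
        return retry.RetryOnConflict(retry.DefaultRetry, func() error {
            if err := c.Get(ctx, key, obj); err != nil {
                return err
            }
            mutate()
            return c.Update(ctx, obj)
        })
    }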
68 changes: 19 additions & 49 deletions ray-operator/controllers/ray/raycluster_controller_test.go
@@ -33,17 +33,12 @@ import (
     rbacv1 "k8s.io/api/rbac/v1"
     "k8s.io/utils/pointer"
 
-    "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/util/retry"
     "sigs.k8s.io/controller-runtime/pkg/client"
     // +kubebuilder:scaffold:imports
 )
 
-const (
-    DefaultAttempts               = 16
-    DefaultSleepDurationInSeconds = 3
-)
-
 var _ = Context("Inside the default namespace", func() {
     ctx := context.TODO()
     var workerPods corev1.PodList
@@ -92,9 +87,9 @@ var _ = Context("Inside the default namespace", func() {
             },
             WorkerGroupSpecs: []rayiov1alpha1.WorkerGroupSpec{
                 {
-                    Replicas:    pointer.Int32Ptr(3),
-                    MinReplicas: pointer.Int32Ptr(0),
-                    MaxReplicas: pointer.Int32Ptr(4),
+                    Replicas:    pointer.Int32(3),
+                    MinReplicas: pointer.Int32(0),
+                    MaxReplicas: pointer.Int32(4),
                     GroupName:   "small-group",
                     RayStartParams: map[string]string{
                         "port": "6379",
@@ -229,7 +224,7 @@ var _ = Context("Inside the default namespace", func() {
 
         pod := workerPods.Items[0]
         err := k8sClient.Delete(ctx, &pod,
-            &client.DeleteOptions{GracePeriodSeconds: pointer.Int64Ptr(0)})
+            &client.DeleteOptions{GracePeriodSeconds: pointer.Int64(0)})
 
         Expect(err).NotTo(HaveOccurred(), "failed delete a pod")
 
@@ -241,13 +236,11 @@ var _ = Context("Inside the default namespace", func() {
 
     It("should update a raycluster object deleting a random pod", func() {
         // adding a scale down
-        err := retryOnOldRevision(DefaultAttempts, DefaultSleepDurationInSeconds, func() error {
+        err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
             Eventually(
                 getResourceFunc(ctx, client.ObjectKey{Name: myRayCluster.Name, Namespace: "default"}, myRayCluster),
                 time.Second*9, time.Millisecond*500).Should(BeNil(), "My raycluster = %v", myRayCluster)
-            rep := new(int32)
-            *rep = 2
-            myRayCluster.Spec.WorkerGroupSpecs[0].Replicas = rep
+            myRayCluster.Spec.WorkerGroupSpecs[0].Replicas = pointer.Int32(2)
 
             // Operator may update revision after we get cluster earlier. Update may result in 409 conflict error.
             // We need to handle conflict error and retry the update.
@@ -266,17 +259,16 @@ var _ = Context("Inside the default namespace", func() {
 
     It("should update a raycluster object", func() {
         // adding a scale strategy
-        Eventually(
-            getResourceFunc(ctx, client.ObjectKey{Name: myRayCluster.Name, Namespace: "default"}, myRayCluster),
-            time.Second*9, time.Millisecond*500).Should(BeNil(), "My raycluster = %v", myRayCluster)
-
-        podToDelete1 := workerPods.Items[0]
-        rep := new(int32)
-        *rep = 1
-        myRayCluster.Spec.WorkerGroupSpecs[0].Replicas = rep
-        myRayCluster.Spec.WorkerGroupSpecs[0].ScaleStrategy.WorkersToDelete = []string{podToDelete1.Name}
-
-        Expect(k8sClient.Update(ctx, myRayCluster)).Should(Succeed(), "failed to update test RayCluster resource")
+        err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+            Eventually(
+                getResourceFunc(ctx, client.ObjectKey{Name: myRayCluster.Name, Namespace: "default"}, myRayCluster),
+                time.Second*9, time.Millisecond*500).Should(BeNil(), "My raycluster = %v", myRayCluster)
+            podToDelete := workerPods.Items[0]
+            myRayCluster.Spec.WorkerGroupSpecs[0].Replicas = pointer.Int32(1)
+            myRayCluster.Spec.WorkerGroupSpecs[0].ScaleStrategy.WorkersToDelete = []string{podToDelete.Name}
+            return k8sClient.Update(ctx, myRayCluster)
+        })
+        Expect(err).NotTo(HaveOccurred(), "failed to update test RayCluster resource")
     })
 
     It("should have only 1 running worker", func() {
@@ -288,13 +280,11 @@ var _ = Context("Inside the default namespace", func() {
 
     It("should increase replicas past maxReplicas", func() {
         // increasing replicas to 5, which is greater than maxReplicas (4)
-        err := retryOnOldRevision(DefaultAttempts, DefaultSleepDurationInSeconds, func() error {
+        err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
            Eventually(
                 getResourceFunc(ctx, client.ObjectKey{Name: myRayCluster.Name, Namespace: "default"}, myRayCluster),
                 time.Second*9, time.Millisecond*500).Should(BeNil(), "My raycluster = %v", myRayCluster)
-            rep := new(int32)
-            *rep = 5
-            myRayCluster.Spec.WorkerGroupSpecs[0].Replicas = rep
+            myRayCluster.Spec.WorkerGroupSpecs[0].Replicas = pointer.Int32(5)
 
             // Operator may update revision after we get cluster earlier. Update may result in 409 conflict error.
             // We need to handle conflict error and retry the update.
@@ -343,26 +333,6 @@ func listResourceFunc(ctx context.Context, workerPods *corev1.PodList, opt ...cl
     }
 }
 
-func retryOnOldRevision(attempts int, sleep time.Duration, f func() error) error {
-    var err error
-    for i := 0; i < attempts; i++ {
-        if i > 0 {
-            fmt.Printf("retrying after error: %v", err)
-            time.Sleep(sleep)
-            sleep *= 2
-        }
-        err = f()
-        if err == nil {
-            return nil
-        }
-
-        if !errors.IsConflict(err) {
-            return nil
-        }
-    }
-    return fmt.Errorf("after %d attempts, last error: %s", attempts, err)
-}
-
 func getClusterState(ctx context.Context, namespace string, clusterName string) func() rayiov1alpha1.ClusterState {
     return func() rayiov1alpha1.ClusterState {
         var cluster rayiov1alpha1.RayCluster
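
Aside: the deleted retryOnOldRevision helper slept 3 seconds initially and doubled the sleep across up to 16 attempts — and, as a quirk, returned nil for non-conflict errors, silently swallowing them. retry.DefaultRetry uses a much shorter schedule; if a test ever needed the old, longer backoff, an explicit wait.Backoff could be passed instead. A sketch under that assumption — the values mirror the deleted constants, but this backoff is illustrative, not part of the commit:

    import (
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
        "k8s.io/client-go/util/retry"
    )

    // Approximates the removed helper's schedule: start at 3s, double per retry,
    // stop after 16 attempts. Unlike the old helper, non-conflict errors are
    // returned immediately rather than swallowed.
    var longBackoff = wait.Backoff{
        Steps:    16,              // like DefaultAttempts
        Duration: 3 * time.Second, // like DefaultSleepDurationInSeconds
        Factor:   2.0,             // like sleep *= 2
    }

    func updateWithLongBackoff(update func() error) error {
        return retry.RetryOnConflict(longBackoff, update)
    }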
8 changes: 4 additions & 4 deletions ray-operator/controllers/ray/rayjob_controller_test.go
@@ -52,7 +52,7 @@ var _ = Context("Inside the default namespace", func() {
         RayVersion: "1.12.1",
         HeadGroupSpec: rayiov1alpha1.HeadGroupSpec{
             ServiceType: corev1.ServiceTypeClusterIP,
-            Replicas:    pointer.Int32Ptr(1),
+            Replicas:    pointer.Int32(1),
             RayStartParams: map[string]string{
                 "port":                "6379",
                 "object-store-memory": "100000000",
@@ -122,9 +122,9 @@ var _ = Context("Inside the default namespace", func() {
             },
             WorkerGroupSpecs: []rayiov1alpha1.WorkerGroupSpec{
                 {
-                    Replicas:    pointer.Int32Ptr(3),
-                    MinReplicas: pointer.Int32Ptr(0),
-                    MaxReplicas: pointer.Int32Ptr(10000),
+                    Replicas:    pointer.Int32(3),
+                    MinReplicas: pointer.Int32(0),
+                    MaxReplicas: pointer.Int32(10000),
                     GroupName:   "small-group",
                     RayStartParams: map[string]string{
                         "port": "6379",
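
The only change in this file is the swap from pointer.Int32Ptr to pointer.Int32. Both come from k8s.io/utils/pointer and return a *int32; Int32Ptr is, to my knowledge, the older deprecated spelling. A tiny illustration (function and variable names are made up):

    import "k8s.io/utils/pointer"

    func example() {
        // pointer.Int32 yields a *int32 for a literal, replacing the verbose
        // rep := new(int32); *rep = 3 pattern seen in the deleted test code.
        replicas := pointer.Int32(3)
        _ = replicas
    }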
41 changes: 25 additions & 16 deletions ray-operator/controllers/ray/rayservice_controller_test.go
@@ -34,6 +34,7 @@ import (
     "k8s.io/utils/pointer"
 
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/util/retry"
     "sigs.k8s.io/controller-runtime/pkg/client"
     // +kubebuilder:scaffold:imports
 )
@@ -89,7 +90,7 @@ var _ = Context("Inside the default namespace", func() {
         RayVersion: "1.12.1",
         HeadGroupSpec: rayiov1alpha1.HeadGroupSpec{
             ServiceType: corev1.ServiceTypeClusterIP,
-            Replicas:    pointer.Int32Ptr(1),
+            Replicas:    pointer.Int32(1),
             RayStartParams: map[string]string{
                 "port":                "6379",
                 "object-store-memory": "100000000",
},
WorkerGroupSpecs: []rayiov1alpha1.WorkerGroupSpec{
{
Replicas: pointer.Int32Ptr(3),
MinReplicas: pointer.Int32Ptr(0),
MaxReplicas: pointer.Int32Ptr(10000),
Replicas: pointer.Int32(3),
MinReplicas: pointer.Int32(0),
MaxReplicas: pointer.Int32(10000),
GroupName: "small-group",
RayStartParams: map[string]string{
"port": "6379",
@@ -299,17 +300,19 @@
 
     It("should update a rayservice object and switch to new Ray Cluster", func() {
         // adding a scale strategy
-        Eventually(
-            getResourceFunc(ctx, client.ObjectKey{Name: myRayService.Name, Namespace: "default"}, myRayService),
-            time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayService = %v", myRayService.Name)
+        err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+            Eventually(
+                getResourceFunc(ctx, client.ObjectKey{Name: myRayService.Name, Namespace: "default"}, myRayService),
+                time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayService = %v", myRayService.Name)
+
+            podToDelete := workerPods.Items[0]
+            myRayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas = pointer.Int32(1)
+            myRayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].ScaleStrategy.WorkersToDelete = []string{podToDelete.Name}
 
-        podToDelete1 := workerPods.Items[0]
-        rep := new(int32)
-        *rep = 1
-        myRayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas = rep
-        myRayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].ScaleStrategy.WorkersToDelete = []string{podToDelete1.Name}
+            return k8sClient.Update(ctx, myRayService)
+        })
 
-        Expect(k8sClient.Update(ctx, myRayService)).Should(Succeed(), "failed to update test RayService resource")
+        Expect(err).NotTo(HaveOccurred(), "failed to update test RayService resource")
 
         // Confirm switch to a new Ray Cluster.
         Eventually(
@@ -348,10 +351,16 @@ var _ = Context("Inside the default namespace", func() {
 
         // The cluster shouldn't switch until deployments are finished updating
         fakeRayDashboardClient.SetServeStatus(generateServeStatus(metav1.NewTime(time.Now().Add(time.Duration(-5)*time.Minute)), "UPDATING"))
-        myRayService.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env[1].Value = "UPDATED_VALUE"
-        myRayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env[1].Value = "UPDATED_VALUE"
 
-        Expect(k8sClient.Update(ctx, myRayService)).Should(Succeed(), "failed to update test RayService resource")
+        err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+            Eventually(
+                getResourceFunc(ctx, client.ObjectKey{Name: myRayService.Name, Namespace: "default"}, myRayService),
+                time.Second*3, time.Millisecond*500).Should(BeNil(), "My myRayService = %v", myRayService.Name)
+            myRayService.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Env[1].Value = "UPDATED_VALUE"
+            myRayService.Spec.RayClusterSpec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Env[1].Value = "UPDATED_VALUE"
+            return k8sClient.Update(ctx, myRayService)
+        })
+        Expect(err).NotTo(HaveOccurred(), "failed to update test RayService resource")
 
         Eventually(
             getPreparingRayClusterNameFunc(ctx, myRayService),

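One closing note: RetryOnConflict is, as far as I recall, a thin wrapper over retry.OnError with errors.IsConflict as the retriable predicate — the same k8s.io/apimachinery check the deleted helper performed by hand. If a test ever needed to retry on other transient failures as well, the general form would be (predicate and helper name are illustrative):

    import (
        apierrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/client-go/util/retry"
    )

    // Retries update() on conflicts and server timeouts, not conflicts only.
    func updateWithBroaderRetry(update func() error) error {
        return retry.OnError(retry.DefaultRetry, func(err error) bool {
            return apierrors.IsConflict(err) || apierrors.IsServerTimeout(err)
        }, update)
    }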