Skip to content

Commit

Permalink
[Misc] Operator: Typed Workqueue used
Browse files Browse the repository at this point in the history
Replace:
RateLimitingInterface --> TypedRateLimitingInterface
DefaultControllerRateLimiter --> DefaultTypedControllerRateLimiter
NewRateLimitingQueue --> NewTypedRateLimitingQueueWithConfig

Replaced deprecated StartStructuredLogging with StartLogging.
  • Loading branch information
Pavan-SAP committed Sep 30, 2024
1 parent 7162de2 commit 9e8a795
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 27 deletions.
33 changes: 14 additions & 19 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,19 @@ type Controller struct {
gardenerCertInformerFactory gardenerCertInformers.SharedInformerFactory
certManagerInformerFactory certManagerInformers.SharedInformerFactory
gardenerDNSInformerFactory gardenerDNSInformers.SharedInformerFactory
queues map[int]workqueue.RateLimitingInterface
queues map[int]workqueue.TypedRateLimitingInterface[QueueItem]
eventBroadcaster events.EventBroadcaster
eventRecorder events.EventRecorder
}

func NewController(client kubernetes.Interface, crdClient versioned.Interface, istioClient istio.Interface, gardenerCertificateClient gardenerCert.Interface, certManagerCertificateClient certManager.Interface, gardenerDNSClient gardenerDNS.Interface, apiExtClient apiext.Interface, promClient promop.Interface) *Controller {
queues := map[int]workqueue.RateLimitingInterface{
ResourceCAPApplication: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
ResourceCAPApplicationVersion: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
ResourceCAPTenant: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
ResourceCAPTenantOperation: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
ResourceOperatorDomains: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),

queues := map[int]workqueue.TypedRateLimitingInterface[QueueItem]{
ResourceCAPApplication: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPApplication]}),
ResourceCAPApplicationVersion: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPApplicationVersion]}),
ResourceCAPTenant: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenant]}),
ResourceCAPTenantOperation: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceCAPTenantOperation]}),
ResourceOperatorDomains: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{Name: KindMap[ResourceOperatorDomains]}),
}

// Use 30mins as the default Resync interval for kube / proprietary resources
Expand Down Expand Up @@ -96,7 +97,7 @@ func NewController(client kubernetes.Interface, crdClient versioned.Interface, i
v1alpha1scheme.AddToScheme(scheme)
istioscheme.AddToScheme(scheme)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
eventBroadcaster.StartStructuredLogging(klog.Level(1))
eventBroadcaster.StartLogging(klog.Background())
recorder := eventBroadcaster.NewRecorder(scheme, "cap-controller.sme.sap.com")

c := &Controller{
Expand Down Expand Up @@ -222,24 +223,19 @@ func (c *Controller) processQueueItem(ctx context.Context, key int) error {

klog.V(2).InfoS("Processing queue item in work queue", "resource", getResourceKindFromKey(key), "queue length", q.Len())

i, shutdown := q.Get()
item, shutdown := q.Get()
if shutdown {
return fmt.Errorf("queue (%d) shutdown", key) // stop processing when the queue has been shutdown
}

// [IMPORTANT] always mark the item as done (after processing it)
defer q.Done(i)
defer q.Done(item)

var (
err error
skipItem bool
result *ReconcileResult
)
item, ok := i.(QueueItem)
if !ok {
klog.ErrorS(err, "unknown item found in queue", "resource", getResourceKindFromKey(key))
return nil // process next item
}

attempts := q.NumRequeues(item)

Expand Down Expand Up @@ -268,14 +264,14 @@ func (c *Controller) processQueueItem(ctx context.Context, key int) error {
klog.ErrorS(err, "queue processing error", "resource", getResourceKindFromKey(key))
if !skipItem {
// add back to queue for re-processing
q.AddRateLimited(i)
q.AddRateLimited(item)
return nil
}
}

// Forget the item after processing it
// This just clears the rate limiter from tracking the item
q.Forget(i)
q.Forget(item)

if result != nil {
// requeue resources specified in the reconciliation result
Expand All @@ -300,7 +296,7 @@ func (c *Controller) processReconcileResult(result *ReconcileResult) {
}
}

func (c *Controller) recoverFromPanic(ctx context.Context, item QueueItem, q workqueue.RateLimitingInterface) {
func (c *Controller) recoverFromPanic(ctx context.Context, item QueueItem, q workqueue.TypedRateLimitingInterface[QueueItem]) {
if r := recover(); r != nil {
// Log the Error / Stack Trace
err := fmt.Errorf("panic: %v", r)
Expand All @@ -320,5 +316,4 @@ func (c *Controller) recoverFromPanic(ctx context.Context, item QueueItem, q wor
// Add the item back to the queue to be processed again with a RateLimited delay
q.AddRateLimited(item)
}

}
6 changes: 2 additions & 4 deletions internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func TestController_processQueueItem(t *testing.T) {

c := getTestController(testResources{cas: []*v1alpha1.CAPApplication{ca}, cats: []*v1alpha1.CAPTenant{cat}, preventStart: true})
if tt.resource == 9 || tt.resource == 99 {
c.queues[tt.resource] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
c.queues[tt.resource] = workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[QueueItem](), workqueue.TypedRateLimitingQueueConfig[QueueItem]{})
}

dummyKubeInformerFactory := &dummyInformerFactoryType{c.kubeInformerFactory, tt.resourceNamespace, nil}
Expand Down Expand Up @@ -263,10 +263,8 @@ func TestController_processQueueItem(t *testing.T) {
cancel()
expectedRes = testC.processQueueItem(ctx, tt.resource)
} else {
if tt.resource < 4 || tt.resource == 9 {
if tt.resource < 4 || tt.resource == 9 || tt.resource == 99 {
q.Add(item)
} else if tt.resource == 99 {
q.Add(tt.resource)
}
expectedRes = testC.processQueueItem(context.TODO(), tt.resource)
}
Expand Down
8 changes: 4 additions & 4 deletions internal/controller/informers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (
var expectedResult = false

type dummyType struct {
workqueue.RateLimitingInterface
workqueue.TypedRateLimitingInterface[QueueItem]
}

func (q *dummyType) Add(item interface{}) {
func (q *dummyType) Add(item QueueItem) {
expectedResult = true
}

func (q *dummyType) AddAfter(item interface{}, duration time.Duration) {
func (q *dummyType) AddAfter(item QueueItem, duration time.Duration) {
expectedResult = true
}

Expand Down Expand Up @@ -87,7 +87,7 @@ func TestController_initializeInformers(t *testing.T) {
c := getTestController(testResources{})
expectedResult = false

queues := map[int]workqueue.RateLimitingInterface{
queues := map[int]workqueue.TypedRateLimitingInterface[QueueItem]{
ResourceCAPApplication: &dummyType{},
ResourceCAPApplicationVersion: &dummyType{},
ResourceCAPTenant: &dummyType{},
Expand Down

0 comments on commit 9e8a795

Please sign in to comment.