From c9a41ed01c0484a41751706d02bf757fe19c41f0 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Tue, 18 Sep 2018 14:24:45 -0700 Subject: [PATCH] add a work queue to process endpoint changes --- pkg/neg/controller.go | 41 ++++++++++++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go index 822fe0bf4d..dc680372d2 100644 --- a/pkg/neg/controller.go +++ b/pkg/neg/controller.go @@ -64,6 +64,8 @@ type Controller struct { // serviceQueue takes service key as work item. Service key with format "namespace/name". serviceQueue workqueue.RateLimitingInterface + // endpointQueue takes endpoint key as work item. Endpoint key with format "namespace/name". + endpointQueue workqueue.RateLimitingInterface // syncTracker tracks the latest time that service and endpoint changes are processed syncTracker utils.TimeTracker @@ -103,6 +105,7 @@ func NewController( ingressLister: ctx.IngressInformer.GetIndexer(), serviceLister: ctx.ServiceInformer.GetIndexer(), serviceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + endpointQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), syncTracker: utils.NewTimeTracker(), } @@ -147,10 +150,10 @@ func NewController( }) ctx.EndpointInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: negController.processEndpoint, - DeleteFunc: negController.processEndpoint, + AddFunc: negController.enqueueEndpoint, + DeleteFunc: negController.enqueueEndpoint, UpdateFunc: func(old, cur interface{}) { - negController.processEndpoint(cur) + negController.enqueueEndpoint(cur) }, }) ctx.AddHealthCheck("neg-controller", negController.IsHealthy) @@ -170,6 +173,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { }() go wait.Until(c.serviceWorker, time.Second, stopCh) + go wait.Until(c.endpointWorker, time.Second, stopCh) go func() { // Wait for gcPeriod to run the first GC // This is to make sure that all services are fully processed before running GC. @@ -195,23 +199,33 @@ func (c *Controller) IsHealthy() error { func (c *Controller) stop() { glog.V(2).Infof("Shutting down network endpoint group controller") c.serviceQueue.ShutDown() + c.endpointQueue.ShutDown() c.manager.ShutDown() } +func (c *Controller) endpointWorker() { + for { + func() { + key, quit := c.endpointQueue.Get() + if quit { + return + } + c.processEndpoint(key.(string)) + c.endpointQueue.Done(key) + }() + } +} + // processEndpoint finds the related syncers and signal it to sync -func (c *Controller) processEndpoint(obj interface{}) { +func (c *Controller) processEndpoint(key string) { defer func() { now := c.syncTracker.Track() lastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano())) }() - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err != nil { - glog.Errorf("Failed to generate endpoint key: %v", err) - return - } namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { + glog.Errorf("Failed to split endpoint namespaced key %q: %v", key, err) return } c.manager.Sync(namespace, name) @@ -361,6 +375,15 @@ func (c *Controller) handleErr(err error, key interface{}) { c.serviceQueue.AddRateLimited(key) } +func (c *Controller) enqueueEndpoint(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + glog.Errorf("Failed to generate endpoint key: %v", err) + return + } + c.endpointQueue.Add(key) +} + func (c *Controller) enqueueService(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil {