Merge pull request #482 from freehan/split-neg-sync
add work queue to process endpoint changes
k8s-ci-robot authored Oct 14, 2018
2 parents d6a9498 + c9a41ed commit 24a5ada
Showing 1 changed file with 32 additions and 9 deletions.
pkg/neg/controller.go: 41 changes (32 additions, 9 deletions)
@@ -65,6 +65,8 @@ type Controller struct {
 
 	// serviceQueue takes service key as work item. Service key with format "namespace/name".
 	serviceQueue workqueue.RateLimitingInterface
+	// endpointQueue takes endpoint key as work item. Endpoint key with format "namespace/name".
+	endpointQueue workqueue.RateLimitingInterface
 
 	// syncTracker tracks the latest time that service and endpoint changes are processed
 	syncTracker utils.TimeTracker
@@ -104,6 +106,7 @@ func NewController(
 		ingressLister: ctx.IngressInformer.GetIndexer(),
 		serviceLister: ctx.ServiceInformer.GetIndexer(),
 		serviceQueue:  workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+		endpointQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 		syncTracker:   utils.NewTimeTracker(),
 	}
 
@@ -148,10 +151,10 @@ func NewController(
 	})
 
 	ctx.EndpointInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    negController.processEndpoint,
-		DeleteFunc: negController.processEndpoint,
+		AddFunc:    negController.enqueueEndpoint,
+		DeleteFunc: negController.enqueueEndpoint,
 		UpdateFunc: func(old, cur interface{}) {
-			negController.processEndpoint(cur)
+			negController.enqueueEndpoint(cur)
 		},
 	})
 	ctx.AddHealthCheck("neg-controller", negController.IsHealthy)
@@ -171,6 +174,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
 	}()
 
 	go wait.Until(c.serviceWorker, time.Second, stopCh)
+	go wait.Until(c.endpointWorker, time.Second, stopCh)
 	go func() {
 		// Wait for gcPeriod to run the first GC
 		// This is to make sure that all services are fully processed before running GC.
@@ -196,23 +200,33 @@ func (c *Controller) IsHealthy() error {
 func (c *Controller) stop() {
 	glog.V(2).Infof("Shutting down network endpoint group controller")
 	c.serviceQueue.ShutDown()
+	c.endpointQueue.ShutDown()
 	c.manager.ShutDown()
 }
 
+func (c *Controller) endpointWorker() {
+	for {
+		func() {
+			key, quit := c.endpointQueue.Get()
+			if quit {
+				return
+			}
+			c.processEndpoint(key.(string))
+			c.endpointQueue.Done(key)
+		}()
+	}
+}
+
 // processEndpoint finds the related syncers and signal it to sync
-func (c *Controller) processEndpoint(obj interface{}) {
+func (c *Controller) processEndpoint(key string) {
 	defer func() {
 		now := c.syncTracker.Track()
 		metrics.LastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano()))
 	}()
 
-	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
-	if err != nil {
-		glog.Errorf("Failed to generate endpoint key: %v", err)
-		return
-	}
 	namespace, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		glog.Errorf("Failed to split endpoint namespaced key %q: %v", key, err)
 		return
 	}
 	c.manager.Sync(namespace, name)
@@ -362,6 +376,15 @@ func (c *Controller) handleErr(err error, key interface{}) {
 	c.serviceQueue.AddRateLimited(key)
 }
 
+func (c *Controller) enqueueEndpoint(obj interface{}) {
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err != nil {
+		glog.Errorf("Failed to generate endpoint key: %v", err)
+		return
+	}
+	c.endpointQueue.Add(key)
+}
+
 func (c *Controller) enqueueService(obj interface{}) {
 	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
 	if err != nil {
