
Merge pull request #748 from freehan/readiness-reflector4
readiness reflector
k8s-ci-robot authored May 16, 2019
2 parents 8ac76dd + 5de93fd commit 249eae9
Showing 18 changed files with 1,050 additions and 160 deletions.
73 changes: 41 additions & 32 deletions pkg/neg/controller.go
@@ -35,6 +35,7 @@ import (
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/neg/metrics"
"k8s.io/ingress-gce/pkg/neg/readiness"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog"
@@ -55,12 +56,10 @@ type Controller struct {
namer negtypes.NetworkEndpointGroupNamer
zoneGetter negtypes.ZoneGetter

ingressSynced cache.InformerSynced
serviceSynced cache.InformerSynced
endpointSynced cache.InformerSynced
ingressLister cache.Indexer
serviceLister cache.Indexer
client kubernetes.Interface
hasSynced func() bool
ingressLister cache.Indexer
serviceLister cache.Indexer
client kubernetes.Interface

// serviceQueue takes service key as work item. Service key with format "namespace/name".
serviceQueue workqueue.RateLimitingInterface
@@ -69,6 +68,9 @@ type Controller struct {

// syncTracker tracks the latest time that service and endpoint changes are processed
syncTracker utils.TimeTracker

// reflector handles NEG readiness gate and conditions for pods in NEG.
reflector readiness.Reflector
}

// NewController returns a network endpoint group controller.
@@ -90,24 +92,25 @@ func NewController(
recorder := eventBroadcaster.NewRecorder(scheme.Scheme,
apiv1.EventSource{Component: "neg-controller"})

manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), negSyncerType)
manager := newSyncerManager(namer, recorder, cloud, zoneGetter, ctx.PodInformer.GetIndexer(), ctx.ServiceInformer.GetIndexer(), ctx.EndpointInformer.GetIndexer(), negSyncerType)
reflector := readiness.NewReadinessReflector(ctx, manager)
manager.reflector = reflector

negController := &Controller{
client: ctx.KubeClient,
manager: manager,
resyncPeriod: resyncPeriod,
gcPeriod: gcPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
namer: namer,
ingressSynced: ctx.IngressInformer.HasSynced,
serviceSynced: ctx.ServiceInformer.HasSynced,
endpointSynced: ctx.EndpointInformer.HasSynced,
ingressLister: ctx.IngressInformer.GetIndexer(),
serviceLister: ctx.ServiceInformer.GetIndexer(),
serviceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
endpointQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
syncTracker: utils.NewTimeTracker(),
client: ctx.KubeClient,
manager: manager,
resyncPeriod: resyncPeriod,
gcPeriod: gcPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
namer: namer,
hasSynced: ctx.HasSynced,
ingressLister: ctx.IngressInformer.GetIndexer(),
serviceLister: ctx.ServiceInformer.GetIndexer(),
serviceQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
endpointQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
syncTracker: utils.NewTimeTracker(),
reflector: reflector,
}

ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -157,14 +160,26 @@ func NewController(
negController.enqueueEndpoint(cur)
},
})

ctx.PodInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*apiv1.Pod)
negController.reflector.SyncPod(pod)
},
UpdateFunc: func(old, cur interface{}) {
pod := cur.(*apiv1.Pod)
negController.reflector.SyncPod(pod)
},
})

ctx.AddHealthCheck("neg-controller", negController.IsHealthy)
return negController
}

func (c *Controller) Run(stopCh <-chan struct{}) {
wait.PollUntil(5*time.Second, func() (bool, error) {
klog.V(2).Infof("Waiting for initial sync")
return c.synced(), nil
return c.hasSynced(), nil
}, stopCh)

klog.V(2).Infof("Starting network endpoint group controller")
@@ -181,7 +196,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
time.Sleep(c.gcPeriod)
wait.Until(c.gc, c.gcPeriod, stopCh)
}()

go c.reflector.Run(stopCh)
<-stopCh
}

@@ -292,7 +307,7 @@ func (c *Controller) processService(key string) error {
// Only service ports referenced by ingress are synced for NEG
ings := getIngressServicesFromStore(c.ingressLister, service)
ingressSvcPorts := gatherPortMappingUsedByIngress(ings, service)
ingressPortInfoMap := negtypes.NewPortInfoMap(namespace, name, ingressSvcPorts, c.namer)
ingressPortInfoMap := negtypes.NewPortInfoMap(namespace, name, ingressSvcPorts, c.namer, true)
if err := portInfoMap.Merge(ingressPortInfoMap); err != nil {
return fmt.Errorf("failed to merge service ports referenced by ingress (%v): %v", ingressPortInfoMap, err)
}
@@ -310,7 +325,7 @@ func (c *Controller) processService(key string) error {
return err
}

if err := portInfoMap.Merge(negtypes.NewPortInfoMap(namespace, name, exposedNegSvcPort, c.namer)); err != nil {
if err := portInfoMap.Merge(negtypes.NewPortInfoMap(namespace, name, exposedNegSvcPort, c.namer, false)); err != nil {
return fmt.Errorf("failed to merge service ports exposed as standalone NEGs (%v) into ingress referenced service ports (%v): %v", exposedNegSvcPort, portInfoMap, err)
}
}
@@ -409,12 +424,6 @@ func (c *Controller) gc() {
}
}

func (c *Controller) synced() bool {
return c.endpointSynced() &&
c.serviceSynced() &&
c.ingressSynced()
}

// gatherPortMappingUsedByIngress returns a map containing port:targetport
// of all service ports of the service that are referenced by ingresses
func gatherPortMappingUsedByIngress(ings []extensions.Ingress, svc *apiv1.Service) negtypes.SvcPortMap {
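The controller.go changes above replace the per-informer synced checks with the shared ctx.HasSynced, register pod Add/Update handlers that feed the readiness reflector, and start the reflector alongside the controller. Based only on the calls visible in this diff (readiness.NewReadinessReflector, SyncPod, Run), a minimal sketch of the Reflector surface the controller relies on might look like the following; anything beyond those two methods is an assumption, not the actual pkg/neg/readiness definition.

// Hedged sketch: only Run and SyncPod are observable from this diff; the real
// readiness.Reflector may declare additional methods.
package readiness

import (
	v1 "k8s.io/api/core/v1"
)

// Reflector manages the NEG readiness gate and readiness conditions for pods
// that back NEG-enabled services.
type Reflector interface {
	// Run starts the reflector's processing loop and blocks until stopCh closes.
	Run(stopCh <-chan struct{})
	// SyncPod queues a pod for readiness-gate evaluation; the controller calls
	// this from its pod informer Add and Update handlers.
	SyncPod(pod *v1.Pod)
}
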
13 changes: 7 additions & 6 deletions pkg/neg/controller_test.go
@@ -357,6 +357,7 @@ func TestGatherPortMappingUsedByIngress(t *testing.T) {
}
}

// TODO(freehan): include test cases with different ReadinessGate setup
func TestSyncNegAnnotation(t *testing.T) {
t.Parallel()
// TODO: test that c.serviceLister.Update is called whenever the annotation
@@ -375,21 +376,21 @@
}{
{
desc: "apply new annotation with no previous annotation",
portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 443: "other_port"}, namer),
portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 443: "other_port"}, namer, false),
},
{
desc: "same annotation applied twice",
previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer),
portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer),
previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer, false),
portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer, false),
},
{
desc: "apply new annotation and override previous annotation",
previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer),
portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{3000: "6000", 4000: "8000"}, namer),
previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer, false),
portMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{3000: "6000", 4000: "8000"}, namer, false),
},
{
desc: "remove previous annotation",
previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer),
previousPortMap: negtypes.NewPortInfoMap(namespace, name, negtypes.SvcPortMap{80: "named_port", 4040: "other_port"}, namer, false),
},
{
desc: "remove annotation with no previous annotation",
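The extra boolean threaded through negtypes.NewPortInfoMap in these tests is the readiness-gate flag: in the controller.go hunks above, ports referenced by ingress pass true, while standalone exposed-NEG ports pass false. A hedged usage sketch follows; the NewPortInfoMap and Merge signatures come from the call sites in this diff, and the buildPortInfo helper is purely illustrative.

package neg

import (
	"fmt"

	negtypes "k8s.io/ingress-gce/pkg/neg/types"
)

// buildPortInfo is a hypothetical helper showing how the readiness-gate flag
// flows into NewPortInfoMap.
func buildPortInfo(namespace, name string, ingressPorts, exposedPorts negtypes.SvcPortMap, namer negtypes.NetworkEndpointGroupNamer) (negtypes.PortInfoMap, error) {
	// Ports referenced by ingress get the readiness gate enabled.
	portInfoMap := negtypes.NewPortInfoMap(namespace, name, ingressPorts, namer, true)
	// Standalone exposed NEG ports do not.
	if err := portInfoMap.Merge(negtypes.NewPortInfoMap(namespace, name, exposedPorts, namer, false)); err != nil {
		return nil, fmt.Errorf("failed to merge exposed NEG ports: %v", err)
	}
	return portInfoMap, nil
}
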
77 changes: 70 additions & 7 deletions pkg/neg/manager.go
@@ -20,10 +20,13 @@ import (
"fmt"
"sync"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/neg/readiness"
negsyncer "k8s.io/ingress-gce/pkg/neg/syncers"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/klog"
@@ -34,6 +37,10 @@ type serviceKey struct {
name string
}

func (k serviceKey) Key() string {
return fmt.Sprintf("%s/%s", k.namespace, k.name)
}

// syncerManager contains all the active syncer goroutines and manages their lifecycle.
type syncerManager struct {
negSyncerType NegSyncerType
@@ -43,6 +50,7 @@ type syncerManager struct {
cloud negtypes.NetworkEndpointGroupCloud
zoneGetter negtypes.ZoneGetter

podLister cache.Indexer
serviceLister cache.Indexer
endpointLister cache.Indexer

@@ -54,21 +62,24 @@ type syncerManager struct {
svcPortMap map[serviceKey]negtypes.PortInfoMap
// syncerMap stores the NEG syncer
// key consists of service namespace, name and targetPort. Value is the corresponding syncer.
syncerMap map[negsyncer.NegSyncerKey]negtypes.NegSyncer
syncerMap map[negtypes.NegSyncerKey]negtypes.NegSyncer
// reflector handles NEG readiness gate and conditions for pods in NEG.
reflector readiness.Reflector
}

func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, serviceLister cache.Indexer, endpointLister cache.Indexer, negSyncerType NegSyncerType) *syncerManager {
func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, podLister cache.Indexer, serviceLister cache.Indexer, endpointLister cache.Indexer, negSyncerType NegSyncerType) *syncerManager {
klog.V(2).Infof("NEG controller will use NEG syncer type: %q", negSyncerType)
return &syncerManager{
negSyncerType: negSyncerType,
namer: namer,
recorder: recorder,
cloud: cloud,
zoneGetter: zoneGetter,
podLister: podLister,
serviceLister: serviceLister,
endpointLister: endpointLister,
svcPortMap: make(map[serviceKey]negtypes.PortInfoMap),
syncerMap: make(map[negsyncer.NegSyncerKey]negtypes.NegSyncer),
syncerMap: make(map[negtypes.NegSyncerKey]negtypes.NegSyncer),
}
}

@@ -82,6 +93,10 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
currentPorts = make(negtypes.PortInfoMap)
}

// TODO(freehan): ignore changes that only flip the ReadinessGate bool.
// If a service port already has NEG enabled, a configuration change may turn
// the readiness gate on or off for that same port. The current logic recreates
// the syncer simply because the readiness gate setting changed.
removes := currentPorts.Difference(newPorts)
adds := newPorts.Difference(currentPorts)

@@ -100,7 +115,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
for svcPort, portInfo := range adds {
syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, svcPort, portInfo.TargetPort)]
if !ok {
syncerKey := negsyncer.NegSyncerKey{
syncerKey := negtypes.NegSyncerKey{
Namespace: namespace,
Name: name,
Port: svcPort,
@@ -114,8 +129,10 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
manager.recorder,
manager.cloud,
manager.zoneGetter,
manager.podLister,
manager.serviceLister,
manager.endpointLister,
manager.reflector,
)
} else {
// Use batch syncer by default
@@ -139,7 +156,6 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg
}
}
}

return utilerrors.NewAggregate(errList)
}

@@ -198,6 +214,53 @@ func (manager *syncerManager) GC() error {
return nil
}

// ReadinessGateEnabledNegs returns the list of NEGs that have the readiness gate enabled for the given pod's namespace and labels.
func (manager *syncerManager) ReadinessGateEnabledNegs(namespace string, podLabels map[string]string) []string {
manager.mu.Lock()
defer manager.mu.Unlock()
ret := sets.NewString()
for svcKey, portMap := range manager.svcPortMap {
if svcKey.namespace != namespace {
continue
}

obj, exists, err := manager.serviceLister.GetByKey(svcKey.Key())
if err != nil {
klog.Errorf("Failed to retrieve service %s from store: %v", svcKey.Key(), err)
continue
}

if !exists {
continue
}

service := obj.(*v1.Service)

if service.Spec.Selector == nil {
// services with nil selectors match nothing, not everything.
continue
}

selector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
if selector.Matches(labels.Set(podLabels)) {
ret = ret.Union(portMap.NegsWithReadinessGate())
}
}
return ret.List()
}

// ReadinessGateEnabled returns true if the NEG requires readiness feedback
func (manager *syncerManager) ReadinessGateEnabled(syncerKey negtypes.NegSyncerKey) bool {
manager.mu.Lock()
defer manager.mu.Unlock()
if v, ok := manager.svcPortMap[serviceKey{namespace: syncerKey.Namespace, name: syncerKey.Name}]; ok {
if info, ok := v[syncerKey.Port]; ok {
return info.ReadinessGate
}
}
return false
}

// garbageCollectSyncer removes stopped syncer from syncerMap
func (manager *syncerManager) garbageCollectSyncer() {
manager.mu.Lock()
@@ -262,8 +325,8 @@ func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string
}

// getSyncerKey encodes a service namespace, name, service port and targetPort into a string key
func getSyncerKey(namespace, name string, port int32, targetPort string) negsyncer.NegSyncerKey {
return negsyncer.NegSyncerKey{
func getSyncerKey(namespace, name string, port int32, targetPort string) negtypes.NegSyncerKey {
return negtypes.NegSyncerKey{
Namespace: namespace,
Name: name,
Port: port,
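ReadinessGateEnabledNegs above iterates over every tracked service port map in the pod's namespace and collects the NEGs whose readiness gate is enabled, using standard label-selector matching against the pod's labels. A standalone sketch of just that matching step follows; the nil-selector rule and the labels calls come from the diff, while matchesService and the example values are illustrative.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
)

// matchesService mirrors the selector check in ReadinessGateEnabledNegs: a nil
// selector matches nothing; otherwise the service selector must be a subset of
// the pod's labels.
func matchesService(svcSelector, podLabels map[string]string) bool {
	if svcSelector == nil {
		// Services with nil selectors match nothing, not everything.
		return false
	}
	selector := labels.Set(svcSelector).AsSelectorPreValidated()
	return selector.Matches(labels.Set(podLabels))
}

func main() {
	fmt.Println(matchesService(map[string]string{"app": "web"}, map[string]string{"app": "web", "tier": "frontend"})) // true
	fmt.Println(matchesService(nil, map[string]string{"app": "web"}))                                                 // false
}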