Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor neg controller: processService, add unittest #856

Merged
merged 2 commits into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/annotations/destination_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,10 @@ func (ns DestinationRuleNEGStatus) Marshal() (string, error) {
}
return string(bytes), err
}

// ParseDestinationRuleNEGStatus parses the given annotation into DestinationRuleNEGStatus struct
func ParseDestinationRuleNEGStatus(annotation string) (DestinationRuleNEGStatus, error) {
ret := &DestinationRuleNEGStatus{}
err := json.Unmarshal([]byte(annotation), ret)
return *ret, err
}
155 changes: 93 additions & 62 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,71 +325,38 @@ func (c *Controller) processService(key string) error {
if service == nil {
return fmt.Errorf("cannot convert to Service (%T)", obj)
}
negAnnotation, foundNEGAnnotation, err := annotations.FromService(service).NEGAnnotation()
if err != nil {

svcPortInfoMap := make(negtypes.PortInfoMap)
if err := c.mergeDefaultBackendServicePortInfoMap(key, svcPortInfoMap); err != nil {
return err
}

portInfoMap := make(negtypes.PortInfoMap)
needNeg := false
// process default backend service
// Only enable for L7-ILB for now to limit possible issues
// TODO(shance): investigate enabling this for all ingresses
if flags.F.EnableL7Ilb && c.defaultBackendService.ID.Service.String() == key {
portInfoMap = c.defaultBackendServicePortInfoMap()
needNeg = len(portInfoMap) > 0
} else {
if foundNEGAnnotation && negAnnotation != nil && negAnnotation.NEGEnabled() {
needNeg = true
}
if err := c.mergeIngressPortInfo(service, types.NamespacedName{Namespace: namespace, Name: name}, svcPortInfoMap); err != nil {
return err
}
if err := c.mergeStandaloneNEGsPortInfo(service, types.NamespacedName{Namespace: namespace, Name: name}, svcPortInfoMap); err != nil {
return err
}

csmPortInfoMap := make(negtypes.PortInfoMap)
if c.enableCSM {
needNeg = true
// Find all destination rules that using this service.
destinationRules := getDestinationRulesFromStore(c.destinationRuleLister, service)
// Fill all service ports into portinfomap
servicePorts := gatherPortMappingFromService(service)
for namespacedName, destinationRule := range destinationRules {
destinationRulePortInfoMap, err := negtypes.NewPortInfoMapWithDestinationRule(namespace, name, servicePorts, c.namer, true, destinationRule)
if err != nil {
klog.Warningf("DestinationRule(%s) contains duplicated subset, creating NEGs for the newer ones. %s", namespacedName.Name, err)
}
if err := csmPortInfoMap.Merge(destinationRulePortInfoMap); err != nil {
return fmt.Errorf("failed to merge service ports referenced by Istio:DestinationRule (%v): %v", destinationRulePortInfoMap, err)
}
if err = c.syncDestinationRuleNegStatusAnnotation(namespacedName.Namespace, namespacedName.Name, destinationRulePortInfoMap); err != nil {
return err
}
}
// Create NEGs for every ports of the services.
if service.Spec.Selector == nil || len(service.Spec.Selector) == 0 {
klog.Infof("Skip NEG creation for services that with no selector: %s:%s", namespace, name)
} else if contains(c.csmServiceNEGSkipNamespaces, namespace) {
klog.Infof("Skip NEG creation for services in namespace: %s", namespace)
} else {
needNeg = true
servicePortInfoMap := negtypes.NewPortInfoMap(namespace, name, servicePorts, c.namer, true)
if err := portInfoMap.Merge(servicePortInfoMap); err != nil {
return fmt.Errorf("failed to merge service ports referenced by Istio:DestinationRule (%v): %v", servicePortInfoMap, err)
}
}
csmSVCPortInfoMap, destinationRulesPortInfoMap, err := c.getCSMPortInfoMap(namespace, name, service)
if err != nil {
return err
}
// merges csmSVCPortInfoMap, because eventually those NEG will sync with the service annotation.
// merges destinationRulesPortInfoMap later, because we only want them sync with the DestinationRule annotation.
if err := svcPortInfoMap.Merge(csmSVCPortInfoMap); err != nil {
return fmt.Errorf("failed to merge CSM service PortInfoMap: %v, error: %v", csmSVCPortInfoMap, err)
}

if needNeg {
if len(svcPortInfoMap) != 0 || len(destinationRulesPortInfoMap) != 0 {
klog.V(2).Infof("Syncing service %q", key)
if err := c.mergeIngressPortInfo(negAnnotation, service, types.NamespacedName{Namespace: namespace, Name: name}, &portInfoMap); err != nil {
return err
}
if err = c.syncNegStatusAnnotation(namespace, name, portInfoMap); err != nil {
if err = c.syncNegStatusAnnotation(namespace, name, svcPortInfoMap); err != nil {
return err
}
// Merge destinationRule related NEG after the Service NEGStatus Sync, we don't want DR related NEG status go into service.
if err := portInfoMap.Merge(csmPortInfoMap); err != nil {
return fmt.Errorf("failed to merge service ports referenced by Istio:DestinationRule (%v): %v", csmPortInfoMap, err)
if err := svcPortInfoMap.Merge(destinationRulesPortInfoMap); err != nil {
return fmt.Errorf("failed to merge service ports referenced by Istio:DestinationRule (%v): %v", destinationRulesPortInfoMap, err)
}
cadmuxe marked this conversation as resolved.
Show resolved Hide resolved
return c.manager.EnsureSyncers(namespace, name, portInfoMap)
return c.manager.EnsureSyncers(namespace, name, svcPortInfoMap)
}

// do not need Neg
Expand All @@ -400,7 +367,16 @@ func (c *Controller) processService(key string) error {
return c.syncNegStatusAnnotation(namespace, name, make(negtypes.PortInfoMap))
}

func (c *Controller) mergeIngressPortInfo(negAnnotation *annotations.NegAnnotation, service *apiv1.Service, name types.NamespacedName, portInfoMap *negtypes.PortInfoMap) error {
// mergeIngressPortInfo merges Ingress PortInfo into portInfoMap if the service has Enable Ingress annotation.
func (c *Controller) mergeIngressPortInfo(service *apiv1.Service, name types.NamespacedName, portInfoMap negtypes.PortInfoMap) error {
negAnnotation, foundNEGAnnotation, err := annotations.FromService(service).NEGAnnotation()
if err != nil {
return err
}
if !foundNEGAnnotation {
return nil
}

// handle NEGs used by ingress
if negAnnotation != nil && negAnnotation.NEGEnabledForIngress() {
// Only service ports referenced by ingress are synced for NEG
Expand All @@ -411,7 +387,18 @@ func (c *Controller) mergeIngressPortInfo(negAnnotation *annotations.NegAnnotati
return fmt.Errorf("failed to merge service ports referenced by ingress (%v): %v", ingressPortInfoMap, err)
}
}
return nil
}

// mergeStandaloneNEGsPortInfo merge Sandaloon NEG PortInfo into portInfoMap
func (c *Controller) mergeStandaloneNEGsPortInfo(service *apiv1.Service, name types.NamespacedName, portInfoMap negtypes.PortInfoMap) error {
negAnnotation, foundNEGAnnotation, err := annotations.FromService(service).NEGAnnotation()
if err != nil {
return err
}
if !foundNEGAnnotation {
return nil
}
// handle Exposed Standalone NEGs
if negAnnotation != nil && negAnnotation.NEGExposed() {
knownPorts := make(negtypes.SvcPortMap)
Expand All @@ -432,20 +419,61 @@ func (c *Controller) mergeIngressPortInfo(negAnnotation *annotations.NegAnnotati
return nil
}

// defaultBackendServicePortInfoMap returns a PortInfoMap for the default backend service
// mergeDefaultBackendServicePortInfoMap merge the PortInfoMap for the default backend service into portInfoMap
// The default backend service needs special handling since it is not explicitly referenced
// in the ingress spec. It is either inferred and then managed by the controller, or
// it is passed to the controller via a command line flag.
// Additionally, supporting NEGs for default backends is only for L7-ILB
func (c *Controller) defaultBackendServicePortInfoMap() negtypes.PortInfoMap {
for _, m := range c.ingressLister.List() {
ing := *m.(*v1beta1.Ingress)
if utils.IsGCEL7ILBIngress(&ing) && ing.Spec.Backend == nil {
return negtypes.NewPortInfoMap(c.defaultBackendService.ID.Service.Namespace, c.defaultBackendService.ID.Service.Name, negtypes.SvcPortMap{80: c.defaultBackendService.TargetPort}, c.namer, false)
func (c *Controller) mergeDefaultBackendServicePortInfoMap(key string, portInfoMap negtypes.PortInfoMap) error {
// process default backend service
// Only enable for L7-ILB for now to limit possible issues
// TODO(shance): investigate enabling this for all ingresses
if flags.F.EnableL7Ilb && c.defaultBackendService.ID.Service.String() == key {
for _, m := range c.ingressLister.List() {
ing := *m.(*v1beta1.Ingress)
if utils.IsGCEL7ILBIngress(&ing) && ing.Spec.Backend == nil {
defaultServicePortInfoMap := negtypes.NewPortInfoMap(c.defaultBackendService.ID.Service.Namespace, c.defaultBackendService.ID.Service.Name, negtypes.SvcPortMap{80: c.defaultBackendService.TargetPort}, c.namer, false)
return portInfoMap.Merge(defaultServicePortInfoMap)
}
}
}
return nil
}

// getCSMPortInfoMap gets the PortInfoMap for service and DestinationRules.
// If enableCSM = true, the controller will create NEGs for every port/subsets combinations for the DestinaitonRules.
// It will also create NEGs for all the ports of the service that referred by the DestinationRules.
func (c *Controller) getCSMPortInfoMap(namespace, name string, service *apiv1.Service) (negtypes.PortInfoMap, negtypes.PortInfoMap, error) {
destinationRulesPortInfoMap := make(negtypes.PortInfoMap)
servicePortInfoMap := make(negtypes.PortInfoMap)
if c.enableCSM {
// Find all destination rules that using this service.
destinationRules := getDestinationRulesFromStore(c.destinationRuleLister, service)
// Fill all service ports into portinfomap
servicePorts := gatherPortMappingFromService(service)
for namespacedName, destinationRule := range destinationRules {
destinationRulePortInfoMap, err := negtypes.NewPortInfoMapWithDestinationRule(namespace, name, servicePorts, c.namer, false, destinationRule)
if err != nil {
klog.Warningf("DestinationRule(%s) contains duplicated subset, creating NEGs for the newer ones. %s", namespacedName.Name, err)
}
if err := destinationRulesPortInfoMap.Merge(destinationRulePortInfoMap); err != nil {
return servicePortInfoMap, destinationRulesPortInfoMap, fmt.Errorf("failed to merge service ports referenced by Istio:DestinationRule (%v): %v", destinationRulePortInfoMap, err)
}
if err = c.syncDestinationRuleNegStatusAnnotation(namespacedName.Namespace, namespacedName.Name, destinationRulePortInfoMap); err != nil {
return servicePortInfoMap, destinationRulesPortInfoMap, err
}
}
// Create NEGs for every ports of the services.
if service.Spec.Selector == nil || len(service.Spec.Selector) == 0 {
klog.Infof("Skip NEG creation for services that with no selector: %s:%s", namespace, name)
} else if contains(c.csmServiceNEGSkipNamespaces, namespace) {
klog.Infof("Skip NEG creation for services in namespace: %s", namespace)
} else {
servicePortInfoMap = negtypes.NewPortInfoMap(namespace, name, servicePorts, c.namer, false)
}
return servicePortInfoMap, destinationRulesPortInfoMap, nil
}
return make(negtypes.PortInfoMap)
return servicePortInfoMap, destinationRulesPortInfoMap, nil
}

// syncNegStatusAnnotation syncs the neg status annotation
Expand Down Expand Up @@ -502,6 +530,9 @@ func (c *Controller) syncDestinationRuleNegStatusAnnotation(namespace, destinati
dsClient := c.destinationRuleClient.Namespace(namespace)
destinationRule, err := dsClient.Get(destinationRuleName, metav1.GetOptions{})
drAnnotations := destinationRule.GetAnnotations()
if drAnnotations == nil {
drAnnotations = make(map[string]string)
}
if len(portmap) == 0 {
delete(drAnnotations, annotations.NEGStatusKey)
klog.V(2).Infof("Removing NEG status annotation from DestinationRule: %s/%s", namespace, destinationRule)
Expand Down
Loading