diff --git a/pkg/controllers/routing/ecmp_vip.go b/pkg/controllers/routing/ecmp_vip.go index 408286d7c3..b2b41e1752 100644 --- a/pkg/controllers/routing/ecmp_vip.go +++ b/pkg/controllers/routing/ecmp_vip.go @@ -13,8 +13,6 @@ import ( "github.com/cloudnativelabs/kube-router/v2/pkg/metrics" "github.com/cloudnativelabs/kube-router/v2/pkg/utils" - "strings" - gobgpapi "github.com/osrg/gobgp/v3/api" v1core "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" @@ -118,7 +116,7 @@ func (nrc *NetworkRoutingController) newServiceEventHandler() cache.ResourceEven nrc.OnServiceCreate(obj) }, UpdateFunc: func(oldObj, newObj interface{}) { - nrc.OnServiceUpdate(newObj, oldObj) + nrc.OnServiceUpdate(oldObj, newObj) }, DeleteFunc: func(obj interface{}) { nrc.OnServiceDelete(obj) @@ -127,20 +125,23 @@ func (nrc *NetworkRoutingController) newServiceEventHandler() cache.ResourceEven } func getServiceObject(obj interface{}) (svc *v1core.Service) { + if obj == nil { + return + } if svc, _ = obj.(*v1core.Service); svc == nil { klog.Errorf("cache indexer returned obj that is not type *v1.Service") } return } -func (nrc *NetworkRoutingController) handleServiceUpdate(svc *v1core.Service) { +func (nrc *NetworkRoutingController) handleServiceUpdate(svcOld, svcNew *v1core.Service) { if !nrc.bgpServerStarted { klog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", - svc.Namespace, svc.Name) + svcNew.Namespace, svcNew.Name) return } - toAdvertise, toWithdraw, err := nrc.getActiveVIPs() + toAdvertise, toWithdraw, err := nrc.getChangedVIPs(svcOld, svcNew, true) if err != nil { klog.Errorf("error getting routes for services: %s", err) return @@ -156,11 +157,11 @@ func (nrc *NetworkRoutingController) handleServiceUpdate(svc *v1core.Service) { nrc.withdrawVIPs(toWithdraw) } -func (nrc *NetworkRoutingController) handleServiceDelete(svc *v1core.Service) { +func (nrc *NetworkRoutingController) handleServiceDelete(oldSvc *v1core.Service) { if !nrc.bgpServerStarted { klog.V(3).Infof("Skipping update to service: %s/%s, controller still performing bootup full-sync", - svc.Namespace, svc.Name) + oldSvc.Namespace, oldSvc.Name) return } @@ -174,122 +175,78 @@ func (nrc *NetworkRoutingController) handleServiceDelete(svc *v1core.Service) { klog.Errorf("Failed to get active VIP's on service delete event due to: %s", err.Error()) return } - activeVIPsMap := make(map[string]bool) - for _, activeVIP := range activeVIPs { - activeVIPsMap[activeVIP] = true - } - serviceVIPs := nrc.getAllVIPsForService(svc) + serviceVIPs := nrc.getAllVIPsForService(oldSvc) withdrawVIPs := make([]string, 0) for _, serviceVIP := range serviceVIPs { // withdraw VIP only if deleted service is the last service using the VIP - if !activeVIPsMap[serviceVIP] { + if !utils.SliceContainsString(serviceVIP, activeVIPs) { withdrawVIPs = append(withdrawVIPs, serviceVIP) } } nrc.withdrawVIPs(withdrawVIPs) - } -func (nrc *NetworkRoutingController) tryHandleServiceUpdate(obj interface{}, logMsgFormat string) { - if svc := getServiceObject(obj); svc != nil { - klog.V(1).Infof(logMsgFormat, svc.Namespace, svc.Name) +func (nrc *NetworkRoutingController) tryHandleServiceUpdate(objOld, objNew interface{}) { + svcOld := getServiceObject(objOld) + svcNew := getServiceObject(objNew) - // If the service is headless and the previous version of the service is either non-existent or also headless, - // skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we - // don't need to consider previous versions of the service here as we are guaranteed if is a ClusterIP now, - // it was a ClusterIP before. - if utils.ServiceIsHeadless(obj) { - klog.V(1).Infof("%s/%s is headless, skipping...", svc.Namespace, svc.Name) - return - } + // We expect at least svcNew to be non-nil in order to process this service update, if not get out quick + if svcNew == nil { + klog.Warningf("received a nil service objects, aborting as we can't continue") + return + } + + klog.V(1).Infof("attempting to update service %s:%s", svcNew.Namespace, svcNew.Name) - nrc.handleServiceUpdate(svc) + // If the service is headless and the previous version of the service is either non-existent or also headless, + // skip processing as we only work with VIPs in the next section. Since the ClusterIP field is immutable we + // don't need to consider previous versions of the service here as we are guaranteed if is a ClusterIP now, + // it was a ClusterIP before. + if utils.ServiceIsHeadless(objNew) { + klog.V(1).Infof("%s/%s is headless, skipping...", svcNew.Namespace, svcNew.Name) + return } + + nrc.handleServiceUpdate(svcOld, svcNew) } -func (nrc *NetworkRoutingController) tryHandleServiceDelete(obj interface{}, logMsgFormat string) { - svc, ok := obj.(*v1core.Service) +func (nrc *NetworkRoutingController) tryHandleServiceDelete(oldObj interface{}, logMsgFormat string) { + oldSvc, ok := oldObj.(*v1core.Service) if !ok { - tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + tombstone, ok := oldObj.(cache.DeletedFinalStateUnknown) if !ok { - klog.Errorf("unexpected object type: %v", obj) + klog.Errorf("unexpected object type: %v", oldObj) return } - if svc, ok = tombstone.Obj.(*v1core.Service); !ok { - klog.Errorf("unexpected object type: %v", obj) + if oldSvc, ok = tombstone.Obj.(*v1core.Service); !ok { + klog.Errorf("unexpected object type: %v", oldObj) return } } - klog.V(1).Infof(logMsgFormat, svc.Namespace, svc.Name) + klog.V(1).Infof(logMsgFormat, oldSvc.Namespace, oldSvc.Name) // If the service is headless skip processing as we only work with VIPs in the next section. - if utils.ServiceIsHeadless(obj) { - klog.V(1).Infof("%s/%s is headless, skipping...", svc.Namespace, svc.Name) + if utils.ServiceIsHeadless(oldObj) { + klog.V(1).Infof("%s/%s is headless, skipping...", oldSvc.Namespace, oldSvc.Name) return } - nrc.handleServiceDelete(svc) + nrc.handleServiceDelete(oldSvc) } // OnServiceCreate handles new service create event from the kubernetes API server func (nrc *NetworkRoutingController) OnServiceCreate(obj interface{}) { - nrc.tryHandleServiceUpdate(obj, "Received new service: %s/%s from watch API") + nrc.tryHandleServiceUpdate(nil, obj) } // OnServiceUpdate handles the service relates updates from the kubernetes API server -func (nrc *NetworkRoutingController) OnServiceUpdate(objNew interface{}, objOld interface{}) { - nrc.tryHandleServiceUpdate(objNew, "Received update on service: %s/%s from watch API") - - // This extra call needs to be here, because during the update the list of externalIPs may have changed and - // externalIPs is the only service VIP field that is: - // a) mutable after first creation - // b) an array - // - // This means that while we only need to withdraw ClusterIP VIPs and LoadBalancer VIPs on delete, we may need - // to withdraw ExternalIPs on update. - // - // As such, it needs to be handled differently as nrc.handleServiceUpdate only withdraws VIPs if the service - // endpoint is no longer scheduled on this node and its a local type service. - nrc.withdrawVIPs(nrc.getExternalIPsToWithdraw(getServiceObject(objOld), getServiceObject(objNew))) -} - -func (nrc *NetworkRoutingController) getExternalIPsToWithdraw(svcOld, svcNew *v1core.Service) (out []string) { - withdrawnServiceVips := make([]string, 0) - if svcOld != nil && svcNew != nil { - withdrawnServiceVips = getMissingPrevGen(nrc.getExternalIPs(svcOld), nrc.getExternalIPs(svcNew)) - } - // ensure external IP to be withdrawn is not used by any other service - allActiveVIPs, _, err := nrc.getActiveVIPs() - if err != nil { - klog.Errorf("failed to get all active VIP's due to: %s", err.Error()) - return - } - activeVIPsMap := make(map[string]bool) - for _, activeVIP := range allActiveVIPs { - activeVIPsMap[activeVIP] = true - } - for _, serviceVIP := range withdrawnServiceVips { - // withdraw VIP only if updated service is the last service using the VIP - if !activeVIPsMap[serviceVIP] { - out = append(out, serviceVIP) - } - } - return -} - -func getMissingPrevGen(old, new []string) (withdrawIPs []string) { - lookIn := " " + strings.Join(new, " ") + " " - for _, s := range old { - if !strings.Contains(lookIn, " "+s+" ") { - withdrawIPs = append(withdrawIPs, s) - } - } - return +func (nrc *NetworkRoutingController) OnServiceUpdate(objOld interface{}, objNew interface{}) { + nrc.tryHandleServiceUpdate(objOld, objNew) } // OnServiceDelete handles the service delete updates from the kubernetes API server -func (nrc *NetworkRoutingController) OnServiceDelete(obj interface{}) { - nrc.tryHandleServiceDelete(obj, "Received event to delete service: %s/%s from watch API") +func (nrc *NetworkRoutingController) OnServiceDelete(oldObj interface{}) { + nrc.tryHandleServiceDelete(oldObj, "Received event to delete service: %s/%s from watch API") } func (nrc *NetworkRoutingController) newEndpointsEventHandler() cache.ResourceEventHandler { @@ -351,7 +308,7 @@ func (nrc *NetworkRoutingController) OnEndpointsUpdate(obj interface{}) { return } - nrc.tryHandleServiceUpdate(svc, "Updating service %s/%s triggered by endpoint update event") + nrc.tryHandleServiceUpdate(nil, svc) } func (nrc *NetworkRoutingController) getClusterIP(svc *v1core.Service) []string { @@ -405,6 +362,67 @@ func (nrc *NetworkRoutingController) getLoadBalancerIPs(svc *v1core.Service) []s return loadBalancerIPList } +func (nrc *NetworkRoutingController) getChangedVIPs(oldSvc, newSvc *v1core.Service, + onlyActiveEndpoints bool) ([]string, []string, error) { + toWithdrawList := make([]string, 0) + advertiseService := false + + _, hasLocalAnnotation := newSvc.Annotations[svcLocalAnnotation] + hasLocalTrafficPolicy := newSvc.Spec.ExternalTrafficPolicy == v1core.ServiceExternalTrafficPolicyTypeLocal + isLocal := hasLocalAnnotation || hasLocalTrafficPolicy + + if onlyActiveEndpoints && isLocal { + var err error + advertiseService, err = nrc.nodeHasEndpointsForService(newSvc) + if err != nil { + return nil, nil, err + } + } + + newServiceVIPs := nrc.getAllVIPsForService(newSvc) + // This function allows oldSvc to be nil, if this is the case, we don't have any old VIPs to compare against and + // possibly withdraw instead treat all VIPs as new and return them as either toAdvertise or toWithdraw depending + // on service configuration + if oldSvc == nil { + if advertiseService { + return newServiceVIPs, nil, nil + } else { + return nil, newServiceVIPs, nil + } + } + oldServiceVIPs := nrc.getAllVIPsForService(oldSvc) + + // If we are instructed to only advertise local services and this service doesn't have endpoints on the node we are + // currently running on, then attempt to withdraw all the VIPs that the old service had. + if !advertiseService { + return nil, oldServiceVIPs, nil + } + + // At this point we're sure that we should be advertising some VIPs, but we need to figure out which VIPs to + // advertise and which, if any to withdraw. + toAdvertiseList := newServiceVIPs + for _, oldServiceVIP := range oldServiceVIPs { + if !utils.SliceContainsString(oldServiceVIP, toAdvertiseList) { + toWithdrawList = append(toWithdrawList, oldServiceVIP) + } + } + + // It is possible that this host may have the same IP advertised from multiple services, and we don't want to + // withdraw it if there is an active service for this VIP on a different service than the one that is changing. + finalToWithdrawList := make([]string, 0) + allVIPsOnServer, _, err := nrc.getVIPs(onlyActiveEndpoints) + if err != nil { + return nil, nil, err + } + for _, withdrawVIP := range toWithdrawList { + if !utils.SliceContainsString(withdrawVIP, allVIPsOnServer) { + finalToWithdrawList = append(finalToWithdrawList, withdrawVIP) + } + } + + return toAdvertiseList, finalToWithdrawList, nil +} + func (nrc *NetworkRoutingController) getAllVIPs() ([]string, []string, error) { return nrc.getVIPs(false) } @@ -436,18 +454,12 @@ func (nrc *NetworkRoutingController) getVIPs(onlyActiveEndpoints bool) ([]string // We need to account for the niche case where multiple services may have the same VIP, in this case, one service // might be ready while the other service is not. We still want to advertise the VIP as long as there is at least - // one active endpoint on the node or we might introduce a service disruption. + // one active endpoint on the node, or we might introduce a service disruption. finalToWithdrawList := make([]string, 0) -OUTER: for _, withdrawVIP := range toWithdrawList { - for _, advertiseVIP := range toAdvertiseList { - if withdrawVIP == advertiseVIP { - // if there is a VIP that is set to both be advertised and withdrawn, don't add it to the final - // withdraw list - continue OUTER - } + if !utils.SliceContainsString(withdrawVIP, toAdvertiseList) { + finalToWithdrawList = append(finalToWithdrawList, withdrawVIP) } - finalToWithdrawList = append(finalToWithdrawList, withdrawVIP) } return toAdvertiseList, finalToWithdrawList, nil diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 37a7d3cce9..afb3126417 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -75,3 +75,14 @@ func ContainsIPv6Address(addrs []string) bool { } return false } + +// SliceContainsString checks to see if needle is contained within haystack, returns true if found, otherwise +// returns false +func SliceContainsString(needle string, haystack []string) bool { + for _, hay := range haystack { + if needle == hay { + return true + } + } + return false +}