diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 2b98cc322c0..4b63b0f5e82 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -514,14 +514,8 @@ func (p *proxier) OnServiceUpdate(oldService, service *corev1.Service) { } else { metrics.ServicesUpdatesTotal.Inc() } - var isIPv6 bool - if oldService != nil { - isIPv6 = utilnet.IsIPv6String(oldService.Spec.ClusterIP) - } else { - isIPv6 = utilnet.IsIPv6String(service.Spec.ClusterIP) - } - if isIPv6 == p.isIPv6 { - if p.serviceChanges.OnServiceUpdate(oldService, service) && p.isInitialized() { + if p.serviceChanges.OnServiceUpdate(oldService, service) { + if p.isInitialized() { p.runner.Run() } } @@ -632,12 +626,17 @@ func NewProxier( enableEndpointSlice := features.DefaultFeatureGate.Enabled(features.EndpointSlice) + ipFamily := corev1.IPv4Protocol + if isIPv6 { + ipFamily = corev1.IPv6Protocol + } + p := &proxier{ enableEndpointSlice: enableEndpointSlice, endpointsConfig: config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod), serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod), endpointsChanges: newEndpointsChangesTracker(hostname, enableEndpointSlice, isIPv6), - serviceChanges: newServiceChangesTracker(recorder, isIPv6), + serviceChanges: newServiceChangesTracker(recorder, ipFamily), serviceMap: k8sproxy.ServiceMap{}, serviceInstalledMap: k8sproxy.ServiceMap{}, endpointsInstalledMap: types.EndpointsMap{}, @@ -659,7 +658,6 @@ func NewProxier( p.endpointsConfig = config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod) p.endpointsConfig.RegisterEventHandler(p) } - p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, 0, 30*time.Second, -1) return p } diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 3a0fc040a38..03c8e1b5b1a 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -18,6 +18,7 @@ import ( "fmt" "net" "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -87,9 +88,15 @@ func NewFakeProxier(ofClient openflow.Client, isIPv6 bool) *proxier { runtime.NewScheme(), corev1.EventSource{Component: componentName, Host: hostname}, ) + + ipFamily := corev1.IPv4Protocol + if isIPv6 { + ipFamily = corev1.IPv6Protocol + } + p := &proxier{ endpointsChanges: newEndpointsChangesTracker(hostname, false, isIPv6), - serviceChanges: newServiceChangesTracker(recorder, isIPv6), + serviceChanges: newServiceChangesTracker(recorder, ipFamily), serviceMap: k8sproxy.ServiceMap{}, serviceInstalledMap: k8sproxy.ServiceMap{}, endpointsInstalledMap: types.EndpointsMap{}, @@ -100,6 +107,7 @@ func NewFakeProxier(ofClient openflow.Client, isIPv6 bool) *proxier { serviceStringMap: map[string]k8sproxy.ServicePortName{}, isIPv6: isIPv6, } + p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, time.Second, 30*time.Second, 2) return p } @@ -216,6 +224,81 @@ func TestClusterIPv6(t *testing.T) { testClusterIP(t, net.ParseIP("10:20::41"), net.ParseIP("10:180::1"), true) } +func TestDualStackService(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockOFClient := ofmock.NewMockClient(ctrl) + fpv4 := NewFakeProxier(mockOFClient, false) + fpv6 := NewFakeProxier(mockOFClient, true) + metaProxier := k8sproxy.NewMetaProxier(fpv4, fpv6) + + svcPort := 80 + svcPortName := k8sproxy.ServicePortName{ + NamespacedName: makeNamespaceName("ns1", "svc1"), + Port: "80", + Protocol: corev1.ProtocolTCP, + } + svcIPv4 := net.ParseIP("10.20.30.41") + svcIPv6 := net.ParseIP("10:20::41") + + s := makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *corev1.Service) { + svc.Spec.ClusterIP = svcIPv4.String() + svc.Spec.ClusterIPs = []string{svcIPv4.String(), svcIPv6.String()} + svc.Spec.IPFamilies = []corev1.IPFamily{corev1.IPv4Protocol, corev1.IPv6Protocol} + svc.Spec.Ports = []corev1.ServicePort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }} + }) + + epv4 := makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *corev1.Endpoints) { + ept.Subsets = []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{ + IP: "10.180.30.41", + }}, + Ports: []corev1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }}, + }} + }) + + epv6 := makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *corev1.Endpoints) { + ept.Subsets = []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{ + IP: "10:180::1", + }}, + Ports: []corev1.EndpointPort{{ + Name: svcPortName.Port, + Port: int32(svcPort), + Protocol: corev1.ProtocolTCP, + }}, + }} + }) + + metaProxier.OnServiceUpdate(nil, s) + metaProxier.OnServiceSynced() + metaProxier.OnEndpointsUpdate(nil, epv4) + metaProxier.OnEndpointsUpdate(nil, epv6) + metaProxier.OnEndpointsSynced() + + groupIDv4, _ := fpv4.groupCounter.Get(svcPortName) + groupIDv6, _ := fpv6.groupCounter.Get(svcPortName) + + mockOFClient.EXPECT().InstallServiceGroup(groupIDv4, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDv4, svcIPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0)).Times(1) + + mockOFClient.EXPECT().InstallServiceGroup(groupIDv6, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCPv6, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDv6, svcIPv6, uint16(svcPort), binding.ProtocolTCPv6, uint16(0)).Times(1) + + fpv4.syncProxyRules() + fpv6.syncProxyRules() +} + func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/pkg/agent/proxy/service.go b/pkg/agent/proxy/service.go index 539bfe573a4..b6fb264e94f 100644 --- a/pkg/agent/proxy/service.go +++ b/pkg/agent/proxy/service.go @@ -31,8 +31,8 @@ type serviceChangesTracker struct { initialized bool } -func newServiceChangesTracker(recorder record.EventRecorder, isIPv6 bool) *serviceChangesTracker { - return &serviceChangesTracker{tracker: k8sproxy.NewServiceChangeTracker(types.NewServiceInfo, &isIPv6, recorder)} +func newServiceChangesTracker(recorder record.EventRecorder, ipFamily v1.IPFamily) *serviceChangesTracker { + return &serviceChangesTracker{tracker: k8sproxy.NewServiceChangeTracker(types.NewServiceInfo, ipFamily, recorder, nil)} } func (sh *serviceChangesTracker) OnServiceSynced() { @@ -53,5 +53,5 @@ func (sh *serviceChangesTracker) Synced() bool { } func (sh *serviceChangesTracker) Update(serviceMap k8sproxy.ServiceMap) k8sproxy.UpdateServiceMapResult { - return k8sproxy.UpdateServiceMap(serviceMap, sh.tracker) + return serviceMap.Update(sh.tracker) } diff --git a/pkg/agent/proxy/types/types.go b/pkg/agent/proxy/types/types.go index f3792c2d020..349d527fd35 100644 --- a/pkg/agent/proxy/types/types.go +++ b/pkg/agent/proxy/types/types.go @@ -32,7 +32,7 @@ type ServiceInfo struct { // NewServiceInfo returns a new k8sproxy.ServicePort which abstracts a serviceInfo. func NewServiceInfo(port *corev1.ServicePort, service *corev1.Service, baseInfo *k8sproxy.BaseServiceInfo) k8sproxy.ServicePort { info := &ServiceInfo{BaseServiceInfo: baseInfo} - if utilnet.IsIPv6String(service.Spec.ClusterIP) { + if utilnet.IsIPv6(baseInfo.ClusterIP()) { info.OFProtocol = openflow.ProtocolTCPv6 if port.Protocol == corev1.ProtocolUDP { info.OFProtocol = openflow.ProtocolUDPv6 diff --git a/third_party/proxy/doc.go b/third_party/proxy/doc.go index 4ca2c4fca85..bca537fdc7e 100644 --- a/third_party/proxy/doc.go +++ b/third_party/proxy/doc.go @@ -13,7 +13,7 @@ // limitations under the License. // Package proxy is copied from -// k8s.io/kubernetes@/v1.17.6(https://github.com/kubernetes/kubernetes/tree/v1.17.6) +// k8s.io/kubernetes@/v1.21.0(https://github.com/kubernetes/kubernetes/tree/v1.21.0) // to avoid importing the whole kubernetes repo. Some unneeded functions are removed. package proxy diff --git a/third_party/proxy/meta_proxier.go b/third_party/proxy/meta_proxier.go index 7fe0c0019a2..279c9ad3138 100644 --- a/third_party/proxy/meta_proxier.go +++ b/third_party/proxy/meta_proxier.go @@ -69,35 +69,21 @@ func (proxier *metaProxier) SyncLoop() { // OnServiceAdd is called whenever creation of new service object is observed. func (proxier *metaProxier) OnServiceAdd(service *v1.Service) { - ipfamily := service.Spec.IPFamilies[0] - if ipfamily == v1.IPv4Protocol { - proxier.ipv4Proxier.OnServiceAdd(service) - return - } + proxier.ipv4Proxier.OnServiceAdd(service) proxier.ipv6Proxier.OnServiceAdd(service) } // OnServiceUpdate is called whenever modification of an existing // service object is observed. func (proxier *metaProxier) OnServiceUpdate(oldService, service *v1.Service) { - // IPFamily is immutable, hence we only need to check on the new service - ipfamily := service.Spec.IPFamilies[0] - if ipfamily == v1.IPv4Protocol { - proxier.ipv4Proxier.OnServiceUpdate(oldService, service) - return - } - + proxier.ipv4Proxier.OnServiceUpdate(oldService, service) proxier.ipv6Proxier.OnServiceUpdate(oldService, service) } // OnServiceDelete is called whenever deletion of an existing service // object is observed. func (proxier *metaProxier) OnServiceDelete(service *v1.Service) { - ipfamily := service.Spec.IPFamilies[0] - if ipfamily == v1.IPv4Protocol { - proxier.ipv4Proxier.OnServiceDelete(service) - return - } + proxier.ipv4Proxier.OnServiceDelete(service) proxier.ipv6Proxier.OnServiceDelete(service) } diff --git a/third_party/proxy/service.go b/third_party/proxy/service.go index a9d0e77c0bd..c61b13f198d 100644 --- a/third_party/proxy/service.go +++ b/third_party/proxy/service.go @@ -31,6 +31,7 @@ limitations under the License. Modifies: - Replace import from "k8s.io/kubernetes/pkg/api/v1/service" to "antrea.io/antrea/pkg/agent/proxy/upstream/util" - Remove import "k8s.io/kubernetes/pkg/proxy/metrics" and related invokes +- Remove check for feature gate ServiceInternalTrafficPolicy */ package proxy @@ -47,7 +48,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" - utilnet "k8s.io/utils/net" utilproxy "antrea.io/antrea/third_party/proxy/util" ) @@ -67,8 +67,11 @@ type BaseServiceInfo struct { externalIPs []string loadBalancerSourceRanges []string healthCheckNodePort int - onlyNodeLocalEndpoints bool + nodeLocalExternal bool + nodeLocalInternal bool + internalTrafficPolicy *v1.ServiceInternalTrafficPolicyType topologyKeys []string + hintsAnnotation string } var _ ServicePort = &BaseServiceInfo{} @@ -132,9 +135,19 @@ func (info *BaseServiceInfo) LoadBalancerIPStrings() []string { return ips } -// OnlyNodeLocalEndpoints is part of ServicePort interface. -func (info *BaseServiceInfo) OnlyNodeLocalEndpoints() bool { - return info.onlyNodeLocalEndpoints +// NodeLocalExternal is part of ServicePort interface. +func (info *BaseServiceInfo) NodeLocalExternal() bool { + return info.nodeLocalExternal +} + +// NodeLocalInternal is part of ServicePort interface +func (info *BaseServiceInfo) NodeLocalInternal() bool { + return info.nodeLocalInternal +} + +// InternalTrafficPolicy is part of ServicePort interface +func (info *BaseServiceInfo) InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType { + return info.internalTrafficPolicy } // TopologyKeys is part of ServicePort interface. @@ -142,62 +155,78 @@ func (info *BaseServiceInfo) TopologyKeys() []string { return info.topologyKeys } +// HintsAnnotation is part of ServicePort interface. +func (info *BaseServiceInfo) HintsAnnotation() string { + return info.hintsAnnotation +} + func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo { - onlyNodeLocalEndpoints := false + nodeLocalExternal := false if utilproxy.RequestsOnlyLocalTraffic(service) { - onlyNodeLocalEndpoints = true + nodeLocalExternal = true } + nodeLocalInternal := utilproxy.RequestsOnlyLocalTrafficForInternal(service) var stickyMaxAgeSeconds int if service.Spec.SessionAffinity == v1.ServiceAffinityClientIP { // Kube-apiserver side guarantees SessionAffinityConfig won't be nil when session affinity type is ClientIP stickyMaxAgeSeconds = int(*service.Spec.SessionAffinityConfig.ClientIP.TimeoutSeconds) } + + clusterIP := utilproxy.GetClusterIPByFamily(sct.ipFamily, service) info := &BaseServiceInfo{ - clusterIP: net.ParseIP(service.Spec.ClusterIP), - port: int(port.Port), - protocol: port.Protocol, - nodePort: int(port.NodePort), - sessionAffinityType: service.Spec.SessionAffinity, - stickyMaxAgeSeconds: stickyMaxAgeSeconds, - onlyNodeLocalEndpoints: onlyNodeLocalEndpoints, - topologyKeys: service.Spec.TopologyKeys, + clusterIP: net.ParseIP(clusterIP), + port: int(port.Port), + protocol: port.Protocol, + nodePort: int(port.NodePort), + sessionAffinityType: service.Spec.SessionAffinity, + stickyMaxAgeSeconds: stickyMaxAgeSeconds, + nodeLocalExternal: nodeLocalExternal, + nodeLocalInternal: nodeLocalInternal, + internalTrafficPolicy: service.Spec.InternalTrafficPolicy, + topologyKeys: service.Spec.TopologyKeys, + hintsAnnotation: service.Annotations[v1.AnnotationTopologyAwareHints], } - if sct.isIPv6Mode == nil { - info.externalIPs = make([]string, len(service.Spec.ExternalIPs)) - info.loadBalancerSourceRanges = make([]string, len(service.Spec.LoadBalancerSourceRanges)) - copy(info.loadBalancerSourceRanges, service.Spec.LoadBalancerSourceRanges) - copy(info.externalIPs, service.Spec.ExternalIPs) - // Deep-copy in case the service instance changes - info.loadBalancerStatus = *service.Status.LoadBalancer.DeepCopy() - } else { - // Filter out the incorrect IP version case. - // If ExternalIPs, LoadBalancerSourceRanges and LoadBalancerStatus Ingress on service contains incorrect IP versions, - // only filter out the incorrect ones. - var incorrectIPs []string - info.externalIPs, incorrectIPs = utilproxy.FilterIncorrectIPVersion(service.Spec.ExternalIPs, *sct.isIPv6Mode) - if len(incorrectIPs) > 0 { - utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "externalIPs", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID) - } - info.loadBalancerSourceRanges, incorrectIPs = utilproxy.FilterIncorrectCIDRVersion(service.Spec.LoadBalancerSourceRanges, *sct.isIPv6Mode) - if len(incorrectIPs) > 0 { - utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "loadBalancerSourceRanges", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID) - } - // Obtain Load Balancer Ingress IPs - var ips []string - for _, ing := range service.Status.LoadBalancer.Ingress { - ips = append(ips, ing.IP) - } - if len(ips) > 0 { - correctIPs, incorrectIPs := utilproxy.FilterIncorrectIPVersion(ips, *sct.isIPv6Mode) - if len(incorrectIPs) > 0 { - utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "Load Balancer ingress IPs", strings.Join(incorrectIPs, ","), service.Namespace, service.Name, service.UID) - } - // Create the LoadBalancerStatus with the filtererd IPs - for _, ip := range correctIPs { - info.loadBalancerStatus.Ingress = append(info.loadBalancerStatus.Ingress, v1.LoadBalancerIngress{IP: ip}) + loadBalancerSourceRanges := make([]string, len(service.Spec.LoadBalancerSourceRanges)) + for i, sourceRange := range service.Spec.LoadBalancerSourceRanges { + loadBalancerSourceRanges[i] = strings.TrimSpace(sourceRange) + } + // filter external ips, source ranges and ingress ips + // prior to dual stack services, this was considered an error, but with dual stack + // services, this is actually expected. Hence we downgraded from reporting by events + // to just log lines with high verbosity + + ipFamilyMap := utilproxy.MapIPsByIPFamily(service.Spec.ExternalIPs) + info.externalIPs = ipFamilyMap[sct.ipFamily] + + // Log the IPs not matching the ipFamily + if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(sct.ipFamily)]; ok && len(ips) > 0 { + klog.V(4).Infof("service change tracker(%v) ignored the following external IPs(%s) for service %v/%v as they don't match IPFamily", sct.ipFamily, strings.Join(ips, ","), service.Namespace, service.Name) + } + + ipFamilyMap = utilproxy.MapCIDRsByIPFamily(loadBalancerSourceRanges) + info.loadBalancerSourceRanges = ipFamilyMap[sct.ipFamily] + // Log the CIDRs not matching the ipFamily + if cidrs, ok := ipFamilyMap[utilproxy.OtherIPFamily(sct.ipFamily)]; ok && len(cidrs) > 0 { + klog.V(4).Infof("service change tracker(%v) ignored the following load balancer source ranges(%s) for service %v/%v as they don't match IPFamily", sct.ipFamily, strings.Join(cidrs, ","), service.Namespace, service.Name) + } + + // Obtain Load Balancer Ingress IPs + var ips []string + for _, ing := range service.Status.LoadBalancer.Ingress { + ips = append(ips, ing.IP) + } + + if len(ips) > 0 { + ipFamilyMap = utilproxy.MapIPsByIPFamily(ips) + + if ipList, ok := ipFamilyMap[utilproxy.OtherIPFamily(sct.ipFamily)]; ok && len(ipList) > 0 { + klog.V(4).Infof("service change tracker(%v) ignored the following load balancer(%s) ingress ips for service %v/%v as they don't match IPFamily", sct.ipFamily, strings.Join(ipList, ","), service.Namespace, service.Name) - } + } + // Create the LoadBalancerStatus with the filtered IPs + for _, ip := range ipFamilyMap[sct.ipFamily] { + info.loadBalancerStatus.Ingress = append(info.loadBalancerStatus.Ingress, v1.LoadBalancerIngress{IP: ip}) } } @@ -215,6 +244,10 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServiceInfo) ServicePort +// This handler is invoked by the apply function on every change. This function should not modify the +// ServiceMap's but just use the changes for any Proxier specific cleanup. +type processServiceMapChangeFunc func(previous, current ServiceMap) + // serviceChange contains all changes to services that happened since proxy rules were synced. For a single object, // changes are accumulated, i.e. previous is state from before applying the changes, // current is state after applying all of the changes. @@ -231,19 +264,21 @@ type ServiceChangeTracker struct { // items maps a service to its serviceChange. items map[types.NamespacedName]*serviceChange // makeServiceInfo allows proxier to inject customized information when processing service. - makeServiceInfo makeServicePortFunc - // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable. - isIPv6Mode *bool - recorder record.EventRecorder + makeServiceInfo makeServicePortFunc + processServiceMapChange processServiceMapChangeFunc + ipFamily v1.IPFamily + + recorder record.EventRecorder } // NewServiceChangeTracker initializes a ServiceChangeTracker -func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, isIPv6Mode *bool, recorder record.EventRecorder) *ServiceChangeTracker { +func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, ipFamily v1.IPFamily, recorder record.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker { return &ServiceChangeTracker{ - items: make(map[types.NamespacedName]*serviceChange), - makeServiceInfo: makeServiceInfo, - isIPv6Mode: isIPv6Mode, - recorder: recorder, + items: make(map[types.NamespacedName]*serviceChange), + makeServiceInfo: makeServiceInfo, + recorder: recorder, + ipFamily: ipFamily, + processServiceMapChange: processServiceMapChange, } } @@ -279,6 +314,8 @@ func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool { // if change.previous equal to change.current, it means no change if reflect.DeepEqual(change.previous, change.current) { delete(sct.items, namespacedName) + } else { + klog.V(2).Infof("Service %s updated: %d ports", namespacedName, len(change.current)) } return len(sct.items) > 0 } @@ -289,19 +326,19 @@ type UpdateServiceMapResult struct { // The value(uint16) of HCServices map is the service health check node port. HCServiceNodePorts map[types.NamespacedName]uint16 // UDPStaleClusterIP holds stale (no longer assigned to a Service) Service IPs that had UDP ports. - // Callers can use this to stop timeout-waits or clear connection-tracking information. + // Callers can use this to abort timeout-waits or clear connection-tracking information. UDPStaleClusterIP sets.String } -// UpdateServiceMap updates ServiceMap based on the given changes. -func UpdateServiceMap(serviceMap ServiceMap, changes *ServiceChangeTracker) (result UpdateServiceMapResult) { +// Update updates ServiceMap base on the given changes. +func (sm ServiceMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) { result.UDPStaleClusterIP = sets.NewString() - serviceMap.apply(changes, result.UDPStaleClusterIP) + sm.apply(changes, result.UDPStaleClusterIP) // TODO: If this will appear to be computationally expensive, consider // computing this incrementally similarly to serviceMap. result.HCServiceNodePorts = make(map[types.NamespacedName]uint16) - for svcPortName, info := range serviceMap { + for svcPortName, info := range sm { if info.HealthCheckNodePort() != 0 { result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort()) } @@ -320,21 +357,18 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic if service == nil { return nil } - svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} - if utilproxy.ShouldSkipService(svcName, service) { + + if utilproxy.ShouldSkipService(service) { return nil } - if len(service.Spec.ClusterIP) != 0 { - // Filter out the incorrect IP version case. - // If ClusterIP on service has incorrect IP version, service itself will be ignored. - if sct.isIPv6Mode != nil && utilnet.IsIPv6String(service.Spec.ClusterIP) != *sct.isIPv6Mode { - utilproxy.LogAndEmitIncorrectIPVersionEvent(sct.recorder, "clusterIP", service.Spec.ClusterIP, service.Namespace, service.Name, service.UID) - return nil - } + clusterIP := utilproxy.GetClusterIPByFamily(sct.ipFamily, service) + if clusterIP == "" { + return nil } serviceMap := make(ServiceMap) + svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name} for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] svcPortName := ServicePortName{NamespacedName: svcName, Port: servicePort.Name, Protocol: servicePort.Protocol} @@ -350,10 +384,14 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic // apply the changes to ServiceMap and update the stale udp cluster IP set. The UDPStaleClusterIP argument is passed in to store the // udp protocol service cluster ip when service is deleted from the ServiceMap. +// apply triggers processServiceMapChange on every change. func (sm *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) { changes.lock.Lock() defer changes.lock.Unlock() for _, change := range changes.items { + if changes.processServiceMapChange != nil { + changes.processServiceMapChange(change.previous, change.current) + } sm.merge(change.current) // filter out the Update event of current changes from previous changes before calling unmerge() so that can // skip deleting the Update events. diff --git a/third_party/proxy/types.go b/third_party/proxy/types.go index b3583028ce8..c928505b6ff 100644 --- a/third_party/proxy/types.go +++ b/third_party/proxy/types.go @@ -95,10 +95,16 @@ type ServicePort interface { HealthCheckNodePort() int // GetNodePort returns a service Node port if present. If return 0, it means not present. NodePort() int - // GetOnlyNodeLocalEndpoints returns if a service has only node local endpoints - OnlyNodeLocalEndpoints() bool + // NodeLocalExternal returns if a service has only node local endpoints for external traffic. + NodeLocalExternal() bool + // NodeLocalInternal returns if a service has only node local endpoints for internal traffic. + NodeLocalInternal() bool + // InternalTrafficPolicy returns service InternalTrafficPolicy + InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType // TopologyKeys returns service TopologyKeys as a string array. TopologyKeys() []string + // HintsAnnotation returns the value of the v1.AnnotationTopologyAwareHints annotation. + HintsAnnotation() string } // Endpoint in an interface which abstracts information about an endpoint. diff --git a/third_party/proxy/util/service.go b/third_party/proxy/util/service.go index a51b966db45..3c3f565574d 100644 --- a/third_party/proxy/util/service.go +++ b/third_party/proxy/util/service.go @@ -35,7 +35,7 @@ Modifies: package util -import "k8s.io/api/core/v1" +import v1 "k8s.io/api/core/v1" // RequestsOnlyLocalTraffic checks if service requests OnlyLocal traffic. func RequestsOnlyLocalTraffic(service *v1.Service) bool { @@ -46,6 +46,15 @@ func RequestsOnlyLocalTraffic(service *v1.Service) bool { return service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal } +// RequestsOnlyLocalTrafficForInternal checks if service prefers Node Local +// endpoints for internal traffic +func RequestsOnlyLocalTrafficForInternal(service *v1.Service) bool { + if service.Spec.InternalTrafficPolicy == nil { + return false + } + return *service.Spec.InternalTrafficPolicy == v1.ServiceInternalTrafficPolicyLocal +} + // NeedsHealthCheck checks if service needs health check. func NeedsHealthCheck(service *v1.Service) bool { if service.Spec.Type != v1.ServiceTypeLoadBalancer { diff --git a/third_party/proxy/util/utils.go b/third_party/proxy/util/utils.go index 342732ee061..a7de3754a5f 100644 --- a/third_party/proxy/util/utils.go +++ b/third_party/proxy/util/utils.go @@ -40,6 +40,7 @@ package util import ( "context" + "errors" "fmt" "net" @@ -51,21 +52,29 @@ import ( "k8s.io/klog/v2" ) +var ( + // ErrAddressNotAllowed indicates the address is not allowed + ErrAddressNotAllowed = errors.New("address not allowed") + + // ErrNoAddresses indicates there are no addresses for the hostname + ErrNoAddresses = errors.New("No addresses for hostname") +) + // Resolver is an interface for net.Resolver type Resolver interface { LookupIPAddr(ctx context.Context, host string) ([]net.IPAddr, error) } // ShouldSkipService checks if a given service should skip proxying -func ShouldSkipService(svcName types.NamespacedName, service *v1.Service) bool { +func ShouldSkipService(service *v1.Service) bool { // if ClusterIP is "None" or empty, skip proxying if service.Spec.ClusterIP == v1.ClusterIPNone || service.Spec.ClusterIP == "" { - klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP) + klog.V(3).Infof("Skipping service %s in namespace %s due to clusterIP = %q", service.Name, service.Namespace, service.Spec.ClusterIP) return true } // Even if ClusterIP is set, ServiceTypeExternalName services don't get proxied if service.Spec.Type == v1.ServiceTypeExternalName { - klog.V(3).Infof("Skipping service %s due to Type=ExternalName", svcName) + klog.V(3).Infof("Skipping service %s in namespace %s due to Type=ExternalName", service.Name, service.Namespace) return true } return false @@ -107,3 +116,90 @@ func filterWithCondition(strs []string, expectedCondition bool, conditionFunc fu } return corrects, incorrects } + +// GetClusterIPByFamily returns a service clusterip by family +func GetClusterIPByFamily(ipFamily v1.IPFamily, service *v1.Service) string { + // allowing skew + if len(service.Spec.IPFamilies) == 0 { + if len(service.Spec.ClusterIP) == 0 || service.Spec.ClusterIP == v1.ClusterIPNone { + return "" + } + + IsIPv6Family := (ipFamily == v1.IPv6Protocol) + if IsIPv6Family == utilnet.IsIPv6String(service.Spec.ClusterIP) { + return service.Spec.ClusterIP + } + + return "" + } + + for idx, family := range service.Spec.IPFamilies { + if family == ipFamily { + if idx < len(service.Spec.ClusterIPs) { + return service.Spec.ClusterIPs[idx] + } + } + } + + return "" +} + +// MapIPsByIPFamily maps a slice of IPs to their respective IP families (v4 or v6) +func MapIPsByIPFamily(ipStrings []string) map[v1.IPFamily][]string { + ipFamilyMap := map[v1.IPFamily][]string{} + for _, ip := range ipStrings { + // Handle only the valid IPs + if ipFamily, err := getIPFamilyFromIP(ip); err == nil { + ipFamilyMap[ipFamily] = append(ipFamilyMap[ipFamily], ip) + } else { + klog.Errorf("Skipping invalid IP: %s", ip) + } + } + return ipFamilyMap +} + +func getIPFamilyFromIP(ipStr string) (v1.IPFamily, error) { + netIP := net.ParseIP(ipStr) + if netIP == nil { + return "", ErrAddressNotAllowed + } + + if utilnet.IsIPv6(netIP) { + return v1.IPv6Protocol, nil + } + return v1.IPv4Protocol, nil +} + +// OtherIPFamily returns the other ip family +func OtherIPFamily(ipFamily v1.IPFamily) v1.IPFamily { + if ipFamily == v1.IPv6Protocol { + return v1.IPv4Protocol + } + + return v1.IPv6Protocol +} + +// MapCIDRsByIPFamily maps a slice of IPs to their respective IP families (v4 or v6) +func MapCIDRsByIPFamily(cidrStrings []string) map[v1.IPFamily][]string { + ipFamilyMap := map[v1.IPFamily][]string{} + for _, cidr := range cidrStrings { + // Handle only the valid CIDRs + if ipFamily, err := getIPFamilyFromCIDR(cidr); err == nil { + ipFamilyMap[ipFamily] = append(ipFamilyMap[ipFamily], cidr) + } else { + klog.Errorf("Skipping invalid cidr: %s", cidr) + } + } + return ipFamilyMap +} + +func getIPFamilyFromCIDR(cidrStr string) (v1.IPFamily, error) { + _, netCIDR, err := net.ParseCIDR(cidrStr) + if err != nil { + return "", ErrAddressNotAllowed + } + if utilnet.IsIPv6CIDR(netCIDR) { + return v1.IPv6Protocol, nil + } + return v1.IPv4Protocol, nil +}