diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 9173c8da5c1..e9964be7390 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -26,7 +26,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/runtime" - k8sapitypes "k8s.io/apimachinery/pkg/types" + apimachinerytypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/tools/record" @@ -105,6 +105,13 @@ type proxier struct { // oversizeServiceSet records the Services that have more than 800 Endpoints. oversizeServiceSet sets.String + // serviceIPRouteReferences tracks the references of Service IP routes. The key is the Service IP and the value is + // the set of ServiceInfo strings. Because a Service could have multiple ports and each port will generate a + // ServicePort (which is the unit of the processing), a Service IP route may be required by several ServicePorts. + // With the references, we install a route exactly once as long as it's used by any ServicePorts and uninstall it + // exactly once when it's no longer used by any ServicePorts. + // It applies to ClusterIP and LoadBalancerIP. + serviceIPRouteReferences map[string]sets.String // syncedOnce returns true if the proxier has synced rules at least once. syncedOnce bool syncedOnceMutex sync.RWMutex @@ -114,7 +121,6 @@ type proxier struct { ofClient openflow.Client routeClient route.Interface nodePortAddresses []net.IP - hostGateWay string isIPv6 bool proxyAll bool endpointSliceEnabled bool @@ -145,28 +151,35 @@ func (p *proxier) removeStaleServices() { continue } svcInfo := svcPort.(*types.ServiceInfo) - klog.V(2).Infof("Removing stale Service: %s %s", svcPortName.Name, svcInfo.String()) + svcInfoStr := svcInfo.String() + klog.V(2).InfoS("Removing stale Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) if p.oversizeServiceSet.Has(svcPortName.String()) { p.oversizeServiceSet.Delete(svcPortName.String()) } if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling ClusterIP flows for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } + // Remove NodePort and ClusterIP flows and configurations. if p.proxyAll { - // Remove NodePort flows and configurations. if svcInfo.NodePort() > 0 { if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName) + klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) + continue + } + } + if svcInfo.ClusterIP() != nil { + if err := p.deleteRouteForServiceIP(svcInfoStr, svcInfo.ClusterIP(), p.routeClient.DeleteClusterIPRoute); err != nil { + klog.ErrorS(err, "Failed to remove ClusterIP Service routes", "Service", svcPortName) continue } } } // Remove LoadBalancer flows and configurations. if p.proxyLoadBalancerIPs && len(svcInfo.LoadBalancerIPStrings()) > 0 { - if err := p.uninstallLoadBalancerService(svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName) + if err := p.uninstallLoadBalancerService(svcInfoStr, svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { + klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } } @@ -314,43 +327,80 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot return nil } -func (p *proxier) installLoadBalancerService(groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error { +func (p *proxier) installLoadBalancerService(svcInfoStr string, groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error { for _, ingress := range loadBalancerIPStrings { if ingress != "" { - if err := p.ofClient.InstallServiceFlows(groupID, net.ParseIP(ingress), svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer); err != nil { - return fmt.Errorf("failed to install Service LoadBalancer load balancing flows: %w", err) + ip := net.ParseIP(ingress) + if err := p.ofClient.InstallServiceFlows(groupID, ip, svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer); err != nil { + return fmt.Errorf("failed to install LoadBalancer load balancing flows: %w", err) + } + if p.proxyAll { + if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddLoadBalancer); err != nil { + return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err) + } } } } - if p.proxyAll { - if err := p.routeClient.AddLoadBalancer(loadBalancerIPStrings); err != nil { - return fmt.Errorf("failed to install Service LoadBalancer traffic redirecting flows: %w", err) + return nil +} + +func (p *proxier) addRouteForServiceIP(svcInfoStr string, ip net.IP, addRouteFn func(net.IP) error) error { + ipStr := ip.String() + references, exists := p.serviceIPRouteReferences[ipStr] + // If the IP was not referenced by any Service port, install a route for it. + // Otherwise, just reference it. + if !exists { + if err := addRouteFn(ip); err != nil { + return err } + references = sets.NewString(svcInfoStr) + p.serviceIPRouteReferences[ipStr] = references + } else { + references.Insert(svcInfoStr) } - return nil } -func (p *proxier) uninstallLoadBalancerService(loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error { +func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error { for _, ingress := range loadBalancerIPStrings { if ingress != "" { - if err := p.ofClient.UninstallServiceFlows(net.ParseIP(ingress), svcPort, protocol); err != nil { - return fmt.Errorf("failed to remove Service LoadBalancer load balancing flows: %w", err) + ip := net.ParseIP(ingress) + if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil { + return fmt.Errorf("failed to remove LoadBalancer load balancing flows: %w", err) + } + if p.proxyAll { + if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteLoadBalancer); err != nil { + return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err) + } } } } - if p.proxyAll { - if err := p.routeClient.DeleteLoadBalancer(loadBalancerIPStrings); err != nil { - return fmt.Errorf("failed to remove Service LoadBalancer traffic redirecting flows: %w", err) + return nil +} + +func (p *proxier) deleteRouteForServiceIP(svcInfoStr string, ip net.IP, deleteRouteFn func(net.IP) error) error { + ipStr := ip.String() + references, exists := p.serviceIPRouteReferences[ipStr] + // If the IP was not referenced by this Service port, skip it. + if exists && references.Has(svcInfoStr) { + // Delete the IP only if this Service port is the last one referencing it. + // Otherwise, just dereference it. + if references.Len() == 1 { + if err := deleteRouteFn(ip); err != nil { + return err + } + delete(p.serviceIPRouteReferences, ipStr) + } else { + references.Delete(svcInfoStr) } } - return nil } func (p *proxier) installServices() { for svcPortName, svcPort := range p.serviceMap { svcInfo := svcPort.(*types.ServiceInfo) + svcInfoStr := svcInfo.String() endpointsInstalled, ok := p.endpointsInstalledMap[svcPortName] if !ok { endpointsInstalled = map[string]k8sproxy.Endpoint{} @@ -591,9 +641,9 @@ func (p *proxier) installServices() { } } // If previous Service which has ClusterIP should be removed, remove ClusterIP routes. - if svcInfo.ClusterIP() != nil { - if err := p.routeClient.DeleteClusterIPRoute(pSvcInfo.ClusterIP()); err != nil { - klog.ErrorS(err, "Failed to remove ClusterIP Service routes", "Service", svcPortName) + if pSvcInfo.ClusterIP() != nil { + if err := p.deleteRouteForServiceIP(pSvcInfo.String(), pSvcInfo.ClusterIP(), p.routeClient.DeleteClusterIPRoute); err != nil { + klog.ErrorS(err, "Error when uninstalling ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } } @@ -613,8 +663,8 @@ func (p *proxier) installServices() { // is created, the routing target IP block will be recalculated for expansion to be able to route the new // created ClusterIP. Deleting a ClusterIP will not shrink the target routing IP block. The Service CIDR // can be finally calculated after creating enough ClusterIPs. - if err := p.routeClient.AddClusterIPRoute(svcInfo.ClusterIP()); err != nil { - klog.ErrorS(err, "Failed to install ClusterIP route of Service", "Service", svcPortName) + if err := p.addRouteForServiceIP(svcInfo.String(), svcInfo.ClusterIP(), p.routeClient.AddClusterIPRoute); err != nil { + klog.ErrorS(err, "Error when installing ClusterIP route for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } @@ -641,14 +691,14 @@ func (p *proxier) installServices() { } // Remove LoadBalancer flows and configurations. if len(toDelete) > 0 { - if err := p.uninstallLoadBalancerService(toDelete, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { - klog.ErrorS(err, "Failed to remove flows and configurations of Service", "Service", svcPortName) + if err := p.uninstallLoadBalancerService(pSvcInfo.String(), toDelete, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { + klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServicePortName", svcPortName, "ServiceInfo", svcInfoStr) continue } } // Install LoadBalancer flows and configurations. if len(toAdd) > 0 { - if err := p.installLoadBalancerService(nGroupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.NodeLocalExternal()); err != nil { + if err := p.installLoadBalancerService(svcInfo.String(), nGroupID, toAdd, uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), svcInfo.NodeLocalExternal()); err != nil { klog.ErrorS(err, "Failed to install LoadBalancer flows and configurations of Service", "Service", svcPortName) continue } @@ -838,7 +888,7 @@ func (p *proxier) GetProxyProvider() k8sproxy.Provider { } func (p *proxier) GetServiceFlowKeys(serviceName, namespace string) ([]string, []binding.GroupIDType, bool) { - namespacedName := k8sapitypes.NamespacedName{Namespace: namespace, Name: serviceName} + namespacedName := apimachinerytypes.NamespacedName{Namespace: namespace, Name: serviceName} p.serviceEndpointsMapsMutex.Lock() defer p.serviceEndpointsMapsMutex.Unlock() @@ -914,6 +964,7 @@ func NewProxier( endpointsInstalledMap: types.EndpointsMap{}, endpointsMap: types.EndpointsMap{}, endpointReferenceCounter: map[string]int{}, + serviceIPRouteReferences: map[string]sets.String{}, serviceStringMap: map[string]k8sproxy.ServicePortName{}, oversizeServiceSet: sets.NewString(), groupCounter: groupCounter, diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index fc809925fad..c591da211a8 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -24,13 +24,17 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" apimachinerytypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/component-base/metrics/testutil" + "k8s.io/utils/pointer" + agentconfig "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow" ofmock "antrea.io/antrea/pkg/agent/openflow/testing" "antrea.io/antrea/pkg/agent/proxy/metrics" @@ -104,6 +108,7 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*corev1.Endpoints)) type proxyOptions struct { proxyAllEnabled bool proxyLoadBalancerIPs bool + endpointSliceEnabled bool } type proxyOptionsFn func(*proxyOptions) @@ -112,6 +117,23 @@ func withProxyAll(o *proxyOptions) { o.proxyAllEnabled = true } +func withEndpointSlice(o *proxyOptions) { + o.endpointSliceEnabled = true +} + +func makeEndpointSliceMap(proxier *proxier, allEndpoints ...*discovery.EndpointSlice) { + for i := range allEndpoints { + proxier.endpointsChanges.OnEndpointSliceUpdate(allEndpoints[i], false) + } + proxier.endpointsChanges.OnEndpointsSynced() +} + +func getMockClients(ctrl *gomock.Controller) (*ofmock.MockClient, *routemock.MockInterface) { + mockOFClient := ofmock.NewMockClient(ctrl) + mockRouteClient := routemock.NewMockInterface(ctrl) + return mockOFClient, mockRouteClient +} + func withoutProxyLoadBalancerIPs(o *proxyOptions) { o.proxyLoadBalancerIPs = false } @@ -132,6 +154,7 @@ func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodeP o := &proxyOptions{ proxyAllEnabled: false, proxyLoadBalancerIPs: true, + endpointSliceEnabled: false, } for _, fn := range options { @@ -145,6 +168,7 @@ func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodeP serviceInstalledMap: k8sproxy.ServiceMap{}, endpointsInstalledMap: types.EndpointsMap{}, endpointReferenceCounter: map[string]int{}, + serviceIPRouteReferences: map[string]sets.String{}, endpointsMap: types.EndpointsMap{}, groupCounter: types.NewGroupCounter(groupIDAllocator, make(chan string, 100)), ofClient: ofClient, @@ -154,8 +178,12 @@ func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodeP nodePortAddresses: nodePortAddresses, proxyAll: o.proxyAllEnabled, proxyLoadBalancerIPs: o.proxyLoadBalancerIPs, + endpointSliceEnabled: o.endpointSliceEnabled, } p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, time.Second, 30*time.Second, 2) + if o.endpointSliceEnabled { + p.endpointsChanges = newEndpointsChangesTracker(hostname, o.endpointSliceEnabled, isIPv6) + } return p } @@ -351,7 +379,7 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep } mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) if proxyLoadBalancerIPs { - mockRouteClient.EXPECT().AddLoadBalancer([]string{loadBalancerIP.String()}).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) } mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) @@ -749,6 +777,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1) + mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) fp.syncProxyRules() fp.serviceChanges.OnServiceUpdate(service, nil) @@ -1300,3 +1329,165 @@ func TestMetrics(t *testing.T) { }) } } + +func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockOFClient, mockRouteClient := getMockClients(ctrl) + groupAllocator := openflow.NewGroupAllocator(false) + nodePortAddresses := []net.IP{net.ParseIP("0.0.0.0")} + fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, false, withProxyAll, withEndpointSlice) + + port80Str := "port80" + port80Int32 := int32(80) + port443Str := "port443" + port443Int32 := int32(443) + port30001Int32 := int32(30001) + port30002Int32 := int32(30002) + protocolTCP := corev1.ProtocolTCP + endpoint1Address := "192.168.0.11" + endpoint2Address := "192.168.1.11" + endpoint1NodeName := "localhost" + endpoint2NodeName := "node2" + svc1IPv4 := net.ParseIP("10.20.30.41") + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: port80Str, + Protocol: protocolTCP, + Port: port80Int32, + TargetPort: intstr.FromInt(int(port80Int32)), + NodePort: port30001Int32, + }, + { + Name: port443Str, + Protocol: protocolTCP, + Port: port443Int32, + TargetPort: intstr.FromInt(int(port443Int32)), + NodePort: port30002Int32, + }, + }, + ClusterIP: svc1IPv4.String(), + ClusterIPs: []string{svc1IPv4.String()}, + Type: corev1.ServiceTypeLoadBalancer, + ExternalTrafficPolicy: corev1.ServiceExternalTrafficPolicyTypeLocal, + HealthCheckNodePort: 40000, + IPFamilies: []corev1.IPFamily{corev1.IPv4Protocol}, + }, + Status: corev1.ServiceStatus{ + LoadBalancer: corev1.LoadBalancerStatus{Ingress: []corev1.LoadBalancerIngress{ + {IP: loadBalancerIPv4.String()}, + }}, + }, + } + makeServiceMap(fp, svc) + + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc-x5ks2", + Namespace: svc.Namespace, + Labels: map[string]string{ + discovery.LabelServiceName: svc.Name, + }, + }, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{ + endpoint1Address, + }, + Conditions: discovery.EndpointConditions{ + Ready: pointer.Bool(true), + Serving: pointer.Bool(false), + Terminating: pointer.Bool(false), + }, + NodeName: &endpoint1NodeName, + Topology: map[string]string{"kubernetes.io/hostname": endpoint1NodeName}, + }, + { + Addresses: []string{ + endpoint2Address, + }, + Conditions: discovery.EndpointConditions{ + Ready: pointer.Bool(true), + Serving: pointer.Bool(false), + Terminating: pointer.Bool(false), + }, + NodeName: &endpoint2NodeName, + Topology: map[string]string{"kubernetes.io/hostname": endpoint2NodeName}, + }, + }, + Ports: []discovery.EndpointPort{ + { + Name: &port80Str, + Port: &port80Int32, + Protocol: &protocolTCP, + }, + { + Name: &port443Str, + Port: &port443Int32, + Protocol: &protocolTCP, + }, + }, + } + makeEndpointSliceMap(fp, endpointSlice) + + localEndpointForPort80 := k8sproxy.NewBaseEndpointInfo(endpoint1Address, int(port80Int32), true, map[string]string{"kubernetes.io/hostname": endpoint1NodeName}) + localEndpointForPort443 := k8sproxy.NewBaseEndpointInfo(endpoint1Address, int(port443Int32), true, map[string]string{"kubernetes.io/hostname": endpoint1NodeName}) + remoteEndpointForPort80 := k8sproxy.NewBaseEndpointInfo(endpoint2Address, int(port80Int32), false, map[string]string{"kubernetes.io/hostname": endpoint2NodeName}) + remoteEndpointForPort443 := k8sproxy.NewBaseEndpointInfo(endpoint2Address, int(port443Int32), false, map[string]string{"kubernetes.io/hostname": endpoint2NodeName}) + + mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort80, remoteEndpointForPort80})).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, []k8sproxy.Endpoint{localEndpointForPort80}).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort80, remoteEndpointForPort80})).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), svc1IPv4, uint16(port80Int32), binding.ProtocolTCP, uint16(0), false, corev1.ServiceTypeClusterIP).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), agentconfig.VirtualNodePortDNATIPv4, uint16(port30001Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeNodePort).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), loadBalancerIPv4, uint16(port80Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeLoadBalancer).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP).Times(1) + // The route for the ClusterIP and the LoadBalancer IP should only be installed once. + mockRouteClient.EXPECT().AddClusterIPRoute(svc1IPv4).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIPv4).Times(1) + + mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, []k8sproxy.Endpoint{localEndpointForPort443}).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), svc1IPv4, uint16(port443Int32), binding.ProtocolTCP, uint16(0), false, corev1.ServiceTypeClusterIP).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), agentconfig.VirtualNodePortDNATIPv4, uint16(port30002Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeNodePort).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), loadBalancerIPv4, uint16(port443Int32), binding.ProtocolTCP, uint16(0), true, corev1.ServiceTypeLoadBalancer).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP).Times(1) + + fp.syncProxyRules() + + // Remove the service. + fp.serviceChanges.OnServiceUpdate(svc, nil) + fp.endpointsChanges.OnEndpointSliceUpdate(endpointSlice, true) + + mockOFClient.EXPECT().UninstallEndpointFlows(binding.ProtocolTCP, localEndpointForPort80) + mockOFClient.EXPECT().UninstallEndpointFlows(binding.ProtocolTCP, remoteEndpointForPort80) + mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) + mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv4, uint16(port80Int32), binding.ProtocolTCP) + mockOFClient.EXPECT().UninstallServiceFlows(agentconfig.VirtualNodePortDNATIPv4, uint16(port30001Int32), binding.ProtocolTCP) + mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIPv4, uint16(port80Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP) + + mockOFClient.EXPECT().UninstallEndpointFlows(binding.ProtocolTCP, localEndpointForPort443) + mockOFClient.EXPECT().UninstallEndpointFlows(binding.ProtocolTCP, remoteEndpointForPort443) + mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) + mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv4, uint16(port443Int32), binding.ProtocolTCP) + mockOFClient.EXPECT().UninstallServiceFlows(agentconfig.VirtualNodePortDNATIPv4, uint16(port30002Int32), binding.ProtocolTCP) + mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIPv4, uint16(port443Int32), binding.ProtocolTCP) + mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP) + // The route for the ClusterIP and the LoadBalancer IP should only be uninstalled once. + mockRouteClient.EXPECT().DeleteClusterIPRoute(svc1IPv4) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIPv4) + + fp.syncProxyRules() + + assert.Emptyf(t, fp.serviceIPRouteReferences, "serviceIPRouteReferences was not cleaned up after Service was removed") +} diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index 630c0dc57c8..c55e9f7ae5a 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -65,11 +65,11 @@ type Interface interface { // ClusterIP Service traffic from host network. DeleteClusterIPRoute(svcIP net.IP) error - // AddLoadBalancer adds configurations when a LoadBalancer Service is created. - AddLoadBalancer(externalIPs []string) error + // AddLoadBalancer adds configurations when a LoadBalancer IP is added. + AddLoadBalancer(externalIP net.IP) error - // DeleteLoadBalancer deletes related configurations when a LoadBalancer Service is deleted. - DeleteLoadBalancer(externalIPs []string) error + // DeleteLoadBalancer deletes related configurations when a LoadBalancer IP is deleted. + DeleteLoadBalancer(externalIP net.IP) error // Run starts the sync loop. Run(stopCh <-chan struct{}) diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index b0f7a005eae..91694122bd0 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -1371,11 +1371,10 @@ func (c *Client) addVirtualNodePortDNATIPRoute(isIPv6 bool) error { return nil } -// addLoadBalancerIngressIPRoute is used to add routing entry which is used to route LoadBalancer ingress IP to Antrea +// AddLoadBalancer is used to add routing entry which is used to route LoadBalancer ingress IP to Antrea // gateway on host. -func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error { +func (c *Client) AddLoadBalancer(svcIP net.IP) error { linkIndex := c.nodeConfig.GatewayConfig.LinkIndex - svcIP := net.ParseIP(svcIPStr) isIPv6 := utilnet.IsIPv6(svcIP) var gw net.IP var mask int @@ -1397,11 +1396,10 @@ func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error { return nil } -// deleteLoadBalancerIngressIPRoute is used to delete routing entry which is used to route LoadBalancer ingress IP to Antrea +// DeleteLoadBalancer is used to delete routing entry which is used to route LoadBalancer ingress IP to Antrea // gateway on host. -func (c *Client) deleteLoadBalancerIngressIPRoute(svcIPStr string) error { +func (c *Client) DeleteLoadBalancer(svcIP net.IP) error { linkIndex := c.nodeConfig.GatewayConfig.LinkIndex - svcIP := net.ParseIP(svcIPStr) isIPv6 := utilnet.IsIPv6(svcIP) var gw net.IP var mask int @@ -1427,28 +1425,6 @@ func (c *Client) deleteLoadBalancerIngressIPRoute(svcIPStr string) error { return nil } -// AddLoadBalancer is used to add routing entries when a LoadBalancer Service is added. -func (c *Client) AddLoadBalancer(externalIPs []string) error { - for _, svcIPStr := range externalIPs { - if err := c.addLoadBalancerIngressIPRoute(svcIPStr); err != nil { - return err - } - } - - return nil -} - -// DeleteLoadBalancer is used to delete routing entries when a LoadBalancer Service is deleted. -func (c *Client) DeleteLoadBalancer(externalIPs []string) error { - for _, svcIPStr := range externalIPs { - if err := c.deleteLoadBalancerIngressIPRoute(svcIPStr); err != nil { - return err - } - } - - return nil -} - // AddLocalAntreaFlexibleIPAMPodRule is used to add IP to target ip set when an AntreaFlexibleIPAM Pod is added. An entry is added // for every Pod IP. func (c *Client) AddLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error { diff --git a/pkg/agent/route/route_linux_test.go b/pkg/agent/route/route_linux_test.go index ebbb8c4289a..eb269b31ef6 100644 --- a/pkg/agent/route/route_linux_test.go +++ b/pkg/agent/route/route_linux_test.go @@ -1038,12 +1038,12 @@ func TestAddLoadBalancer(t *testing.T) { nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{LinkIndex: 10}} tests := []struct { name string - externalIPs []string + externalIP string expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder) }{ { - name: "IPv4", - externalIPs: []string{"1.1.1.1", "1.1.1.2"}, + name: "IPv4", + externalIP: "1.1.1.1", expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { mockNetlink.RouteReplace(&netlink.Route{ Dst: &net.IPNet{ @@ -1054,20 +1054,11 @@ func TestAddLoadBalancer(t *testing.T) { Scope: netlink.SCOPE_UNIVERSE, LinkIndex: 10, }) - mockNetlink.RouteReplace(&netlink.Route{ - Dst: &net.IPNet{ - IP: net.ParseIP("1.1.1.2"), - Mask: net.CIDRMask(32, 32), - }, - Gw: config.VirtualServiceIPv4, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) }, }, { - name: "IPv6", - externalIPs: []string{"fd00:1234:5678:dead:beaf::1", "fd00:1234:5678:dead:beaf::a"}, + name: "IPv6", + externalIP: "fd00:1234:5678:dead:beaf::1", expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { mockNetlink.RouteReplace(&netlink.Route{ Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::1"), Mask: net.CIDRMask(128, 128)}, @@ -1075,12 +1066,6 @@ func TestAddLoadBalancer(t *testing.T) { Scope: netlink.SCOPE_UNIVERSE, LinkIndex: 10, }) - mockNetlink.RouteReplace(&netlink.Route{ - Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::a"), Mask: net.CIDRMask(128, 128)}, - Gw: config.VirtualServiceIPv6, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) }, }, } @@ -1095,7 +1080,7 @@ func TestAddLoadBalancer(t *testing.T) { } tt.expectedCalls(mockNetlink.EXPECT()) - assert.NoError(t, c.AddLoadBalancer(tt.externalIPs)) + assert.NoError(t, c.AddLoadBalancer(net.ParseIP(tt.externalIP))) }) } } @@ -1104,12 +1089,12 @@ func TestDeleteLoadBalancer(t *testing.T) { nodeConfig := &config.NodeConfig{GatewayConfig: &config.GatewayConfig{LinkIndex: 10}} tests := []struct { name string - externalIPs []string + externalIP string expectedCalls func(mockNetlink *netlinktest.MockInterfaceMockRecorder) }{ { - name: "IPv4", - externalIPs: []string{"1.1.1.1", "1.1.1.2"}, + name: "IPv4", + externalIP: "1.1.1.1", expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { mockNetlink.RouteDel(&netlink.Route{ Dst: &net.IPNet{ @@ -1120,20 +1105,11 @@ func TestDeleteLoadBalancer(t *testing.T) { Scope: netlink.SCOPE_UNIVERSE, LinkIndex: 10, }) - mockNetlink.RouteDel(&netlink.Route{ - Dst: &net.IPNet{ - IP: net.ParseIP("1.1.1.2"), - Mask: net.CIDRMask(32, 32), - }, - Gw: config.VirtualServiceIPv4, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) }, }, { - name: "IPv6", - externalIPs: []string{"fd00:1234:5678:dead:beaf::1", "fd00:1234:5678:dead:beaf::a"}, + name: "IPv6", + externalIP: "fd00:1234:5678:dead:beaf::1", expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) { mockNetlink.RouteDel(&netlink.Route{ Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::1"), Mask: net.CIDRMask(128, 128)}, @@ -1141,12 +1117,6 @@ func TestDeleteLoadBalancer(t *testing.T) { Scope: netlink.SCOPE_UNIVERSE, LinkIndex: 10, }) - mockNetlink.RouteDel(&netlink.Route{ - Dst: &net.IPNet{IP: net.ParseIP("fd00:1234:5678:dead:beaf::a"), Mask: net.CIDRMask(128, 128)}, - Gw: config.VirtualServiceIPv6, - Scope: netlink.SCOPE_UNIVERSE, - LinkIndex: 10, - }) }, }, } @@ -1161,7 +1131,7 @@ func TestDeleteLoadBalancer(t *testing.T) { } tt.expectedCalls(mockNetlink.EXPECT()) - assert.NoError(t, c.DeleteLoadBalancer(tt.externalIPs)) + assert.NoError(t, c.DeleteLoadBalancer(net.ParseIP(tt.externalIP))) }) } } diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index 10433e06961..7722774f68c 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -403,22 +403,12 @@ func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protoco return util.RemoveNetNatStaticMapping(antreaNatNodePort, "0.0.0.0", port, string(protocol)) } -func (c *Client) AddLoadBalancer(externalIPs []string) error { - for _, svcIPStr := range externalIPs { - if err := c.addServiceRoute(net.ParseIP(svcIPStr)); err != nil { - return err - } - } - return nil +func (c *Client) AddLoadBalancer(externalIP net.IP) error { + return c.addServiceRoute(externalIP) } -func (c *Client) DeleteLoadBalancer(externalIPs []string) error { - for _, svcIPStr := range externalIPs { - if err := c.deleteServiceRoute(net.ParseIP(svcIPStr)); err != nil { - return err - } - } - return nil +func (c *Client) DeleteLoadBalancer(externalIP net.IP) error { + return c.deleteServiceRoute(externalIP) } func (c *Client) AddLocalAntreaFlexibleIPAMPodRule(podAddresses []net.IP) error { diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go index 2f23c07ca38..ba8086898f5 100644 --- a/pkg/agent/route/testing/mock_route.go +++ b/pkg/agent/route/testing/mock_route.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -65,7 +65,7 @@ func (mr *MockInterfaceMockRecorder) AddClusterIPRoute(arg0 interface{}) *gomock } // AddLoadBalancer mocks base method -func (m *MockInterface) AddLoadBalancer(arg0 []string) error { +func (m *MockInterface) AddLoadBalancer(arg0 net.IP) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddLoadBalancer", arg0) ret0, _ := ret[0].(error) @@ -149,7 +149,7 @@ func (mr *MockInterfaceMockRecorder) DeleteClusterIPRoute(arg0 interface{}) *gom } // DeleteLoadBalancer mocks base method -func (m *MockInterface) DeleteLoadBalancer(arg0 []string) error { +func (m *MockInterface) DeleteLoadBalancer(arg0 net.IP) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteLoadBalancer", arg0) ret0, _ := ret[0].(error)