diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index dc15f7ad5db..bbae65f3b66 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -169,7 +169,7 @@ func (p *proxier) removeStaleServices() { if svcInfo.NodeLocalExternal() { groupIDLocal, _ := p.groupCounter.Get(svcPortName, true) if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil { - klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName) + klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName) continue } p.groupCounter.Recycle(svcPortName, true) @@ -177,7 +177,7 @@ func (p *proxier) removeStaleServices() { // Remove Service group which has all Endpoints. groupID, _ := p.groupCounter.Get(svcPortName, false) if err := p.ofClient.UninstallServiceGroup(groupID); err != nil { - klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName) + klog.ErrorS(err, "Failed to remove Group of all Endpoints for Service", "Service", svcPortName) continue } @@ -369,7 +369,7 @@ func (p *proxier) installServices() { pSvcInfo = installedSvcPort.(*types.ServiceInfo) needRemoval = serviceIdentityChanged(svcInfo, pSvcInfo) || (svcInfo.SessionAffinityType() != pSvcInfo.SessionAffinityType()) needUpdateService = needRemoval || (svcInfo.StickyMaxAgeSeconds() != pSvcInfo.StickyMaxAgeSeconds()) - needUpdateEndpoints = pSvcInfo.SessionAffinityType() != svcInfo.SessionAffinityType() + needUpdateEndpoints = pSvcInfo.SessionAffinityType() != svcInfo.SessionAffinityType() || pSvcInfo.NodeLocalExternal() != svcInfo.NodeLocalExternal() } else { // Need to install. needUpdateService = true } @@ -449,19 +449,32 @@ func (p *proxier) installServices() { continue } - // Install another group when Service externalTrafficPolicy is Local. - if p.proxyAll && svcInfo.NodeLocalExternal() { - groupIDLocal, _ := p.groupCounter.Get(svcPortName, true) - var localEndpointList []k8sproxy.Endpoint - for _, ed := range endpointUpdateList { - if !ed.GetIsLocal() { + if p.proxyAll { + if svcInfo.NodeLocalExternal() { + // Install another group when Service externalTrafficPolicy is Local. + groupIDLocal, _ := p.groupCounter.Get(svcPortName, true) + var localEndpointList []k8sproxy.Endpoint + for _, ed := range endpointUpdateList { + if !ed.GetIsLocal() { + continue + } + localEndpointList = append(localEndpointList, ed) + } + if err = p.ofClient.InstallServiceGroup(groupIDLocal, svcInfo.StickyMaxAgeSeconds() != 0, localEndpointList); err != nil { + klog.ErrorS(err, "Error when installing Group for Service whose externalTrafficPolicy is Local") continue } - localEndpointList = append(localEndpointList, ed) - } - if err = p.ofClient.InstallServiceGroup(groupIDLocal, svcInfo.StickyMaxAgeSeconds() != 0, localEndpointList); err != nil { - klog.ErrorS(err, "Error when installing Group for Service whose externalTrafficPolicy is Local") - continue + } else { + // Uninstall the group with only local Endpoints when Service externalTrafficPolicy is Cluster + // unconditionally. If the group doesn't exist on OVS, then the return value will be nil; if the + // group exists on OVS, and after it is uninstalled successfully, then the return value will be also + // nil. + groupIDLocal, _ := p.groupCounter.Get(svcPortName, true) + if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil { + klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName) + continue + } + p.groupCounter.Recycle(svcPortName, true) } } diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index baf0bcc8515..ea636f79bd5 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -179,6 +179,7 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool, extraSv mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP).Times(1) mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) + mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1) fp.syncProxyRules() } @@ -263,6 +264,8 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep if nodeLocalExternal { groupID, _ = fp.groupCounter.Get(svcPortName, true) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) + } else { + mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1) } mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeLoadBalancer).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeNodePort).Times(1) @@ -351,6 +354,8 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP if nodeLocalExternal { groupID, _ = fp.groupCounter.Get(svcPortName, true) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) + } else { + mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1) } mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeNodePort).Times(1) @@ -585,8 +590,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1) - + mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2) fp.syncProxyRules() fp.serviceChanges.OnServiceUpdate(service, nil) diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 3b9c751169e..542c625a52a 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -1507,12 +1507,27 @@ func (data *TestData) createAgnhostNodePortService(serviceName string, affinity, return data.createService(serviceName, testNamespace, 8080, 8080, map[string]string{"app": "agnhost"}, affinity, nodeLocalExternal, corev1.ServiceTypeNodePort, ipFamily) } +func (data *TestData) updateServiceExternalTrafficPolicy(serviceName string, nodeLocalExternal bool) (*corev1.Service, error) { + svc, err := data.clientset.CoreV1().Services(testNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{}) + if err != nil { + return svc, err + } + if nodeLocalExternal { + svc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeLocal + } else { + svc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeCluster + } + + return data.clientset.CoreV1().Services(testNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{}) +} + // createAgnhostLoadBalancerService creates a LoadBalancer agnhost service with the given name. func (data *TestData) createAgnhostLoadBalancerService(serviceName string, affinity, nodeLocalExternal bool, ingressIPs []string, ipFamily *corev1.IPFamily) (*corev1.Service, error) { svc, err := data.createService(serviceName, testNamespace, 8080, 8080, map[string]string{"app": "agnhost"}, affinity, nodeLocalExternal, corev1.ServiceTypeLoadBalancer, ipFamily) if err != nil { return svc, err } + ingress := make([]corev1.LoadBalancerIngress, len(ingressIPs)) for idx, ingressIP := range ingressIPs { ingress[idx].IP = ingressIP diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go index 3e0c9567573..aef107a2512 100644 --- a/test/e2e/proxy_test.go +++ b/test/e2e/proxy_test.go @@ -136,6 +136,14 @@ func probeClientIPFromPod(data *TestData, pod string, baseUrl string) (string, e return host, err } +func reverseStrs(strs []string) []string { + var res []string + for i := len(strs) - 1; i >= 0; i-- { + res = append(res, strs[i]) + } + return res +} + func TestProxyLoadBalancerServiceIPv4(t *testing.T) { skipIfNotIPv4Cluster(t) testProxyLoadBalancerService(t, false) @@ -349,13 +357,6 @@ func nodePortTestCases(t *testing.T, data *TestData, portStrCluster, portStrLoca clusterUrls = append(clusterUrls, net.JoinHostPort(nodeIP, portStrCluster)) localUrls = append(localUrls, net.JoinHostPort(nodeIP, portStrLocal)) } - reverseStrs := func(strs []string) []string { - var res []string - for i := len(strs) - 1; i >= 0; i-- { - res = append(res, strs[i]) - } - return res - } t.Run("ExternalTrafficPolicy:Cluster/Client:Remote", func(t *testing.T) { testNodePortClusterFromRemote(t, data, nodes, reverseStrs(clusterUrls)) @@ -424,11 +425,11 @@ func testNodePortLocalFromRemote(t *testing.T, data *TestData, nodes, urls, expe for idx, node := range nodes { hostname, err := probeHostnameFromNode(node, urls[idx]) require.NoError(t, err, errMsg) - require.Equal(t, hostname, expectedHostnames[idx]) + require.Equal(t, expectedHostnames[idx], hostname) clientIP, err := probeClientIPFromNode(node, urls[idx]) require.NoError(t, err, errMsg) - require.Equal(t, clientIP, expectedClientIPs[idx]) + require.Equal(t, expectedClientIPs[idx], clientIP) } } @@ -437,7 +438,7 @@ func testNodePortLocalFromNode(t *testing.T, data *TestData, nodes, urls, expect for idx, node := range nodes { hostname, err := probeHostnameFromNode(node, urls[idx]) require.NoError(t, err, "Service NodePort whose externalTrafficPolicy is Local should be able to be connected rom Node") - require.Equal(t, hostname, expectedHostnames[idx]) + require.Equal(t, expectedHostnames[idx], hostname) } } @@ -446,11 +447,11 @@ func testNodePortLocalFromPod(t *testing.T, data *TestData, pods, urls, expected for idx, pod := range pods { hostname, err := probeHostnameFromPod(data, pod, urls[idx]) require.NoError(t, err, errMsg) - require.Equal(t, hostname, expectedHostnames[idx]) + require.Equal(t, expectedHostnames[idx], hostname) clientIP, err := probeClientIPFromPod(data, pod, urls[idx]) require.NoError(t, err, errMsg) - require.Equal(t, clientIP, expectedClientIPs[idx]) + require.Equal(t, expectedClientIPs[idx], clientIP) } } @@ -474,6 +475,75 @@ func TestProxyServiceSessionAffinity(t *testing.T) { } } +func TestProxyExternalTrafficPolicyIPv4(t *testing.T) { + skipIfNotIPv4Cluster(t) + testProxyExternalTrafficPolicy(t, false) +} + +func TestProxyExternalTrafficPolicyIPv6(t *testing.T) { + skipIfNotIPv6Cluster(t) + testProxyExternalTrafficPolicy(t, true) +} + +func testProxyExternalTrafficPolicy(t *testing.T, isIPv6 bool) { + skipIfHasWindowsNodes(t) + skipIfNumNodesLessThan(t, 2) + skipIfProxyDisabled(t) + data, err := setupTest(t) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + defer teardownTest(t, data) + skipIfProxyAllDisabled(t, data) + + svcName := fmt.Sprintf("nodeport-external-traffic-policy-test-ipv6-%v", isIPv6) + nodes := []string{nodeName(0), nodeName(1)} + nodeIPs := []string{controlPlaneNodeIPv4(), workerNodeIPv4(1)} + ipProtocol := corev1.IPv4Protocol + if isIPv6 { + nodeIPs = []string{controlPlaneNodeIPv6(), workerNodeIPv6(1)} + ipProtocol = corev1.IPv6Protocol + } + + // Create agnhost Pods which are not on host network. + var podNames []string + for idx, node := range nodes { + podName := fmt.Sprintf("agnhost-%d-ipv6-%v", idx, isIPv6) + createAgnhostPod(t, data, podName, node, false) + podNames = append(podNames, podName) + } + + // Create a NodePort Service whose externalTrafficPolicy is Cluster and backend Pods are created above. + var portStr string + nodePortSvc, err := data.createAgnhostNodePortService(svcName, false, false, &ipProtocol) + require.NoError(t, err) + for _, port := range nodePortSvc.Spec.Ports { + if port.NodePort != 0 { + portStr = fmt.Sprint(port.NodePort) + break + } + } + require.NotEqual(t, "", portStr, "NodePort port number should not be empty") + + // Get test NodePort URLs. + var urls []string + for _, nodeIP := range nodeIPs { + urls = append(urls, net.JoinHostPort(nodeIP, portStr)) + } + + // Hold on to make sure that the Service is realized, then test the NodePort on each Node. + time.Sleep(2 * time.Second) + testNodePortClusterFromRemote(t, data, nodes, reverseStrs(urls)) + + // Update the NodePort Service's externalTrafficPolicy from Cluster to Local. + _, err = data.updateServiceExternalTrafficPolicy(svcName, true) + require.NoError(t, err) + + // Hold on to make sure that the update of Service is realized, then test the NodePort on each Node. + time.Sleep(2 * time.Second) + testNodePortLocalFromRemote(t, data, nodes, reverseStrs(urls), nodeIPs, reverseStrs(podNames)) +} + func testProxyServiceSessionAffinity(ipFamily *corev1.IPFamily, ingressIPs []string, data *TestData, t *testing.T) { nodeName := nodeName(1) nginx := randName("nginx-") @@ -523,6 +593,7 @@ func testProxyServiceSessionAffinity(ipFamily *corev1.IPFamily, ingressIPs []str } } } + func testProxyHairpinCase(t *testing.T, data *TestData) { if len(clusterInfo.podV4NetworkCIDR) != 0 { ipFamily := corev1.IPv4Protocol