Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #3295: Fix NodePort/LoadBalancer issue when proxyAll is enabled #3329

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,15 @@ func (p *proxier) removeStaleServices() {
if svcInfo.NodeLocalExternal() {
groupIDLocal, _ := p.groupCounter.Get(svcPortName, true)
if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil {
klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName)
klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName)
continue
}
p.groupCounter.Recycle(svcPortName, true)
}
// Remove Service group which has all Endpoints.
groupID, _ := p.groupCounter.Get(svcPortName, false)
if err := p.ofClient.UninstallServiceGroup(groupID); err != nil {
klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName)
klog.ErrorS(err, "Failed to remove Group of all Endpoints for Service", "Service", svcPortName)
continue
}

Expand Down Expand Up @@ -369,7 +369,7 @@ func (p *proxier) installServices() {
pSvcInfo = installedSvcPort.(*types.ServiceInfo)
needRemoval = serviceIdentityChanged(svcInfo, pSvcInfo) || (svcInfo.SessionAffinityType() != pSvcInfo.SessionAffinityType())
needUpdateService = needRemoval || (svcInfo.StickyMaxAgeSeconds() != pSvcInfo.StickyMaxAgeSeconds())
needUpdateEndpoints = pSvcInfo.SessionAffinityType() != svcInfo.SessionAffinityType()
needUpdateEndpoints = pSvcInfo.SessionAffinityType() != svcInfo.SessionAffinityType() || pSvcInfo.NodeLocalExternal() != svcInfo.NodeLocalExternal()
} else { // Need to install.
needUpdateService = true
}
Expand Down Expand Up @@ -449,19 +449,32 @@ func (p *proxier) installServices() {
continue
}

// Install another group when Service externalTrafficPolicy is Local.
if p.proxyAll && svcInfo.NodeLocalExternal() {
groupIDLocal, _ := p.groupCounter.Get(svcPortName, true)
var localEndpointList []k8sproxy.Endpoint
for _, ed := range endpointUpdateList {
if !ed.GetIsLocal() {
if p.proxyAll {
if svcInfo.NodeLocalExternal() {
// Install another group when Service externalTrafficPolicy is Local.
groupIDLocal, _ := p.groupCounter.Get(svcPortName, true)
var localEndpointList []k8sproxy.Endpoint
for _, ed := range endpointUpdateList {
if !ed.GetIsLocal() {
continue
}
localEndpointList = append(localEndpointList, ed)
}
if err = p.ofClient.InstallServiceGroup(groupIDLocal, svcInfo.StickyMaxAgeSeconds() != 0, localEndpointList); err != nil {
klog.ErrorS(err, "Error when installing Group for Service whose externalTrafficPolicy is Local")
continue
}
localEndpointList = append(localEndpointList, ed)
}
if err = p.ofClient.InstallServiceGroup(groupIDLocal, svcInfo.StickyMaxAgeSeconds() != 0, localEndpointList); err != nil {
klog.ErrorS(err, "Error when installing Group for Service whose externalTrafficPolicy is Local")
continue
} else {
// Uninstall the group with only local Endpoints when Service externalTrafficPolicy is Cluster
// unconditionally. If the group doesn't exist on OVS, then the return value will be nil; if the
// group exists on OVS, and after it is uninstalled successfully, then the return value will be also
// nil.
groupIDLocal, _ := p.groupCounter.Get(svcPortName, true)
if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil {
klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName)
continue
}
p.groupCounter.Recycle(svcPortName, true)
}
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/agent/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool, extraSv
mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1)
mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP).Times(1)
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1)

fp.syncProxyRules()
}
Expand Down Expand Up @@ -263,6 +264,8 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep
if nodeLocalExternal {
groupID, _ = fp.groupCounter.Get(svcPortName, true)
mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1)
} else {
mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1)
}
mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeLoadBalancer).Times(1)
mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeNodePort).Times(1)
Expand Down Expand Up @@ -351,6 +354,8 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP
if nodeLocalExternal {
groupID, _ = fp.groupCounter.Get(svcPortName, true)
mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1)
} else {
mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1)
}

mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeNodePort).Times(1)
Expand Down Expand Up @@ -585,8 +590,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool)
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1)
mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1)

mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2)
fp.syncProxyRules()

fp.serviceChanges.OnServiceUpdate(service, nil)
Expand Down
15 changes: 15 additions & 0 deletions test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -1507,12 +1507,27 @@ func (data *TestData) createAgnhostNodePortService(serviceName string, affinity,
return data.createService(serviceName, testNamespace, 8080, 8080, map[string]string{"app": "agnhost"}, affinity, nodeLocalExternal, corev1.ServiceTypeNodePort, ipFamily)
}

func (data *TestData) updateServiceExternalTrafficPolicy(serviceName string, nodeLocalExternal bool) (*corev1.Service, error) {
svc, err := data.clientset.CoreV1().Services(testNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
return svc, err
}
if nodeLocalExternal {
svc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeLocal
} else {
svc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeCluster
}

return data.clientset.CoreV1().Services(testNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
}

// createAgnhostLoadBalancerService creates a LoadBalancer agnhost service with the given name.
func (data *TestData) createAgnhostLoadBalancerService(serviceName string, affinity, nodeLocalExternal bool, ingressIPs []string, ipFamily *corev1.IPFamily) (*corev1.Service, error) {
svc, err := data.createService(serviceName, testNamespace, 8080, 8080, map[string]string{"app": "agnhost"}, affinity, nodeLocalExternal, corev1.ServiceTypeLoadBalancer, ipFamily)
if err != nil {
return svc, err
}

ingress := make([]corev1.LoadBalancerIngress, len(ingressIPs))
for idx, ingressIP := range ingressIPs {
ingress[idx].IP = ingressIP
Expand Down
95 changes: 83 additions & 12 deletions test/e2e/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ func probeClientIPFromPod(data *TestData, pod string, baseUrl string) (string, e
return host, err
}

func reverseStrs(strs []string) []string {
var res []string
for i := len(strs) - 1; i >= 0; i-- {
res = append(res, strs[i])
}
return res
}

func TestProxyLoadBalancerServiceIPv4(t *testing.T) {
skipIfNotIPv4Cluster(t)
testProxyLoadBalancerService(t, false)
Expand Down Expand Up @@ -349,13 +357,6 @@ func nodePortTestCases(t *testing.T, data *TestData, portStrCluster, portStrLoca
clusterUrls = append(clusterUrls, net.JoinHostPort(nodeIP, portStrCluster))
localUrls = append(localUrls, net.JoinHostPort(nodeIP, portStrLocal))
}
reverseStrs := func(strs []string) []string {
var res []string
for i := len(strs) - 1; i >= 0; i-- {
res = append(res, strs[i])
}
return res
}

t.Run("ExternalTrafficPolicy:Cluster/Client:Remote", func(t *testing.T) {
testNodePortClusterFromRemote(t, data, nodes, reverseStrs(clusterUrls))
Expand Down Expand Up @@ -424,11 +425,11 @@ func testNodePortLocalFromRemote(t *testing.T, data *TestData, nodes, urls, expe
for idx, node := range nodes {
hostname, err := probeHostnameFromNode(node, urls[idx])
require.NoError(t, err, errMsg)
require.Equal(t, hostname, expectedHostnames[idx])
require.Equal(t, expectedHostnames[idx], hostname)

clientIP, err := probeClientIPFromNode(node, urls[idx])
require.NoError(t, err, errMsg)
require.Equal(t, clientIP, expectedClientIPs[idx])
require.Equal(t, expectedClientIPs[idx], clientIP)
}
}

Expand All @@ -437,7 +438,7 @@ func testNodePortLocalFromNode(t *testing.T, data *TestData, nodes, urls, expect
for idx, node := range nodes {
hostname, err := probeHostnameFromNode(node, urls[idx])
require.NoError(t, err, "Service NodePort whose externalTrafficPolicy is Local should be able to be connected rom Node")
require.Equal(t, hostname, expectedHostnames[idx])
require.Equal(t, expectedHostnames[idx], hostname)
}
}

Expand All @@ -446,11 +447,11 @@ func testNodePortLocalFromPod(t *testing.T, data *TestData, pods, urls, expected
for idx, pod := range pods {
hostname, err := probeHostnameFromPod(data, pod, urls[idx])
require.NoError(t, err, errMsg)
require.Equal(t, hostname, expectedHostnames[idx])
require.Equal(t, expectedHostnames[idx], hostname)

clientIP, err := probeClientIPFromPod(data, pod, urls[idx])
require.NoError(t, err, errMsg)
require.Equal(t, clientIP, expectedClientIPs[idx])
require.Equal(t, expectedClientIPs[idx], clientIP)
}
}

Expand All @@ -474,6 +475,75 @@ func TestProxyServiceSessionAffinity(t *testing.T) {
}
}

func TestProxyExternalTrafficPolicyIPv4(t *testing.T) {
skipIfNotIPv4Cluster(t)
testProxyExternalTrafficPolicy(t, false)
}

func TestProxyExternalTrafficPolicyIPv6(t *testing.T) {
skipIfNotIPv6Cluster(t)
testProxyExternalTrafficPolicy(t, true)
}

func testProxyExternalTrafficPolicy(t *testing.T, isIPv6 bool) {
skipIfHasWindowsNodes(t)
skipIfNumNodesLessThan(t, 2)
skipIfProxyDisabled(t)
data, err := setupTest(t)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
defer teardownTest(t, data)
skipIfProxyAllDisabled(t, data)

svcName := fmt.Sprintf("nodeport-external-traffic-policy-test-ipv6-%v", isIPv6)
nodes := []string{nodeName(0), nodeName(1)}
nodeIPs := []string{controlPlaneNodeIPv4(), workerNodeIPv4(1)}
ipProtocol := corev1.IPv4Protocol
if isIPv6 {
nodeIPs = []string{controlPlaneNodeIPv6(), workerNodeIPv6(1)}
ipProtocol = corev1.IPv6Protocol
}

// Create agnhost Pods which are not on host network.
var podNames []string
for idx, node := range nodes {
podName := fmt.Sprintf("agnhost-%d-ipv6-%v", idx, isIPv6)
createAgnhostPod(t, data, podName, node, false)
podNames = append(podNames, podName)
}

// Create a NodePort Service whose externalTrafficPolicy is Cluster and backend Pods are created above.
var portStr string
nodePortSvc, err := data.createAgnhostNodePortService(svcName, false, false, &ipProtocol)
require.NoError(t, err)
for _, port := range nodePortSvc.Spec.Ports {
if port.NodePort != 0 {
portStr = fmt.Sprint(port.NodePort)
break
}
}
require.NotEqual(t, "", portStr, "NodePort port number should not be empty")

// Get test NodePort URLs.
var urls []string
for _, nodeIP := range nodeIPs {
urls = append(urls, net.JoinHostPort(nodeIP, portStr))
}

// Hold on to make sure that the Service is realized, then test the NodePort on each Node.
time.Sleep(2 * time.Second)
testNodePortClusterFromRemote(t, data, nodes, reverseStrs(urls))

// Update the NodePort Service's externalTrafficPolicy from Cluster to Local.
_, err = data.updateServiceExternalTrafficPolicy(svcName, true)
require.NoError(t, err)

// Hold on to make sure that the update of Service is realized, then test the NodePort on each Node.
time.Sleep(2 * time.Second)
testNodePortLocalFromRemote(t, data, nodes, reverseStrs(urls), nodeIPs, reverseStrs(podNames))
}

func testProxyServiceSessionAffinity(ipFamily *corev1.IPFamily, ingressIPs []string, data *TestData, t *testing.T) {
nodeName := nodeName(1)
nginx := randName("nginx-")
Expand Down Expand Up @@ -523,6 +593,7 @@ func testProxyServiceSessionAffinity(ipFamily *corev1.IPFamily, ingressIPs []str
}
}
}

func testProxyHairpinCase(t *testing.T, data *TestData) {
if len(clusterInfo.podV4NetworkCIDR) != 0 {
ipFamily := corev1.IPv4Protocol
Expand Down