diff --git a/go.mod b/go.mod index 3319ca98d7f..bf43ba6f92e 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.1.0 github.com/k8snetworkplumbingwg/sriov-cni v2.1.0+incompatible github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd + github.com/lithammer/dedent v1.1.0 github.com/mdlayher/arp v0.0.0-20191213142603-f72070a231fc github.com/mdlayher/ethernet v0.0.0-20190606142754-0394541c37b7 github.com/mdlayher/ndp v0.0.0-20210831201139-f982b8766fb5 diff --git a/go.sum b/go.sum index c8c0d45da63..2065fb93a87 100644 --- a/go.sum +++ b/go.sum @@ -592,6 +592,7 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= +github.com/lithammer/dedent v1.1.0 h1:VNzHMVCBNG1j0fh3OrsFRkVUwStdDArbgBWoPAffktY= github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/pkg/agent/proxy/endpoints.go b/pkg/agent/proxy/endpoints.go index 152ac6b0a3b..3225e9a9eab 100644 --- a/pkg/agent/proxy/endpoints.go +++ b/pkg/agent/proxy/endpoints.go @@ -200,6 +200,7 @@ func (t *endpointsChangesTracker) endpointsToEndpointsMap(endpoints *corev1.Endp ei := types.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{ Endpoint: net.JoinHostPort(addr.IP, fmt.Sprint(port.Port)), IsLocal: isLocal, + Ready: true, }) endpointsMap[svcPortName][ei.String()] = ei } @@ -208,14 +209,20 @@ func (t *endpointsChangesTracker) endpointsToEndpointsMap(endpoints *corev1.Endp return endpointsMap } -// Update updates an EndpointsMap based on current changes. -func (t *endpointsChangesTracker) Update(em types.EndpointsMap) { +// Update updates an EndpointsMap and numLocalEndpoints based on current changes. +func (t *endpointsChangesTracker) Update(em types.EndpointsMap, numLocalEndpoints map[apimachinerytypes.NamespacedName]int) { for _, change := range t.checkoutChanges() { for spn := range change.previous { delete(em, spn) + delete(numLocalEndpoints, spn.NamespacedName) } for spn, endpoints := range change.current { em[spn] = endpoints + for _, endpoint := range endpoints { + if endpoint.GetIsLocal() && endpoint.IsReady() { + numLocalEndpoints[spn.NamespacedName] += 1 + } + } } } } diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 2c22baeb7b7..dba951d1cad 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -27,6 +27,7 @@ import ( corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime" + apimachinerytypes "k8s.io/apimachinery/pkg/types" k8sapitypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" @@ -43,6 +44,7 @@ import ( binding "antrea.io/antrea/pkg/ovs/openflow" k8sproxy "antrea.io/antrea/third_party/proxy" "antrea.io/antrea/third_party/proxy/config" + "antrea.io/antrea/third_party/proxy/healthcheck" ) const ( @@ -108,6 +110,9 @@ type proxier struct { // oversizeServiceSet records the Services that have more than 800 Endpoints. oversizeServiceSet sets.String + serviceHealthServer healthcheck.ServiceHealthServer + numLocalEndpoints map[apimachinerytypes.NamespacedName]int + // syncedOnce returns true if the proxier has synced rules at least once. syncedOnce bool syncedOnceMutex sync.RWMutex @@ -698,13 +703,22 @@ func (p *proxier) syncProxyRules() { // GetServiceFlowKeys(). p.serviceEndpointsMapsMutex.Lock() defer p.serviceEndpointsMapsMutex.Unlock() - p.endpointsChanges.Update(p.endpointsMap) - p.serviceChanges.Update(p.serviceMap) + p.endpointsChanges.Update(p.endpointsMap, p.numLocalEndpoints) + serviceUpdateResult := p.serviceChanges.Update(p.serviceMap) p.removeStaleServices() p.installServices() p.removeStaleEndpoints() + if p.serviceHealthServer != nil { + if err := p.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { + klog.ErrorS(err, "Error syncing healthcheck Services") + } + if err := p.serviceHealthServer.SyncEndpoints(p.numLocalEndpoints); err != nil { + klog.ErrorS(err, "Error syncing healthcheck Endpoints") + } + } + counter := 0 for _, endpoints := range p.endpointsMap { counter += len(endpoints) @@ -976,6 +990,15 @@ func NewProxier( ipFamily = corev1.IPv6Protocol } + var serviceHealthServer healthcheck.ServiceHealthServer + if proxyAllEnabled { + nodePortAddressesString := make([]string, len(nodePortAddresses)) + for i, address := range nodePortAddresses { + nodePortAddressesString[i] = address.String() + } + serviceHealthServer = healthcheck.NewServiceHealthServer(hostname, nil, nodePortAddressesString) + } + p := &proxier{ endpointsConfig: config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod), serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod), @@ -999,6 +1022,8 @@ func NewProxier( topologyAwareHintsEnabled: topologyAwareHintsEnabled, proxyLoadBalancerIPs: proxyLoadBalancerIPs, hostname: hostname, + serviceHealthServer: serviceHealthServer, + numLocalEndpoints: map[apimachinerytypes.NamespacedName]int{}, } p.serviceConfig.RegisterEventHandler(p) diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 78f17b93fd5..acc769d93f7 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -154,6 +154,7 @@ func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodeP nodePortAddresses: nodePortAddresses, proxyAll: o.proxyAllEnabled, proxyLoadBalancerIPs: o.proxyLoadBalancerIPs, + numLocalEndpoints: map[apimachinerytypes.NamespacedName]int{}, } p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, time.Second, 30*time.Second, 2) return p @@ -215,10 +216,10 @@ func testClusterIP(t *testing.T, svcIP net.IP, ep1IP, ep2IP net.IP, isIPv6, node allEps := append(extraEps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)) makeEndpointsMap(fp, allEps...) - expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)} + expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)} expectedAllEps := expectedLocalEps if !nodeLocalInternal { - expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil)) + expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil)) } bindingProtocol := binding.ProtocolTCP @@ -307,10 +308,10 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep eps := []*corev1.Endpoints{makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)} makeEndpointsMap(fp, eps...) - expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)} + expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)} expectedAllEps := expectedLocalEps if !(nodeLocalInternal && nodeLocalExternal) { - expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil)) + expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil)) } bindingProtocol := binding.ProtocolTCP @@ -430,10 +431,10 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP eps = append(eps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)) makeEndpointsMap(fp, eps...) - expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)} + expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)} expectedAllEps := expectedLocalEps if !(nodeLocalInternal && nodeLocalExternal) { - expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil)) + expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil)) } bindingProtocol := binding.ProtocolTCP diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go index 3df8e61d90c..f8feefe9e8f 100644 --- a/test/e2e/proxy_test.go +++ b/test/e2e/proxy_test.go @@ -81,6 +81,12 @@ func probeFromNode(node string, url string, data *TestData) error { return err } +func probeHealthFromNode(node string, baseUrl string, data *TestData) (string, string, error) { + url := fmt.Sprintf("%s/%s", baseUrl, "healthz") + _, stdout, stderr, err := data.RunCommandOnNode(node, fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s", url)) + return stdout, stderr, err +} + func probeHostnameFromNode(node string, baseUrl string, data *TestData) (string, error) { url := fmt.Sprintf("%s/%s", baseUrl, "hostname") _, hostname, _, err := data.RunCommandOnNode(node, fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s", url)) @@ -191,9 +197,26 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) { // of another one is Local. _, err = data.createAgnhostLoadBalancerService("agnhost-cluster", true, false, clusterIngressIP, &ipProtocol) require.NoError(t, err) - _, err = data.createAgnhostLoadBalancerService("agnhost-local", true, true, localIngressIP, &ipProtocol) + svc, err := data.createAgnhostLoadBalancerService("agnhost-local", true, true, localIngressIP, &ipProtocol) require.NoError(t, err) + // For the 'Local' externalTrafficPolicy, setup the health checks. + healthPort := fmt.Sprint(svc.Spec.HealthCheckNodePort) + require.NotEqual(t, "", healthPort, "HealthCheckNodePort port number should not be empty") + nodeIPs := []string{controlPlaneNodeIPv4(), workerNodeIPv4(1)} + var healthUrls []string + for _, nodeIP := range nodeIPs { + healthUrls = append(healthUrls, net.JoinHostPort(nodeIP, healthPort)) + } + healthOutputTmpl := `{ + "service": { + "namespace": "%s", + "name": "agnhost-local" + }, + "localEndpoints": 1 +}` + healthExpected := fmt.Sprintf(healthOutputTmpl, data.testNamespace) + port := "8080" clusterUrl := net.JoinHostPort(clusterIngressIP[0], port) localUrl := net.JoinHostPort(localIngressIP[0], port) @@ -204,7 +227,7 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) { createAgnhostPod(t, data, agnhosts[idx], node, false) } t.Run("Non-HostNetwork Endpoints", func(t *testing.T) { - loadBalancerTestCases(t, data, clusterUrl, localUrl, nodes, busyboxes, busyboxIPs, agnhosts) + loadBalancerTestCases(t, data, clusterUrl, localUrl, healthExpected, nodes, healthUrls, busyboxes, busyboxIPs, agnhosts) }) // Delete agnhost Pods which are not on host network and create new agnhost Pods which are on host network. @@ -214,11 +237,11 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) { createAgnhostPod(t, data, hostAgnhosts[idx], node, true) } t.Run("HostNetwork Endpoints", func(t *testing.T) { - loadBalancerTestCases(t, data, clusterUrl, localUrl, nodes, busyboxes, busyboxIPs, nodes) + loadBalancerTestCases(t, data, clusterUrl, localUrl, healthExpected, nodes, healthUrls, busyboxes, busyboxIPs, nodes) }) } -func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl string, nodes, pods, podIPs, hostnames []string) { +func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl, healthExpected string, nodes, healthUrls, pods, podIPs, hostnames []string) { t.Run("ExternalTrafficPolicy:Cluster/Client:Node", func(t *testing.T) { testLoadBalancerClusterFromNode(t, data, nodes, clusterUrl) }) @@ -226,7 +249,7 @@ func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl st testLoadBalancerClusterFromPod(t, data, pods, clusterUrl) }) t.Run("ExternalTrafficPolicy:Local/Client:Node", func(t *testing.T) { - testLoadBalancerLocalFromNode(t, data, nodes, localUrl, hostnames) + testLoadBalancerLocalFromNode(t, data, nodes, healthUrls, healthExpected, localUrl, hostnames) }) t.Run("ExternalTrafficPolicy:Local/Client:Pod", func(t *testing.T) { testLoadBalancerLocalFromPod(t, data, pods, localUrl, podIPs, hostnames) @@ -246,12 +269,18 @@ func testLoadBalancerClusterFromPod(t *testing.T, data *TestData, pods []string, } } -func testLoadBalancerLocalFromNode(t *testing.T, data *TestData, nodes []string, url string, expectedHostnames []string) { +func testLoadBalancerLocalFromNode(t *testing.T, data *TestData, nodes, healthUrls []string, healthExpected, url string, expectedHostnames []string) { skipIfKubeProxyEnabled(t, data) for idx, node := range nodes { hostname, err := probeHostnameFromNode(node, url, data) require.NoError(t, err, "Service LoadBalancer whose externalTrafficPolicy is Local should be able to be connected from Node") require.Equal(t, hostname, expectedHostnames[idx]) + + for _, healthUrl := range healthUrls { + healthOutput, _, err := probeHealthFromNode(node, healthUrl, data) + require.NoError(t, err, "Service LoadBalancer whose externalTrafficPolicy is Local should have a response for healthcheck") + require.Equal(t, healthOutput, healthExpected) + } } }