From e325ad259bb69b5d87966969108c2023c3a83485 Mon Sep 17 00:00:00 2001
From: Guru Shetty
Date: Thu, 25 Aug 2022 17:37:19 +0000
Subject: [PATCH 1/2] Import service health code from upstream 1.24.4 tree.

These APIs will be used in an upcoming commit to provide service health
checks (to support externalTrafficPolicy: Local) for a k8s Service.

Signed-off-by: Guru Shetty
---
 third_party/proxy/healthcheck/README          |   1 +
 third_party/proxy/healthcheck/common.go       |  63 ++++
 .../proxy/healthcheck/service_health.go       | 290 ++++++++++++++++++
 3 files changed, 354 insertions(+)
 create mode 100644 third_party/proxy/healthcheck/README
 create mode 100644 third_party/proxy/healthcheck/common.go
 create mode 100644 third_party/proxy/healthcheck/service_health.go

diff --git a/third_party/proxy/healthcheck/README b/third_party/proxy/healthcheck/README
new file mode 100644
index 00000000000..96ef2d313de
--- /dev/null
+++ b/third_party/proxy/healthcheck/README
@@ -0,0 +1 @@
+Package proxy/healthcheck is copied from [k8s.io/kubernetes@v1.24.4](https://github.com/kubernetes/kubernetes/tree/v1.24.4) to avoid importing the whole kubernetes repo.
diff --git a/third_party/proxy/healthcheck/common.go b/third_party/proxy/healthcheck/common.go
new file mode 100644
index 00000000000..f65cf1666e8
--- /dev/null
+++ b/third_party/proxy/healthcheck/common.go
@@ -0,0 +1,63 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package healthcheck
+
+import (
+	"net"
+	"net/http"
+)
+
+// listener allows for testing of ServiceHealthServer and ProxierHealthServer.
+type listener interface {
+	// Listen is very much like net.Listen, except the first arg (network) is
+	// fixed to be "tcp".
+	Listen(addr string) (net.Listener, error)
+}
+
+// httpServerFactory allows for testing of ServiceHealthServer and ProxierHealthServer.
+type httpServerFactory interface {
+	// New creates an instance of a type satisfying HTTPServer. This is
+	// designed to include http.Server.
+	New(addr string, handler http.Handler) httpServer
+}
+
+// httpServer allows for testing of ServiceHealthServer and ProxierHealthServer.
+// It is designed so that http.Server satisfies this interface.
+type httpServer interface {
+	Serve(listener net.Listener) error
+}
+
+// Implement listener in terms of net.Listen.
+type stdNetListener struct{}
+
+func (stdNetListener) Listen(addr string) (net.Listener, error) {
+	return net.Listen("tcp", addr)
+}
+
+var _ listener = stdNetListener{}
+
+// Implement httpServerFactory in terms of http.Server.
+type stdHTTPServerFactory struct{}
+
+func (stdHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
+	return &http.Server{
+		Addr:    addr,
+		Handler: handler,
+	}
+}
+
+var _ httpServerFactory = stdHTTPServerFactory{}
diff --git a/third_party/proxy/healthcheck/service_health.go b/third_party/proxy/healthcheck/service_health.go
new file mode 100644
index 00000000000..61ccaf09521
--- /dev/null
+++ b/third_party/proxy/healthcheck/service_health.go
@@ -0,0 +1,290 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+/*
+Copyright 2022 Antrea Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+Modifies:
+- Replaces api.EventTypeWarning with string "Warning"
+- Replaces import of util package from kubernetes path to antrea/third_party
+- Modifies newServiceHealthServer() to remove the calls to GetNodeAddresses and
+the checking of ZeroCIDR. This is not needed as Antrea passes actual IP
+addresses of interfaces. Also changes listenAndServeAll() to directly use
+nodeAddresses instead of a set that is returned by GetNodeAddresses().
+*/
+
+package healthcheck
+
+import (
+	"fmt"
+	"net"
+	"net/http"
+	"strings"
+	"sync"
+
+	"github.com/lithammer/dedent"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/klog/v2"
+
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/tools/events"
+
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+)
+
+// ServiceHealthServer serves HTTP endpoints for each service name, with results
+// based on the endpoints. If there are 0 endpoints for a service, it returns a
+// 503 "Service Unavailable" error (telling LBs not to use this node). If there
+// are 1 or more endpoints, it returns a 200 "OK".
+type ServiceHealthServer interface {
+	// Make the new set of services be active. Services that were open before
+	// will be closed. Services that are new will be opened. Services that
+	// existed and are in the new set will be left alone. The value of the map
+	// is the healthcheck-port to listen on.
+	SyncServices(newServices map[types.NamespacedName]uint16) error
+	// Make the new set of endpoints be active. Endpoints for services that do
+	// not exist will be dropped. The value of the map is the number of
+	// endpoints the service has on this node.
+	SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
+}
+
+func newServiceHealthServer(hostname string, recorder events.EventRecorder, listener listener, factory httpServerFactory, nodePortAddresses []string) ServiceHealthServer {
+
+	return &server{
+		hostname:      hostname,
+		recorder:      recorder,
+		listener:      listener,
+		httpFactory:   factory,
+		services:      map[types.NamespacedName]*hcInstance{},
+		nodeAddresses: nodePortAddresses,
+	}
+}
+
+// NewServiceHealthServer allocates a new service healthcheck server manager
+func NewServiceHealthServer(hostname string, recorder events.EventRecorder, nodePortAddresses []string) ServiceHealthServer {
+	return newServiceHealthServer(hostname, recorder, stdNetListener{}, stdHTTPServerFactory{}, nodePortAddresses)
+}
+
+type server struct {
+	hostname string
+	// node addresses where the health check port will listen on
+	nodeAddresses []string
+	recorder      events.EventRecorder // can be nil
+	listener      listener
+	httpFactory   httpServerFactory
+
+	lock     sync.RWMutex
+	services map[types.NamespacedName]*hcInstance
+}
+
+func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error {
+	hcs.lock.Lock()
+	defer hcs.lock.Unlock()
+
+	// Remove any that are not needed any more.
+	for nsn, svc := range hcs.services {
+		if port, found := newServices[nsn]; !found || port != svc.port {
+			klog.V(2).InfoS("Closing healthcheck", "service", nsn, "port", svc.port)
+
+			// errors are logged in closeAll()
+			_ = svc.closeAll()
+
+			delete(hcs.services, nsn)
+
+		}
+	}
+
+	// Add any that are needed.
+	for nsn, port := range newServices {
+		if hcs.services[nsn] != nil {
+			klog.V(3).InfoS("Existing healthcheck", "service", nsn, "port", port)
+			continue
+		}
+
+		klog.V(2).InfoS("Opening healthcheck", "service", nsn, "port", port)
+
+		svc := &hcInstance{nsn: nsn, port: port}
+		err := svc.listenAndServeAll(hcs)
+
+		if err != nil {
+			msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err)
+
+			if hcs.recorder != nil {
+				hcs.recorder.Eventf(
+					&v1.ObjectReference{
+						Kind:      "Service",
+						Namespace: nsn.Namespace,
+						Name:      nsn.Name,
+						UID:       types.UID(nsn.String()),
+					}, nil, "Warning", "FailedToStartServiceHealthcheck", "Listen", msg)
+			}
+			klog.ErrorS(err, "Failed to start healthcheck", "node", hcs.hostname, "service", nsn, "port", port)
+			continue
+		}
+		hcs.services[nsn] = svc
+	}
+	return nil
+}
+
+type hcInstance struct {
+	nsn  types.NamespacedName
+	port uint16
+
+	listeners   []net.Listener
+	httpServers []httpServer
+
+	endpoints int // number of local endpoints for a service
+}
+
+// listenAndServeAll opens the health check port on all the addresses provided and starts serving
+func (hcI *hcInstance) listenAndServeAll(hcs *server) error {
+	var err error
+	var listener net.Listener
+
+	hcI.listeners = make([]net.Listener, 0, len(hcs.nodeAddresses))
+	hcI.httpServers = make([]httpServer, 0, len(hcs.nodeAddresses))
+
+	// for each of the node addresses start listening and serving
+	for _, address := range hcs.nodeAddresses {
+		addr := net.JoinHostPort(address, fmt.Sprint(hcI.port))
+		// create http server
+		httpSrv := hcs.httpFactory.New(addr, hcHandler{name: hcI.nsn, hcs: hcs})
+		// start listener
+		listener, err = hcs.listener.Listen(addr)
+		if err != nil {
+			// must close whatever has been previously opened
+			// to allow a retry or a port ownership change as needed
+			_ = hcI.closeAll()
+			return err
+		}
+
+		// start serving
+		go func(hcI *hcInstance, listener net.Listener, httpSrv httpServer) {
+			// Serve() will exit when the listener is closed.
+			klog.V(3).InfoS("Starting goroutine for healthcheck", "service", hcI.nsn, "address", listener.Addr())
+			if err := httpSrv.Serve(listener); err != nil {
+				klog.ErrorS(err, "Healthcheck closed", "service", hcI.nsn)
+				return
+			}
+			klog.V(3).InfoS("Healthcheck closed", "service", hcI.nsn, "address", listener.Addr())
+		}(hcI, listener, httpSrv)
+
+		hcI.listeners = append(hcI.listeners, listener)
+		hcI.httpServers = append(hcI.httpServers, httpSrv)
+	}
+
+	return nil
+}
+
+func (hcI *hcInstance) closeAll() error {
+	errors := []error{}
+	for _, listener := range hcI.listeners {
+		if err := listener.Close(); err != nil {
+			klog.ErrorS(err, "Error closing listener for health check service", "service", hcI.nsn, "address", listener.Addr())
+			errors = append(errors, err)
+		}
+	}
+
+	if len(errors) > 0 {
+		return utilerrors.NewAggregate(errors)
+	}
+
+	return nil
+}
+
+type hcHandler struct {
+	name types.NamespacedName
+	hcs  *server
+}
+
+var _ http.Handler = hcHandler{}
+
+func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	h.hcs.lock.RLock()
+	svc, ok := h.hcs.services[h.name]
+	if !ok || svc == nil {
+		h.hcs.lock.RUnlock()
+		klog.ErrorS(nil, "Received request for closed healthcheck", "service", h.name)
+		return
+	}
+	count := svc.endpoints
+	h.hcs.lock.RUnlock()
+
+	resp.Header().Set("Content-Type", "application/json")
+	resp.Header().Set("X-Content-Type-Options", "nosniff")
+	if count == 0 {
+		resp.WriteHeader(http.StatusServiceUnavailable)
+	} else {
+		resp.WriteHeader(http.StatusOK)
+	}
+	fmt.Fprint(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(`
+		{
+			"service": {
+				"namespace": %q,
+				"name": %q
+			},
+			"localEndpoints": %d
+		}
+		`, h.name.Namespace, h.name.Name, count)), "\n"))
+}
+
+func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error {
+	hcs.lock.Lock()
+	defer hcs.lock.Unlock()
+
+	for nsn, count := range newEndpoints {
+		if hcs.services[nsn] == nil {
+			continue
+		}
+		klog.V(3).InfoS("Reporting endpoints for healthcheck", "endpointCount", count, "service", nsn)
+		hcs.services[nsn].endpoints = count
+	}
+	for nsn, hci := range hcs.services {
+		if _, found := newEndpoints[nsn]; !found {
+			hci.endpoints = 0
+		}
+	}
+	return nil
+}
+
+// FakeServiceHealthServer is a fake ServiceHealthServer for test programs
+type FakeServiceHealthServer struct{}
+
+// NewFakeServiceHealthServer allocates a new fake service healthcheck server manager
+func NewFakeServiceHealthServer() ServiceHealthServer {
+	return FakeServiceHealthServer{}
+}
+
+// SyncServices is part of ServiceHealthServer
+func (fake FakeServiceHealthServer) SyncServices(_ map[types.NamespacedName]uint16) error {
+	return nil
+}
+
+// SyncEndpoints is part of ServiceHealthServer
+func (fake FakeServiceHealthServer) SyncEndpoints(_ map[types.NamespacedName]int) error {
+	return nil
+}

From faa2f6b16fa91a3e691d5d3eb7332ed5c69685d4 Mon Sep 17 00:00:00 2001
From: Guru Shetty
Date: Tue, 16 Aug 2022 16:10:56 +0000
Subject: [PATCH 2/2] antrea-agent: Implement service health check.

When services are created with "externalTrafficPolicy: Local", a
"healthCheckNodePort" is created in the k8s service object. kube-proxy
in turn will listen on this port and answer queries on
"http://0.0.0.0:healthCheckNodePort/healthz". In kube-proxy replacement
mode, Antrea does not support this feature. This becomes more important
for Windows support as userspace kube-proxy is being deprecated.
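
As an illustration of the endpoint's behavior (the node IP and the
allocated healthCheckNodePort below are made-up values), a node that
hosts one ready local endpoint of the Service answers with HTTP 200
and a small JSON body, while a node with no local endpoints returns
the same body with "localEndpoints": 0 and HTTP 503:

  $ curl http://10.176.27.14:30012/healthz
  {
	"service": {
		"namespace": "ns1",
		"name": "svc-local"
	},
	"localEndpoints": 1
  }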

This commit implements this feature in Antrea and is inspired by the
same feature in upstream kube-proxy, with some differences. The
predominant difference is that upstream kube-proxy goes through all
endpoints of the cluster in each iteration of its endpoints Update()
to find the local endpoints. This has been changed here to only look
at changed endpoints.

Signed-off-by: Guru Shetty
---
 go.mod                          |  1 +
 go.sum                          |  1 +
 pkg/agent/proxy/endpoints.go    | 11 +++++++--
 pkg/agent/proxy/proxier.go      | 29 +++++++++++++++++++++--
 pkg/agent/proxy/proxier_test.go | 13 ++++++-----
 test/e2e/proxy_test.go          | 41 ++++++++++++++++++++++++++++-----
 6 files changed, 80 insertions(+), 16 deletions(-)

diff --git a/go.mod b/go.mod
index 3319ca98d7f..bf43ba6f92e 100644
--- a/go.mod
+++ b/go.mod
@@ -30,6 +30,7 @@ require (
 	github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.1.0
 	github.com/k8snetworkplumbingwg/sriov-cni v2.1.0+incompatible
 	github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd
+	github.com/lithammer/dedent v1.1.0
 	github.com/mdlayher/arp v0.0.0-20191213142603-f72070a231fc
 	github.com/mdlayher/ethernet v0.0.0-20190606142754-0394541c37b7
 	github.com/mdlayher/ndp v0.0.0-20210831201139-f982b8766fb5
diff --git a/go.sum b/go.sum
index c8c0d45da63..2065fb93a87 100644
--- a/go.sum
+++ b/go.sum
@@ -592,6 +592,7 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
 github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
 github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
+github.com/lithammer/dedent v1.1.0 h1:VNzHMVCBNG1j0fh3OrsFRkVUwStdDArbgBWoPAffktY=
 github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
 github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
 github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
diff --git a/pkg/agent/proxy/endpoints.go b/pkg/agent/proxy/endpoints.go
index 152ac6b0a3b..3225e9a9eab 100644
--- a/pkg/agent/proxy/endpoints.go
+++ b/pkg/agent/proxy/endpoints.go
@@ -200,6 +200,7 @@ func (t *endpointsChangesTracker) endpointsToEndpointsMap(endpoints *corev1.Endp
 			ei := types.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
 				Endpoint: net.JoinHostPort(addr.IP, fmt.Sprint(port.Port)),
 				IsLocal:  isLocal,
+				Ready:    true,
 			})
 			endpointsMap[svcPortName][ei.String()] = ei
 		}
@@ -208,14 +209,20 @@ func (t *endpointsChangesTracker) endpointsToEndpointsMap(endpoints *corev1.Endp
 	return endpointsMap
 }
 
-// Update updates an EndpointsMap based on current changes.
-func (t *endpointsChangesTracker) Update(em types.EndpointsMap) {
+// Update updates an EndpointsMap and numLocalEndpoints based on current changes.
+func (t *endpointsChangesTracker) Update(em types.EndpointsMap, numLocalEndpoints map[apimachinerytypes.NamespacedName]int) { for _, change := range t.checkoutChanges() { for spn := range change.previous { delete(em, spn) + delete(numLocalEndpoints, spn.NamespacedName) } for spn, endpoints := range change.current { em[spn] = endpoints + for _, endpoint := range endpoints { + if endpoint.GetIsLocal() && endpoint.IsReady() { + numLocalEndpoints[spn.NamespacedName] += 1 + } + } } } } diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 2c22baeb7b7..dba951d1cad 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -27,6 +27,7 @@ import ( corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime" + apimachinerytypes "k8s.io/apimachinery/pkg/types" k8sapitypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" @@ -43,6 +44,7 @@ import ( binding "antrea.io/antrea/pkg/ovs/openflow" k8sproxy "antrea.io/antrea/third_party/proxy" "antrea.io/antrea/third_party/proxy/config" + "antrea.io/antrea/third_party/proxy/healthcheck" ) const ( @@ -108,6 +110,9 @@ type proxier struct { // oversizeServiceSet records the Services that have more than 800 Endpoints. oversizeServiceSet sets.String + serviceHealthServer healthcheck.ServiceHealthServer + numLocalEndpoints map[apimachinerytypes.NamespacedName]int + // syncedOnce returns true if the proxier has synced rules at least once. syncedOnce bool syncedOnceMutex sync.RWMutex @@ -698,13 +703,22 @@ func (p *proxier) syncProxyRules() { // GetServiceFlowKeys(). p.serviceEndpointsMapsMutex.Lock() defer p.serviceEndpointsMapsMutex.Unlock() - p.endpointsChanges.Update(p.endpointsMap) - p.serviceChanges.Update(p.serviceMap) + p.endpointsChanges.Update(p.endpointsMap, p.numLocalEndpoints) + serviceUpdateResult := p.serviceChanges.Update(p.serviceMap) p.removeStaleServices() p.installServices() p.removeStaleEndpoints() + if p.serviceHealthServer != nil { + if err := p.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil { + klog.ErrorS(err, "Error syncing healthcheck Services") + } + if err := p.serviceHealthServer.SyncEndpoints(p.numLocalEndpoints); err != nil { + klog.ErrorS(err, "Error syncing healthcheck Endpoints") + } + } + counter := 0 for _, endpoints := range p.endpointsMap { counter += len(endpoints) @@ -976,6 +990,15 @@ func NewProxier( ipFamily = corev1.IPv6Protocol } + var serviceHealthServer healthcheck.ServiceHealthServer + if proxyAllEnabled { + nodePortAddressesString := make([]string, len(nodePortAddresses)) + for i, address := range nodePortAddresses { + nodePortAddressesString[i] = address.String() + } + serviceHealthServer = healthcheck.NewServiceHealthServer(hostname, nil, nodePortAddressesString) + } + p := &proxier{ endpointsConfig: config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod), serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod), @@ -999,6 +1022,8 @@ func NewProxier( topologyAwareHintsEnabled: topologyAwareHintsEnabled, proxyLoadBalancerIPs: proxyLoadBalancerIPs, hostname: hostname, + serviceHealthServer: serviceHealthServer, + numLocalEndpoints: map[apimachinerytypes.NamespacedName]int{}, } p.serviceConfig.RegisterEventHandler(p) diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 78f17b93fd5..acc769d93f7 100644 --- a/pkg/agent/proxy/proxier_test.go +++ 
b/pkg/agent/proxy/proxier_test.go @@ -154,6 +154,7 @@ func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodeP nodePortAddresses: nodePortAddresses, proxyAll: o.proxyAllEnabled, proxyLoadBalancerIPs: o.proxyLoadBalancerIPs, + numLocalEndpoints: map[apimachinerytypes.NamespacedName]int{}, } p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, time.Second, 30*time.Second, 2) return p @@ -215,10 +216,10 @@ func testClusterIP(t *testing.T, svcIP net.IP, ep1IP, ep2IP net.IP, isIPv6, node allEps := append(extraEps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)) makeEndpointsMap(fp, allEps...) - expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)} + expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)} expectedAllEps := expectedLocalEps if !nodeLocalInternal { - expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil)) + expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil)) } bindingProtocol := binding.ProtocolTCP @@ -307,10 +308,10 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep eps := []*corev1.Endpoints{makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)} makeEndpointsMap(fp, eps...) - expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)} + expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)} expectedAllEps := expectedLocalEps if !(nodeLocalInternal && nodeLocalExternal) { - expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil)) + expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil)) } bindingProtocol := binding.ProtocolTCP @@ -430,10 +431,10 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP eps = append(eps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)) makeEndpointsMap(fp, eps...) 
-	expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)}
+	expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)}
 	expectedAllEps := expectedLocalEps
 	if !(nodeLocalInternal && nodeLocalExternal) {
-		expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil))
+		expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil))
 	}
 
 	bindingProtocol := binding.ProtocolTCP
diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go
index 3df8e61d90c..f8feefe9e8f 100644
--- a/test/e2e/proxy_test.go
+++ b/test/e2e/proxy_test.go
@@ -81,6 +81,12 @@ func probeFromNode(node string, url string, data *TestData) error {
 	return err
 }
 
+func probeHealthFromNode(node string, baseUrl string, data *TestData) (string, string, error) {
+	url := fmt.Sprintf("%s/%s", baseUrl, "healthz")
+	_, stdout, stderr, err := data.RunCommandOnNode(node, fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s", url))
+	return stdout, stderr, err
+}
+
 func probeHostnameFromNode(node string, baseUrl string, data *TestData) (string, error) {
 	url := fmt.Sprintf("%s/%s", baseUrl, "hostname")
 	_, hostname, _, err := data.RunCommandOnNode(node, fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s", url))
@@ -191,9 +197,26 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) {
 	// of another one is Local.
 	_, err = data.createAgnhostLoadBalancerService("agnhost-cluster", true, false, clusterIngressIP, &ipProtocol)
 	require.NoError(t, err)
-	_, err = data.createAgnhostLoadBalancerService("agnhost-local", true, true, localIngressIP, &ipProtocol)
+	svc, err := data.createAgnhostLoadBalancerService("agnhost-local", true, true, localIngressIP, &ipProtocol)
 	require.NoError(t, err)
 
+	// For the 'Local' externalTrafficPolicy, set up the health checks.
+	healthPort := fmt.Sprint(svc.Spec.HealthCheckNodePort)
+	require.NotEqual(t, "0", healthPort, "HealthCheckNodePort should be allocated for the Service")
+	nodeIPs := []string{controlPlaneNodeIPv4(), workerNodeIPv4(1)}
+	var healthUrls []string
+	for _, nodeIP := range nodeIPs {
+		healthUrls = append(healthUrls, net.JoinHostPort(nodeIP, healthPort))
+	}
+	healthOutputTmpl := `{
+	"service": {
+		"namespace": "%s",
+		"name": "agnhost-local"
+	},
+	"localEndpoints": 1
+}`
+	healthExpected := fmt.Sprintf(healthOutputTmpl, data.testNamespace)
+
 	port := "8080"
 	clusterUrl := net.JoinHostPort(clusterIngressIP[0], port)
 	localUrl := net.JoinHostPort(localIngressIP[0], port)
@@ -204,7 +227,7 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) {
 		createAgnhostPod(t, data, agnhosts[idx], node, false)
 	}
 	t.Run("Non-HostNetwork Endpoints", func(t *testing.T) {
-		loadBalancerTestCases(t, data, clusterUrl, localUrl, nodes, busyboxes, busyboxIPs, agnhosts)
+		loadBalancerTestCases(t, data, clusterUrl, localUrl, healthExpected, nodes, healthUrls, busyboxes, busyboxIPs, agnhosts)
 	})
 
 	// Delete agnhost Pods which are not on host network and create new agnhost Pods which are on host network.
@@ -214,11 +237,11 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) {
 		createAgnhostPod(t, data, hostAgnhosts[idx], node, true)
 	}
 	t.Run("HostNetwork Endpoints", func(t *testing.T) {
-		loadBalancerTestCases(t, data, clusterUrl, localUrl, nodes, busyboxes, busyboxIPs, nodes)
+		loadBalancerTestCases(t, data, clusterUrl, localUrl, healthExpected, nodes, healthUrls, busyboxes, busyboxIPs, nodes)
 	})
 }
 
-func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl string, nodes, pods, podIPs, hostnames []string) {
+func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl, healthExpected string, nodes, healthUrls, pods, podIPs, hostnames []string) {
 	t.Run("ExternalTrafficPolicy:Cluster/Client:Node", func(t *testing.T) {
 		testLoadBalancerClusterFromNode(t, data, nodes, clusterUrl)
 	})
@@ -226,7 +249,7 @@ func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl st
 		testLoadBalancerClusterFromPod(t, data, pods, clusterUrl)
 	})
 	t.Run("ExternalTrafficPolicy:Local/Client:Node", func(t *testing.T) {
-		testLoadBalancerLocalFromNode(t, data, nodes, localUrl, hostnames)
+		testLoadBalancerLocalFromNode(t, data, nodes, healthUrls, healthExpected, localUrl, hostnames)
 	})
 	t.Run("ExternalTrafficPolicy:Local/Client:Pod", func(t *testing.T) {
 		testLoadBalancerLocalFromPod(t, data, pods, localUrl, podIPs, hostnames)
@@ -246,12 +269,18 @@ func testLoadBalancerClusterFromPod(t *testing.T, data *TestData, pods []string,
 	}
 }
 
-func testLoadBalancerLocalFromNode(t *testing.T, data *TestData, nodes []string, url string, expectedHostnames []string) {
+func testLoadBalancerLocalFromNode(t *testing.T, data *TestData, nodes, healthUrls []string, healthExpected, url string, expectedHostnames []string) {
 	skipIfKubeProxyEnabled(t, data)
 	for idx, node := range nodes {
 		hostname, err := probeHostnameFromNode(node, url, data)
 		require.NoError(t, err, "Service LoadBalancer whose externalTrafficPolicy is Local should be able to be connected from Node")
 		require.Equal(t, hostname, expectedHostnames[idx])
+
+		for _, healthUrl := range healthUrls {
+			healthOutput, _, err := probeHealthFromNode(node, healthUrl, data)
+			require.NoError(t, err, "Service LoadBalancer whose externalTrafficPolicy is Local should respond to health checks")
+			require.Equal(t, healthExpected, healthOutput)
+		}
 	}
 }
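
For reference, a minimal sketch of how the imported API is driven. The
types and functions below come from the patches above; the hostname,
address, namespace/name, and port are made-up values, and the wiring is
illustrative rather than the agent's actual startup path:

	package main

	import (
		"log"
		"time"

		"k8s.io/apimachinery/pkg/types"

		"antrea.io/antrea/third_party/proxy/healthcheck"
	)

	func main() {
		// A nil events.EventRecorder is allowed; proxier.go above passes nil too.
		// Node addresses are used verbatim; Antrea's copy of the code does not
		// expand zero-CIDR style addresses (see the "Modifies" note in
		// service_health.go).
		hcs := healthcheck.NewServiceHealthServer("node-1", nil, []string{"10.176.27.14"})

		nsn := types.NamespacedName{Namespace: "ns1", Name: "svc-local"}

		// Open a listener on the Service's healthCheckNodePort (hypothetical value).
		if err := hcs.SyncServices(map[types.NamespacedName]uint16{nsn: 30012}); err != nil {
			log.Fatalf("failed to sync healthcheck services: %v", err)
		}

		// Report the number of ready local endpoints; 0 makes /healthz return
		// 503, anything greater returns 200.
		if err := hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 1}); err != nil {
			log.Fatalf("failed to sync healthcheck endpoints: %v", err)
		}

		time.Sleep(time.Minute) // keep serving so the port can be probed manually
	}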