Skip to content

Commit

Permalink
Implement service health check in Antrea-agent (#4120)
Browse files Browse the repository at this point in the history
When services are created with "externalTrafficPolicy: Local",
a "healthCheckNodePort" is created in the k8s service object.
kube-proxy in turn will listen on this port and answer queries
on http://0.0.0.0:healthCheckNodePort/healthz". In kube-proxy
replacement mode, antrea does not support this feature. This
becomes more important for Windows support as userspace
kube-proxy is being deprecated.

This commit implements this feature in Antrea and is inspired
by the same feature in upstream kube-proxy with some differences.
The predominant difference is that upstream kube-proxy goes
through all endpoints of cluster in each iteration of
endpoints:update() to find the local endpoints. This has been
changed here to only look for changed endpoints.

Signed-off-by: Guru Shetty <[email protected]>
  • Loading branch information
shettyg authored Sep 1, 2022
1 parent 460cb94 commit 7cf5277
Show file tree
Hide file tree
Showing 9 changed files with 434 additions and 16 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.1.0
github.com/k8snetworkplumbingwg/sriov-cni v2.1.0+incompatible
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd
github.com/lithammer/dedent v1.1.0
github.com/mdlayher/arp v0.0.0-20191213142603-f72070a231fc
github.com/mdlayher/ethernet v0.0.0-20190606142754-0394541c37b7
github.com/mdlayher/ndp v0.0.0-20210831201139-f982b8766fb5
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/lithammer/dedent v1.1.0 h1:VNzHMVCBNG1j0fh3OrsFRkVUwStdDArbgBWoPAffktY=
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down
11 changes: 9 additions & 2 deletions pkg/agent/proxy/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (t *endpointsChangesTracker) endpointsToEndpointsMap(endpoints *corev1.Endp
ei := types.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort(addr.IP, fmt.Sprint(port.Port)),
IsLocal: isLocal,
Ready: true,
})
endpointsMap[svcPortName][ei.String()] = ei
}
Expand All @@ -210,14 +211,20 @@ func (t *endpointsChangesTracker) endpointsToEndpointsMap(endpoints *corev1.Endp
return endpointsMap
}

// Update updates an EndpointsMap based on current changes.
func (t *endpointsChangesTracker) Update(em types.EndpointsMap) {
// Update updates an EndpointsMap and numLocalEndpoints based on current changes.
func (t *endpointsChangesTracker) Update(em types.EndpointsMap, numLocalEndpoints map[apimachinerytypes.NamespacedName]int) {
for _, change := range t.checkoutChanges() {
for spn := range change.previous {
delete(em, spn)
delete(numLocalEndpoints, spn.NamespacedName)
}
for spn, endpoints := range change.current {
em[spn] = endpoints
for _, endpoint := range endpoints {
if endpoint.GetIsLocal() && endpoint.IsReady() {
numLocalEndpoints[spn.NamespacedName] += 1
}
}
}
}
}
Expand Down
29 changes: 27 additions & 2 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/runtime"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
k8sapitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
Expand All @@ -43,6 +44,7 @@ import (
binding "antrea.io/antrea/pkg/ovs/openflow"
k8sproxy "antrea.io/antrea/third_party/proxy"
"antrea.io/antrea/third_party/proxy/config"
"antrea.io/antrea/third_party/proxy/healthcheck"
)

const (
Expand Down Expand Up @@ -108,6 +110,9 @@ type proxier struct {
// oversizeServiceSet records the Services that have more than 800 Endpoints.
oversizeServiceSet sets.String

serviceHealthServer healthcheck.ServiceHealthServer
numLocalEndpoints map[apimachinerytypes.NamespacedName]int

// syncedOnce returns true if the proxier has synced rules at least once.
syncedOnce bool
syncedOnceMutex sync.RWMutex
Expand Down Expand Up @@ -698,13 +703,22 @@ func (p *proxier) syncProxyRules() {
// GetServiceFlowKeys().
p.serviceEndpointsMapsMutex.Lock()
defer p.serviceEndpointsMapsMutex.Unlock()
p.endpointsChanges.Update(p.endpointsMap)
p.serviceChanges.Update(p.serviceMap)
p.endpointsChanges.Update(p.endpointsMap, p.numLocalEndpoints)
serviceUpdateResult := p.serviceChanges.Update(p.serviceMap)

p.removeStaleServices()
p.installServices()
p.removeStaleEndpoints()

if p.serviceHealthServer != nil {
if err := p.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
klog.ErrorS(err, "Error syncing healthcheck Services")
}
if err := p.serviceHealthServer.SyncEndpoints(p.numLocalEndpoints); err != nil {
klog.ErrorS(err, "Error syncing healthcheck Endpoints")
}
}

counter := 0
for _, endpoints := range p.endpointsMap {
counter += len(endpoints)
Expand Down Expand Up @@ -976,6 +990,15 @@ func NewProxier(
ipFamily = corev1.IPv6Protocol
}

var serviceHealthServer healthcheck.ServiceHealthServer
if proxyAllEnabled {
nodePortAddressesString := make([]string, len(nodePortAddresses))
for i, address := range nodePortAddresses {
nodePortAddressesString[i] = address.String()
}
serviceHealthServer = healthcheck.NewServiceHealthServer(hostname, nil, nodePortAddressesString)
}

p := &proxier{
endpointsConfig: config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod),
serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod),
Expand All @@ -999,6 +1022,8 @@ func NewProxier(
topologyAwareHintsEnabled: topologyAwareHintsEnabled,
proxyLoadBalancerIPs: proxyLoadBalancerIPs,
hostname: hostname,
serviceHealthServer: serviceHealthServer,
numLocalEndpoints: map[apimachinerytypes.NamespacedName]int{},
}

p.serviceConfig.RegisterEventHandler(p)
Expand Down
13 changes: 7 additions & 6 deletions pkg/agent/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodeP
nodePortAddresses: nodePortAddresses,
proxyAll: o.proxyAllEnabled,
proxyLoadBalancerIPs: o.proxyLoadBalancerIPs,
numLocalEndpoints: map[apimachinerytypes.NamespacedName]int{},
}
p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, time.Second, 30*time.Second, 2)
return p
Expand Down Expand Up @@ -215,10 +216,10 @@ func testClusterIP(t *testing.T, svcIP net.IP, ep1IP, ep2IP net.IP, isIPv6, node
allEps := append(extraEps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc))
makeEndpointsMap(fp, allEps...)

expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)}
expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)}
expectedAllEps := expectedLocalEps
if !nodeLocalInternal {
expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil))
expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil))
}

bindingProtocol := binding.ProtocolTCP
Expand Down Expand Up @@ -307,10 +308,10 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep
eps := []*corev1.Endpoints{makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)}
makeEndpointsMap(fp, eps...)

expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)}
expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)}
expectedAllEps := expectedLocalEps
if !(nodeLocalInternal && nodeLocalExternal) {
expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil))
expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil))
}

bindingProtocol := binding.ProtocolTCP
Expand Down Expand Up @@ -430,10 +431,10 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP
eps = append(eps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc))
makeEndpointsMap(fp, eps...)

expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)}
expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)}
expectedAllEps := expectedLocalEps
if !(nodeLocalInternal && nodeLocalExternal) {
expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil))
expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil))
}

bindingProtocol := binding.ProtocolTCP
Expand Down
41 changes: 35 additions & 6 deletions test/e2e/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ func probeFromNode(node string, url string, data *TestData) error {
return err
}

func probeHealthFromNode(node string, baseUrl string, data *TestData) (string, string, error) {
url := fmt.Sprintf("%s/%s", baseUrl, "healthz")
_, stdout, stderr, err := data.RunCommandOnNode(node, fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s", url))
return stdout, stderr, err
}

func probeHostnameFromNode(node string, baseUrl string, data *TestData) (string, error) {
url := fmt.Sprintf("%s/%s", baseUrl, "hostname")
_, hostname, _, err := data.RunCommandOnNode(node, fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s", url))
Expand Down Expand Up @@ -191,9 +197,26 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) {
// of another one is Local.
_, err = data.createAgnhostLoadBalancerService("agnhost-cluster", true, false, clusterIngressIP, &ipProtocol)
require.NoError(t, err)
_, err = data.createAgnhostLoadBalancerService("agnhost-local", true, true, localIngressIP, &ipProtocol)
svc, err := data.createAgnhostLoadBalancerService("agnhost-local", true, true, localIngressIP, &ipProtocol)
require.NoError(t, err)

// For the 'Local' externalTrafficPolicy, setup the health checks.
healthPort := fmt.Sprint(svc.Spec.HealthCheckNodePort)
require.NotEqual(t, "", healthPort, "HealthCheckNodePort port number should not be empty")
nodeIPs := []string{controlPlaneNodeIPv4(), workerNodeIPv4(1)}
var healthUrls []string
for _, nodeIP := range nodeIPs {
healthUrls = append(healthUrls, net.JoinHostPort(nodeIP, healthPort))
}
healthOutputTmpl := `{
"service": {
"namespace": "%s",
"name": "agnhost-local"
},
"localEndpoints": 1
}`
healthExpected := fmt.Sprintf(healthOutputTmpl, data.testNamespace)

port := "8080"
clusterUrl := net.JoinHostPort(clusterIngressIP[0], port)
localUrl := net.JoinHostPort(localIngressIP[0], port)
Expand All @@ -204,7 +227,7 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) {
createAgnhostPod(t, data, agnhosts[idx], node, false)
}
t.Run("Non-HostNetwork Endpoints", func(t *testing.T) {
loadBalancerTestCases(t, data, clusterUrl, localUrl, nodes, busyboxes, busyboxIPs, agnhosts)
loadBalancerTestCases(t, data, clusterUrl, localUrl, healthExpected, nodes, healthUrls, busyboxes, busyboxIPs, agnhosts)
})

// Delete agnhost Pods which are not on host network and create new agnhost Pods which are on host network.
Expand All @@ -214,19 +237,19 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) {
createAgnhostPod(t, data, hostAgnhosts[idx], node, true)
}
t.Run("HostNetwork Endpoints", func(t *testing.T) {
loadBalancerTestCases(t, data, clusterUrl, localUrl, nodes, busyboxes, busyboxIPs, nodes)
loadBalancerTestCases(t, data, clusterUrl, localUrl, healthExpected, nodes, healthUrls, busyboxes, busyboxIPs, nodes)
})
}

func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl string, nodes, pods, podIPs, hostnames []string) {
func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl, healthExpected string, nodes, healthUrls, pods, podIPs, hostnames []string) {
t.Run("ExternalTrafficPolicy:Cluster/Client:Node", func(t *testing.T) {
testLoadBalancerClusterFromNode(t, data, nodes, clusterUrl)
})
t.Run("ExternalTrafficPolicy:Cluster/Client:Pod", func(t *testing.T) {
testLoadBalancerClusterFromPod(t, data, pods, clusterUrl)
})
t.Run("ExternalTrafficPolicy:Local/Client:Node", func(t *testing.T) {
testLoadBalancerLocalFromNode(t, data, nodes, localUrl, hostnames)
testLoadBalancerLocalFromNode(t, data, nodes, healthUrls, healthExpected, localUrl, hostnames)
})
t.Run("ExternalTrafficPolicy:Local/Client:Pod", func(t *testing.T) {
testLoadBalancerLocalFromPod(t, data, pods, localUrl, podIPs, hostnames)
Expand All @@ -246,12 +269,18 @@ func testLoadBalancerClusterFromPod(t *testing.T, data *TestData, pods []string,
}
}

func testLoadBalancerLocalFromNode(t *testing.T, data *TestData, nodes []string, url string, expectedHostnames []string) {
func testLoadBalancerLocalFromNode(t *testing.T, data *TestData, nodes, healthUrls []string, healthExpected, url string, expectedHostnames []string) {
skipIfKubeProxyEnabled(t, data)
for idx, node := range nodes {
hostname, err := probeHostnameFromNode(node, url, data)
require.NoError(t, err, "Service LoadBalancer whose externalTrafficPolicy is Local should be able to be connected from Node")
require.Equal(t, hostname, expectedHostnames[idx])

for _, healthUrl := range healthUrls {
healthOutput, _, err := probeHealthFromNode(node, healthUrl, data)
require.NoError(t, err, "Service LoadBalancer whose externalTrafficPolicy is Local should have a response for healthcheck")
require.Equal(t, healthOutput, healthExpected)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions third_party/proxy/healthcheck/README
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Package proxy/healthcheck is copied from [k8s.io/kubernetes@/v1.24.4](https://github.com/kubernetes/kubernetes/tree/v1.24.4) to avoid importing the whole kubernetes repo.
63 changes: 63 additions & 0 deletions third_party/proxy/healthcheck/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthcheck

import (
"net"
"net/http"
)

// listener allows for testing of ServiceHealthServer and ProxierHealthServer.
type listener interface {
// Listen is very much like net.Listen, except the first arg (network) is
// fixed to be "tcp".
Listen(addr string) (net.Listener, error)
}

// httpServerFactory allows for testing of ServiceHealthServer and ProxierHealthServer.
type httpServerFactory interface {
// New creates an instance of a type satisfying HTTPServer. This is
// designed to include http.Server.
New(addr string, handler http.Handler) httpServer
}

// httpServer allows for testing of ServiceHealthServer and ProxierHealthServer.
// It is designed so that http.Server satisfies this interface,
type httpServer interface {
Serve(listener net.Listener) error
}

// Implement listener in terms of net.Listen.
type stdNetListener struct{}

func (stdNetListener) Listen(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}

var _ listener = stdNetListener{}

// Implement httpServerFactory in terms of http.Server.
type stdHTTPServerFactory struct{}

func (stdHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
return &http.Server{
Addr: addr,
Handler: handler,
}
}

var _ httpServerFactory = stdHTTPServerFactory{}
Loading

0 comments on commit 7cf5277

Please sign in to comment.