antrea-agent: Implement service health check.
When Services are created with "externalTrafficPolicy: Local",
a "healthCheckNodePort" is allocated in the k8s Service object.
kube-proxy in turn listens on this port and answers queries
on http://0.0.0.0:healthCheckNodePort/healthz. In kube-proxy
replacement mode, Antrea does not support this feature. This
becomes more important for Windows support, as userspace
kube-proxy is being deprecated.
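
As an illustration of the contract (a minimal sketch, not kube-proxy's
or Antrea's actual code; the port, namespace, and Service name below
are hypothetical placeholders), the health check endpoint answers with
a small JSON document and returns 503 when the node has no local
endpoint for the Service:

```go
package main

import (
	"encoding/json"
	"net/http"
)

// healthzPayload mirrors the JSON body served on the healthCheckNodePort,
// as exercised by the e2e test in this commit.
type healthzPayload struct {
	Service struct {
		Namespace string `json:"namespace"`
		Name      string `json:"name"`
	} `json:"service"`
	LocalEndpoints int `json:"localEndpoints"`
}

func main() {
	localEndpoints := 1 // maintained from endpoint updates in the real proxy

	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		var p healthzPayload
		p.Service.Namespace = "default" // hypothetical
		p.Service.Name = "agnhost-local"
		p.LocalEndpoints = localEndpoints
		// With no local endpoints, the node should fail the check so
		// external load balancers stop sending it traffic.
		if localEndpoints == 0 {
			w.WriteHeader(http.StatusServiceUnavailable)
		}
		json.NewEncoder(w).Encode(&p)
	})
	// The real server listens on the Service's spec.healthCheckNodePort;
	// 30000 here is a placeholder.
	http.ListenAndServe("0.0.0.0:30000", nil)
}
```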

This commit implements the feature in Antrea. It is inspired
by the same feature in upstream kube-proxy, with some differences.
The predominant difference is that upstream kube-proxy walks
all endpoints of the cluster in each iteration of
endpoints:update() to find the local endpoints; here, only the
changed endpoints are examined.

Signed-off-by: Guru Shetty <[email protected]>
Guru Shetty committed Aug 30, 2022
1 parent e325ad2 commit faa2f6b
Showing 6 changed files with 80 additions and 16 deletions.
1 change: 1 addition & 0 deletions go.mod
@@ -30,6 +30,7 @@ require (
github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.1.0
github.com/k8snetworkplumbingwg/sriov-cni v2.1.0+incompatible
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd
+ github.com/lithammer/dedent v1.1.0
github.com/mdlayher/arp v0.0.0-20191213142603-f72070a231fc
github.com/mdlayher/ethernet v0.0.0-20190606142754-0394541c37b7
github.com/mdlayher/ndp v0.0.0-20210831201139-f982b8766fb5
1 change: 1 addition & 0 deletions go.sum
@@ -592,6 +592,7 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
+ github.com/lithammer/dedent v1.1.0 h1:VNzHMVCBNG1j0fh3OrsFRkVUwStdDArbgBWoPAffktY=
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
11 changes: 9 additions & 2 deletions pkg/agent/proxy/endpoints.go
@@ -200,6 +200,7 @@ func (t *endpointsChangesTracker) endpointsToEndpointsMap(endpoints *corev1.Endp
ei := types.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort(addr.IP, fmt.Sprint(port.Port)),
IsLocal: isLocal,
+ Ready: true,
})
endpointsMap[svcPortName][ei.String()] = ei
}
@@ -208,14 +209,20 @@ func (t *endpointsChangesTracker) endpointsToEndpointsMap(endpoints *corev1.Endp
return endpointsMap
}

- // Update updates an EndpointsMap based on current changes.
- func (t *endpointsChangesTracker) Update(em types.EndpointsMap) {
+ // Update updates an EndpointsMap and numLocalEndpoints based on current changes.
+ func (t *endpointsChangesTracker) Update(em types.EndpointsMap, numLocalEndpoints map[apimachinerytypes.NamespacedName]int) {
for _, change := range t.checkoutChanges() {
for spn := range change.previous {
delete(em, spn)
+ delete(numLocalEndpoints, spn.NamespacedName)
}
for spn, endpoints := range change.current {
em[spn] = endpoints
+ for _, endpoint := range endpoints {
+ if endpoint.GetIsLocal() && endpoint.IsReady() {
+ numLocalEndpoints[spn.NamespacedName] += 1
+ }
+ }
}
}
}
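
A small self-contained sketch of the delta-based counting that Update()
performs above (types simplified and hypothetical; the real code works
on types.EndpointsMap and k8sproxy.Endpoint): counts are dropped only
for service ports named in a change, then rebuilt from that change's
current endpoints, so unchanged Services are never re-scanned:

```go
package main

import "fmt"

type namespacedName struct{ Namespace, Name string }

type endpoint struct {
	addr  string
	local bool // endpoint runs on this node
	ready bool // endpoint passes its readiness check
}

// applyChange mirrors the loop in endpointsChangesTracker.Update: only the
// service ports present in the change are re-counted.
func applyChange(counts map[namespacedName]int, previous, current map[namespacedName][]endpoint) {
	for spn := range previous {
		delete(counts, spn)
	}
	for spn, eps := range current {
		for _, ep := range eps {
			if ep.local && ep.ready {
				counts[spn]++
			}
		}
	}
}

func main() {
	counts := map[namespacedName]int{}
	svc := namespacedName{"default", "agnhost-local"}
	applyChange(counts, nil, map[namespacedName][]endpoint{svc: {
		{addr: "10.10.0.5:8080", local: true, ready: true},
		{addr: "10.10.1.6:8080", local: false, ready: true},
	}})
	fmt.Println(counts[svc]) // 1: only the local, ready endpoint counts
}
```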
29 changes: 27 additions & 2 deletions pkg/agent/proxy/proxier.go
@@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/runtime"
+ apimachinerytypes "k8s.io/apimachinery/pkg/types"
k8sapitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
@@ -43,6 +44,7 @@ import (
binding "antrea.io/antrea/pkg/ovs/openflow"
k8sproxy "antrea.io/antrea/third_party/proxy"
"antrea.io/antrea/third_party/proxy/config"
"antrea.io/antrea/third_party/proxy/healthcheck"
)

const (
@@ -108,6 +110,9 @@ type proxier struct {
// oversizeServiceSet records the Services that have more than 800 Endpoints.
oversizeServiceSet sets.String

+ serviceHealthServer healthcheck.ServiceHealthServer
+ numLocalEndpoints map[apimachinerytypes.NamespacedName]int

// syncedOnce returns true if the proxier has synced rules at least once.
syncedOnce bool
syncedOnceMutex sync.RWMutex
@@ -698,13 +703,22 @@ func (p *proxier) syncProxyRules() {
// GetServiceFlowKeys().
p.serviceEndpointsMapsMutex.Lock()
defer p.serviceEndpointsMapsMutex.Unlock()
- p.endpointsChanges.Update(p.endpointsMap)
- p.serviceChanges.Update(p.serviceMap)
+ p.endpointsChanges.Update(p.endpointsMap, p.numLocalEndpoints)
+ serviceUpdateResult := p.serviceChanges.Update(p.serviceMap)

p.removeStaleServices()
p.installServices()
p.removeStaleEndpoints()

+ if p.serviceHealthServer != nil {
+ if err := p.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
+ klog.ErrorS(err, "Error syncing healthcheck Services")
+ }
+ if err := p.serviceHealthServer.SyncEndpoints(p.numLocalEndpoints); err != nil {
+ klog.ErrorS(err, "Error syncing healthcheck Endpoints")
+ }
+ }

counter := 0
for _, endpoints := range p.endpointsMap {
counter += len(endpoints)
@@ -976,6 +990,15 @@ func NewProxier(
ipFamily = corev1.IPv6Protocol
}

+ var serviceHealthServer healthcheck.ServiceHealthServer
+ if proxyAllEnabled {
+ nodePortAddressesString := make([]string, len(nodePortAddresses))
+ for i, address := range nodePortAddresses {
+ nodePortAddressesString[i] = address.String()
+ }
+ serviceHealthServer = healthcheck.NewServiceHealthServer(hostname, nil, nodePortAddressesString)
+ }

p := &proxier{
endpointsConfig: config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod),
serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod),
@@ -999,6 +1022,8 @@
topologyAwareHintsEnabled: topologyAwareHintsEnabled,
proxyLoadBalancerIPs: proxyLoadBalancerIPs,
hostname: hostname,
+ serviceHealthServer: serviceHealthServer,
+ numLocalEndpoints: map[apimachinerytypes.NamespacedName]int{},
}

p.serviceConfig.RegisterEventHandler(p)
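
For context, the serviceHealthServer wired in above follows the
interface of upstream kube-proxy's healthcheck package, which this
commit consumes from third_party/proxy/healthcheck. The sketch below is
modeled on the upstream definition; the exact signatures in the vendored
copy may differ:

```go
package healthcheck

import "k8s.io/apimachinery/pkg/types"

// ServiceHealthServer serves "/healthz" on each Service's
// healthCheckNodePort, reporting the node-local endpoint count.
type ServiceHealthServer interface {
	// SyncServices reconciles the set of listeners with the Services
	// that currently carry a healthCheckNodePort.
	SyncServices(newServices map[types.NamespacedName]uint16) error
	// SyncEndpoints updates the localEndpoints count reported for each
	// Service; a count of zero makes the check return 503.
	SyncEndpoints(newEndpoints map[types.NamespacedName]int) error
}
```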
13 changes: 7 additions & 6 deletions pkg/agent/proxy/proxier_test.go
@@ -154,6 +154,7 @@ func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodeP
nodePortAddresses: nodePortAddresses,
proxyAll: o.proxyAllEnabled,
proxyLoadBalancerIPs: o.proxyLoadBalancerIPs,
+ numLocalEndpoints: map[apimachinerytypes.NamespacedName]int{},
}
p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, time.Second, 30*time.Second, 2)
return p
@@ -215,10 +216,10 @@ func testClusterIP(t *testing.T, svcIP net.IP, ep1IP, ep2IP net.IP, isIPv6, node
allEps := append(extraEps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc))
makeEndpointsMap(fp, allEps...)

- expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)}
+ expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)}
expectedAllEps := expectedLocalEps
if !nodeLocalInternal {
- expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil))
+ expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil))
}

bindingProtocol := binding.ProtocolTCP
@@ -307,10 +308,10 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep
eps := []*corev1.Endpoints{makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)}
makeEndpointsMap(fp, eps...)

- expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)}
+ expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)}
expectedAllEps := expectedLocalEps
if !(nodeLocalInternal && nodeLocalExternal) {
- expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil))
+ expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil))
}

bindingProtocol := binding.ProtocolTCP
@@ -430,10 +431,10 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP
eps = append(eps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc))
makeEndpointsMap(fp, eps...)

- expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)}
+ expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)}
expectedAllEps := expectedLocalEps
if !(nodeLocalInternal && nodeLocalExternal) {
- expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil))
+ expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil))
}

bindingProtocol := binding.ProtocolTCP
41 changes: 35 additions & 6 deletions test/e2e/proxy_test.go
@@ -81,6 +81,12 @@ func probeFromNode(node string, url string, data *TestData) error {
return err
}

+ func probeHealthFromNode(node string, baseUrl string, data *TestData) (string, string, error) {
+ url := fmt.Sprintf("%s/%s", baseUrl, "healthz")
+ _, stdout, stderr, err := data.RunCommandOnNode(node, fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s", url))
+ return stdout, stderr, err
+ }

func probeHostnameFromNode(node string, baseUrl string, data *TestData) (string, error) {
url := fmt.Sprintf("%s/%s", baseUrl, "hostname")
_, hostname, _, err := data.RunCommandOnNode(node, fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s", url))
@@ -191,9 +197,26 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) {
// of another one is Local.
_, err = data.createAgnhostLoadBalancerService("agnhost-cluster", true, false, clusterIngressIP, &ipProtocol)
require.NoError(t, err)
- _, err = data.createAgnhostLoadBalancerService("agnhost-local", true, true, localIngressIP, &ipProtocol)
+ svc, err := data.createAgnhostLoadBalancerService("agnhost-local", true, true, localIngressIP, &ipProtocol)
require.NoError(t, err)

+ // For the 'Local' externalTrafficPolicy, setup the health checks.
+ healthPort := fmt.Sprint(svc.Spec.HealthCheckNodePort)
+ require.NotEqual(t, "", healthPort, "HealthCheckNodePort port number should not be empty")
+ nodeIPs := []string{controlPlaneNodeIPv4(), workerNodeIPv4(1)}
+ var healthUrls []string
+ for _, nodeIP := range nodeIPs {
+ healthUrls = append(healthUrls, net.JoinHostPort(nodeIP, healthPort))
+ }
+ healthOutputTmpl := `{
+ "service": {
+ "namespace": "%s",
+ "name": "agnhost-local"
+ },
+ "localEndpoints": 1
+ }`
+ healthExpected := fmt.Sprintf(healthOutputTmpl, data.testNamespace)

port := "8080"
clusterUrl := net.JoinHostPort(clusterIngressIP[0], port)
localUrl := net.JoinHostPort(localIngressIP[0], port)
@@ -204,7 +227,7 @@
createAgnhostPod(t, data, agnhosts[idx], node, false)
}
t.Run("Non-HostNetwork Endpoints", func(t *testing.T) {
- loadBalancerTestCases(t, data, clusterUrl, localUrl, nodes, busyboxes, busyboxIPs, agnhosts)
+ loadBalancerTestCases(t, data, clusterUrl, localUrl, healthExpected, nodes, healthUrls, busyboxes, busyboxIPs, agnhosts)
})

// Delete agnhost Pods which are not on host network and create new agnhost Pods which are on host network.
@@ -214,19 +237,19 @@
createAgnhostPod(t, data, hostAgnhosts[idx], node, true)
}
t.Run("HostNetwork Endpoints", func(t *testing.T) {
loadBalancerTestCases(t, data, clusterUrl, localUrl, nodes, busyboxes, busyboxIPs, nodes)
loadBalancerTestCases(t, data, clusterUrl, localUrl, healthExpected, nodes, healthUrls, busyboxes, busyboxIPs, nodes)
})
}

- func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl string, nodes, pods, podIPs, hostnames []string) {
+ func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl, healthExpected string, nodes, healthUrls, pods, podIPs, hostnames []string) {
t.Run("ExternalTrafficPolicy:Cluster/Client:Node", func(t *testing.T) {
testLoadBalancerClusterFromNode(t, data, nodes, clusterUrl)
})
t.Run("ExternalTrafficPolicy:Cluster/Client:Pod", func(t *testing.T) {
testLoadBalancerClusterFromPod(t, data, pods, clusterUrl)
})
t.Run("ExternalTrafficPolicy:Local/Client:Node", func(t *testing.T) {
- testLoadBalancerLocalFromNode(t, data, nodes, localUrl, hostnames)
+ testLoadBalancerLocalFromNode(t, data, nodes, healthUrls, healthExpected, localUrl, hostnames)
})
t.Run("ExternalTrafficPolicy:Local/Client:Pod", func(t *testing.T) {
testLoadBalancerLocalFromPod(t, data, pods, localUrl, podIPs, hostnames)
@@ -246,12 +269,18 @@ func testLoadBalancerClusterFromPod(t *testing.T, data *TestData, pods []string,
}
}

- func testLoadBalancerLocalFromNode(t *testing.T, data *TestData, nodes []string, url string, expectedHostnames []string) {
+ func testLoadBalancerLocalFromNode(t *testing.T, data *TestData, nodes, healthUrls []string, healthExpected, url string, expectedHostnames []string) {
skipIfKubeProxyEnabled(t, data)
for idx, node := range nodes {
hostname, err := probeHostnameFromNode(node, url, data)
require.NoError(t, err, "Service LoadBalancer whose externalTrafficPolicy is Local should be able to be connected from Node")
require.Equal(t, hostname, expectedHostnames[idx])

+ for _, healthUrl := range healthUrls {
+ healthOutput, _, err := probeHealthFromNode(node, healthUrl, data)
+ require.NoError(t, err, "Service LoadBalancer whose externalTrafficPolicy is Local should have a response for healthcheck")
+ require.Equal(t, healthOutput, healthExpected)
+ }
}
}

