Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement service health check in Antrea-agent #4120

Merged
merged 2 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.1.0
github.com/k8snetworkplumbingwg/sriov-cni v2.1.0+incompatible
github.com/kevinburke/ssh_config v0.0.0-20190725054713-01f96b0aa0cd
github.com/lithammer/dedent v1.1.0
github.com/mdlayher/arp v0.0.0-20191213142603-f72070a231fc
github.com/mdlayher/ethernet v0.0.0-20190606142754-0394541c37b7
github.com/mdlayher/ndp v0.0.0-20210831201139-f982b8766fb5
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/lithammer/dedent v1.1.0 h1:VNzHMVCBNG1j0fh3OrsFRkVUwStdDArbgBWoPAffktY=
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down
11 changes: 9 additions & 2 deletions pkg/agent/proxy/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (t *endpointsChangesTracker) endpointsToEndpointsMap(endpoints *corev1.Endp
ei := types.NewEndpointInfo(&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort(addr.IP, fmt.Sprint(port.Port)),
IsLocal: isLocal,
Ready: true,
})
endpointsMap[svcPortName][ei.String()] = ei
}
Expand All @@ -208,14 +209,20 @@ func (t *endpointsChangesTracker) endpointsToEndpointsMap(endpoints *corev1.Endp
return endpointsMap
}

// Update updates an EndpointsMap based on current changes.
func (t *endpointsChangesTracker) Update(em types.EndpointsMap) {
// Update updates an EndpointsMap and numLocalEndpoints based on current changes.
func (t *endpointsChangesTracker) Update(em types.EndpointsMap, numLocalEndpoints map[apimachinerytypes.NamespacedName]int) {
for _, change := range t.checkoutChanges() {
for spn := range change.previous {
delete(em, spn)
delete(numLocalEndpoints, spn.NamespacedName)
}
for spn, endpoints := range change.current {
em[spn] = endpoints
for _, endpoint := range endpoints {
if endpoint.GetIsLocal() && endpoint.IsReady() {
numLocalEndpoints[spn.NamespacedName] += 1
}
}
}
}
}
Expand Down
29 changes: 27 additions & 2 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/runtime"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
k8sapitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
Expand All @@ -43,6 +44,7 @@ import (
binding "antrea.io/antrea/pkg/ovs/openflow"
k8sproxy "antrea.io/antrea/third_party/proxy"
"antrea.io/antrea/third_party/proxy/config"
"antrea.io/antrea/third_party/proxy/healthcheck"
)

const (
Expand Down Expand Up @@ -108,6 +110,9 @@ type proxier struct {
// oversizeServiceSet records the Services that have more than 800 Endpoints.
oversizeServiceSet sets.String

serviceHealthServer healthcheck.ServiceHealthServer
numLocalEndpoints map[apimachinerytypes.NamespacedName]int

// syncedOnce returns true if the proxier has synced rules at least once.
syncedOnce bool
syncedOnceMutex sync.RWMutex
Expand Down Expand Up @@ -698,13 +703,22 @@ func (p *proxier) syncProxyRules() {
// GetServiceFlowKeys().
p.serviceEndpointsMapsMutex.Lock()
defer p.serviceEndpointsMapsMutex.Unlock()
p.endpointsChanges.Update(p.endpointsMap)
p.serviceChanges.Update(p.serviceMap)
p.endpointsChanges.Update(p.endpointsMap, p.numLocalEndpoints)
serviceUpdateResult := p.serviceChanges.Update(p.serviceMap)

p.removeStaleServices()
p.installServices()
p.removeStaleEndpoints()

if p.serviceHealthServer != nil {
if err := p.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
klog.ErrorS(err, "Error syncing healthcheck Services")
}
if err := p.serviceHealthServer.SyncEndpoints(p.numLocalEndpoints); err != nil {
klog.ErrorS(err, "Error syncing healthcheck Endpoints")
}
}

counter := 0
for _, endpoints := range p.endpointsMap {
counter += len(endpoints)
Expand Down Expand Up @@ -976,6 +990,15 @@ func NewProxier(
ipFamily = corev1.IPv6Protocol
}

var serviceHealthServer healthcheck.ServiceHealthServer
if proxyAllEnabled {
nodePortAddressesString := make([]string, len(nodePortAddresses))
for i, address := range nodePortAddresses {
nodePortAddressesString[i] = address.String()
}
serviceHealthServer = healthcheck.NewServiceHealthServer(hostname, nil, nodePortAddressesString)
}

p := &proxier{
endpointsConfig: config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), resyncPeriod),
serviceConfig: config.NewServiceConfig(informerFactory.Core().V1().Services(), resyncPeriod),
Expand All @@ -999,6 +1022,8 @@ func NewProxier(
topologyAwareHintsEnabled: topologyAwareHintsEnabled,
proxyLoadBalancerIPs: proxyLoadBalancerIPs,
hostname: hostname,
serviceHealthServer: serviceHealthServer,
numLocalEndpoints: map[apimachinerytypes.NamespacedName]int{},
}

p.serviceConfig.RegisterEventHandler(p)
Expand Down
13 changes: 7 additions & 6 deletions pkg/agent/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func NewFakeProxier(routeClient route.Interface, ofClient openflow.Client, nodeP
nodePortAddresses: nodePortAddresses,
proxyAll: o.proxyAllEnabled,
proxyLoadBalancerIPs: o.proxyLoadBalancerIPs,
numLocalEndpoints: map[apimachinerytypes.NamespacedName]int{},
}
p.runner = k8sproxy.NewBoundedFrequencyRunner(componentName, p.syncProxyRules, time.Second, 30*time.Second, 2)
return p
Expand Down Expand Up @@ -215,10 +216,10 @@ func testClusterIP(t *testing.T, svcIP net.IP, ep1IP, ep2IP net.IP, isIPv6, node
allEps := append(extraEps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc))
makeEndpointsMap(fp, allEps...)

expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)}
expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)}
expectedAllEps := expectedLocalEps
if !nodeLocalInternal {
expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil))
expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil))
}

bindingProtocol := binding.ProtocolTCP
Expand Down Expand Up @@ -307,10 +308,10 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep
eps := []*corev1.Endpoints{makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc)}
makeEndpointsMap(fp, eps...)

expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)}
expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)}
expectedAllEps := expectedLocalEps
if !(nodeLocalInternal && nodeLocalExternal) {
expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil))
expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil))
}

bindingProtocol := binding.ProtocolTCP
Expand Down Expand Up @@ -430,10 +431,10 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP
eps = append(eps, makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, epFunc))
makeEndpointsMap(fp, eps...)

expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, false, false, false, nil)}
expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), "", "", svcPort, true, true, false, false, nil)}
expectedAllEps := expectedLocalEps
if !(nodeLocalInternal && nodeLocalExternal) {
expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, false, false, false, nil))
expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, false, false, nil))
}

bindingProtocol := binding.ProtocolTCP
Expand Down
41 changes: 35 additions & 6 deletions test/e2e/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ func probeFromNode(node string, url string, data *TestData) error {
return err
}

func probeHealthFromNode(node string, baseUrl string, data *TestData) (string, string, error) {
url := fmt.Sprintf("%s/%s", baseUrl, "healthz")
_, stdout, stderr, err := data.RunCommandOnNode(node, fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s", url))
return stdout, stderr, err
}

func probeHostnameFromNode(node string, baseUrl string, data *TestData) (string, error) {
url := fmt.Sprintf("%s/%s", baseUrl, "hostname")
_, hostname, _, err := data.RunCommandOnNode(node, fmt.Sprintf("curl --connect-timeout 1 --retry 5 --retry-connrefused %s", url))
Expand Down Expand Up @@ -191,9 +197,26 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) {
// of another one is Local.
_, err = data.createAgnhostLoadBalancerService("agnhost-cluster", true, false, clusterIngressIP, &ipProtocol)
require.NoError(t, err)
_, err = data.createAgnhostLoadBalancerService("agnhost-local", true, true, localIngressIP, &ipProtocol)
svc, err := data.createAgnhostLoadBalancerService("agnhost-local", true, true, localIngressIP, &ipProtocol)
require.NoError(t, err)

// For the 'Local' externalTrafficPolicy, setup the health checks.
healthPort := fmt.Sprint(svc.Spec.HealthCheckNodePort)
require.NotEqual(t, "", healthPort, "HealthCheckNodePort port number should not be empty")
nodeIPs := []string{controlPlaneNodeIPv4(), workerNodeIPv4(1)}
var healthUrls []string
for _, nodeIP := range nodeIPs {
healthUrls = append(healthUrls, net.JoinHostPort(nodeIP, healthPort))
}
healthOutputTmpl := `{
"service": {
"namespace": "%s",
"name": "agnhost-local"
},
"localEndpoints": 1
}`
healthExpected := fmt.Sprintf(healthOutputTmpl, data.testNamespace)

port := "8080"
clusterUrl := net.JoinHostPort(clusterIngressIP[0], port)
localUrl := net.JoinHostPort(localIngressIP[0], port)
Expand All @@ -204,7 +227,7 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) {
createAgnhostPod(t, data, agnhosts[idx], node, false)
}
t.Run("Non-HostNetwork Endpoints", func(t *testing.T) {
loadBalancerTestCases(t, data, clusterUrl, localUrl, nodes, busyboxes, busyboxIPs, agnhosts)
loadBalancerTestCases(t, data, clusterUrl, localUrl, healthExpected, nodes, healthUrls, busyboxes, busyboxIPs, agnhosts)
})

// Delete agnhost Pods which are not on host network and create new agnhost Pods which are on host network.
Expand All @@ -214,19 +237,19 @@ func testProxyLoadBalancerService(t *testing.T, isIPv6 bool) {
createAgnhostPod(t, data, hostAgnhosts[idx], node, true)
}
t.Run("HostNetwork Endpoints", func(t *testing.T) {
loadBalancerTestCases(t, data, clusterUrl, localUrl, nodes, busyboxes, busyboxIPs, nodes)
loadBalancerTestCases(t, data, clusterUrl, localUrl, healthExpected, nodes, healthUrls, busyboxes, busyboxIPs, nodes)
})
}

func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl string, nodes, pods, podIPs, hostnames []string) {
func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl, healthExpected string, nodes, healthUrls, pods, podIPs, hostnames []string) {
t.Run("ExternalTrafficPolicy:Cluster/Client:Node", func(t *testing.T) {
testLoadBalancerClusterFromNode(t, data, nodes, clusterUrl)
})
t.Run("ExternalTrafficPolicy:Cluster/Client:Pod", func(t *testing.T) {
testLoadBalancerClusterFromPod(t, data, pods, clusterUrl)
})
t.Run("ExternalTrafficPolicy:Local/Client:Node", func(t *testing.T) {
testLoadBalancerLocalFromNode(t, data, nodes, localUrl, hostnames)
testLoadBalancerLocalFromNode(t, data, nodes, healthUrls, healthExpected, localUrl, hostnames)
})
t.Run("ExternalTrafficPolicy:Local/Client:Pod", func(t *testing.T) {
testLoadBalancerLocalFromPod(t, data, pods, localUrl, podIPs, hostnames)
Expand All @@ -246,12 +269,18 @@ func testLoadBalancerClusterFromPod(t *testing.T, data *TestData, pods []string,
}
}

func testLoadBalancerLocalFromNode(t *testing.T, data *TestData, nodes []string, url string, expectedHostnames []string) {
func testLoadBalancerLocalFromNode(t *testing.T, data *TestData, nodes, healthUrls []string, healthExpected, url string, expectedHostnames []string) {
skipIfKubeProxyEnabled(t, data)
for idx, node := range nodes {
hostname, err := probeHostnameFromNode(node, url, data)
require.NoError(t, err, "Service LoadBalancer whose externalTrafficPolicy is Local should be able to be connected from Node")
require.Equal(t, hostname, expectedHostnames[idx])

for _, healthUrl := range healthUrls {
healthOutput, _, err := probeHealthFromNode(node, healthUrl, data)
require.NoError(t, err, "Service LoadBalancer whose externalTrafficPolicy is Local should have a response for healthcheck")
require.Equal(t, healthOutput, healthExpected)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions third_party/proxy/healthcheck/README
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Package proxy/healthcheck is copied from [k8s.io/kubernetes@/v1.24.4](https://github.com/kubernetes/kubernetes/tree/v1.24.4) to avoid importing the whole kubernetes repo.
63 changes: 63 additions & 0 deletions third_party/proxy/healthcheck/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package healthcheck

import (
"net"
"net/http"
)

// listener allows for testing of ServiceHealthServer and ProxierHealthServer.
type listener interface {
// Listen is very much like net.Listen, except the first arg (network) is
// fixed to be "tcp".
Listen(addr string) (net.Listener, error)
}

// httpServerFactory allows for testing of ServiceHealthServer and ProxierHealthServer.
type httpServerFactory interface {
// New creates an instance of a type satisfying HTTPServer. This is
// designed to include http.Server.
New(addr string, handler http.Handler) httpServer
}

// httpServer allows for testing of ServiceHealthServer and ProxierHealthServer.
// It is designed so that http.Server satisfies this interface,
type httpServer interface {
Serve(listener net.Listener) error
}

// Implement listener in terms of net.Listen.
type stdNetListener struct{}

func (stdNetListener) Listen(addr string) (net.Listener, error) {
return net.Listen("tcp", addr)
}

var _ listener = stdNetListener{}

// Implement httpServerFactory in terms of http.Server.
type stdHTTPServerFactory struct{}

func (stdHTTPServerFactory) New(addr string, handler http.Handler) httpServer {
return &http.Server{
Addr: addr,
Handler: handler,
}
}

var _ httpServerFactory = stdHTTPServerFactory{}
Loading