Skip to content

Commit

Permalink
Verify the status of required routes and IP configuration of gateway …
Browse files Browse the repository at this point in the history
…periodically

Add checks to the routeClient. The required routes will be added back if they were
deleted unexpectedly. Add IP configuration check of the gateway to the agent.
An integration test is added to verify that the route will be added back correctly.

Fixes antrea-io#627
  • Loading branch information
hty690 committed Apr 22, 2021
1 parent b88f39e commit 91165d5
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 94 deletions.
9 changes: 5 additions & 4 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ func run(o *Options) error {
// networkReadyCh is used to notify that the Node's network is ready.
// Functions that rely on the Node's network should wait for the channel to close.
networkReadyCh := make(chan struct{})
// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
stopCh := signals.RegisterSignalHandlers()
// Initialize agent and node network.
agentInitializer := agent.NewInitializer(
k8sClient,
Expand All @@ -141,6 +145,7 @@ func run(o *Options) error {
serviceCIDRNetv6,
networkConfig,
networkReadyCh,
stopCh,
features.DefaultFeatureGate.Enabled(features.AntreaProxy))
err = agentInitializer.Initialize()
if err != nil {
Expand Down Expand Up @@ -256,10 +261,6 @@ func run(o *Options) error {
if err := antreaClientProvider.RunOnce(); err != nil {
return err
}
// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
stopCh := signals.RegisterSignalHandlers()

// Start the NPL agent.
if features.DefaultFeatureGate.Enabled(features.NodePortLocal) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Initializer struct {
// networkReadyCh should be closed once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
networkReadyCh chan<- struct{}
stopCh <-chan struct{}
}

func NewInitializer(
Expand All @@ -86,6 +87,7 @@ func NewInitializer(
serviceCIDRv6 *net.IPNet,
networkConfig *config.NetworkConfig,
networkReadyCh chan<- struct{},
stopCh <-chan struct{},
enableProxy bool) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
Expand All @@ -100,6 +102,7 @@ func NewInitializer(
serviceCIDRv6: serviceCIDRv6,
networkConfig: networkConfig,
networkReadyCh: networkReadyCh,
stopCh: stopCh,
enableProxy: enableProxy,
}
}
Expand Down Expand Up @@ -845,6 +848,13 @@ func (i *Initializer) allocateGatewayAddresses(localSubnets []*net.IPNet, gatewa
if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil {
return err
}
// Periodically check whether IP configuration of the gateway is correct.
// Terminated when stopCh is closed.
go wait.Until(func() {
if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil {
klog.Errorf("Failed to check IP configuration of the gateway: %v", err)
}
}, 60*time.Second, i.stopCh)

for _, gwIP := range gwIPs {
if gwIP.IP.To4() != nil {
Expand Down
47 changes: 45 additions & 2 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,49 @@ func (c *Client) syncIPInfra() {
klog.Errorf("Failed to sync iptables: %v", err)
return
}
klog.V(3).Infof("Successfully synced node iptables")
if err := c.syncRoutes(); err != nil {
klog.Errorf("Failed to sync routes: %v", err)
}
klog.V(3).Infof("Successfully synced node iptables and routes")
}

// syncRoutes restores any route tracked in c.nodeRoutes that is missing from
// the host or whose installed version no longer matches the tracked one.
//
// It fetches the installed routes with netlink.RouteList, indexes them by
// destination, and calls netlink.RouteReplace for every tracked route that has
// no equal counterpart (see routeEqual).
//
// It returns an error if the installed routes cannot be listed or if a route
// cannot be restored. In the latter case the iteration stops at the first
// failure and the remaining routes are left for a later sync.
func (c *Client) syncRoutes() error {
	routeList, err := netlink.RouteList(nil, netlink.FAMILY_ALL)
	if err != nil {
		return err
	}
	// Index the installed routes by destination so each tracked route can be
	// looked up directly. Entries without a destination cannot be keyed and
	// are skipped.
	routeMap := make(map[string]*netlink.Route, len(routeList))
	for i := range routeList {
		r := &routeList[i]
		if r.Dst == nil {
			continue
		}
		routeMap[r.Dst.String()] = r
	}
	// syncErr carries the first RouteReplace failure out of the Range callback;
	// without it the failure would only be logged and the caller would see nil.
	var syncErr error
	c.nodeRoutes.Range(func(_, v interface{}) bool {
		for _, route := range v.([]*netlink.Route) {
			if r, ok := routeMap[route.Dst.String()]; ok && routeEqual(route, r) {
				continue
			}
			if err := netlink.RouteReplace(route); err != nil {
				klog.Errorf("Failed to add route to the gateway: %v", err)
				syncErr = err
				// Returning false stops the Range iteration.
				return false
			}
		}
		return true
	})
	return syncErr
}

// routeEqual reports whether x and y describe the same route, comparing the
// link index, the gateway, and the destination network (IP and mask).
//
// It returns false if either route is nil. A route may carry a nil Dst
// (syncRoutes skips such entries when listing installed routes); two routes
// with a nil Dst are compared on link index and gateway only, and a nil Dst
// never equals a non-nil one.
func routeEqual(x, y *netlink.Route) bool {
	if x == nil || y == nil {
		return false
	}
	if x.LinkIndex != y.LinkIndex || !x.Gw.Equal(y.Gw) {
		return false
	}
	// Guard against a nil destination before dereferencing it.
	if x.Dst == nil || y.Dst == nil {
		return x.Dst == nil && y.Dst == nil
	}
	return x.Dst.IP.Equal(y.Dst.IP) && bytes.Equal(x.Dst.Mask, y.Dst.Mask)
}

// syncIPSet ensures that the required ipset exists and it has the initial members.
Expand Down Expand Up @@ -578,13 +620,14 @@ func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error {

routes, exists := c.nodeRoutes.Load(podCIDRStr)
if exists {
c.nodeRoutes.Delete(podCIDRStr)
for _, r := range routes.([]*netlink.Route) {
klog.V(4).Infof("Deleting route %v", r)
if err := netlink.RouteDel(r); err != nil && err != unix.ESRCH {
c.nodeRoutes.Store(podCIDRStr, routes)
return err
}
}
c.nodeRoutes.Delete(podCIDRStr)
}
if podCIDR.IP.To4() == nil {
neigh, exists := c.nodeNeighbors.Load(podCIDRStr)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/util/net_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func GetNetLink(dev string) netlink.Link {
return link
}

// GetPeerLinkBridge returns peer device and its attached bridge (if applicable)
// GetNSPeerDevBridge returns peer device and its attached bridge (if applicable)
// for device dev in network space indicated by nsPath
func GetNSPeerDevBridge(nsPath, dev string) (*net.Interface, string, error) {
var peerIdx int
Expand Down
90 changes: 5 additions & 85 deletions test/e2e/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,56 +314,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
t.Fatalf(" failed to get encap mode, err %v", err)
}

type Route struct {
peerPodCIDR *net.IPNet
peerPodGW net.IP
}

nodeName := nodeName(0)
antreaPodName := func() string {
antreaPodName, err := data.getAntreaPodOnNode(nodeName)
if err != nil {
t.Fatalf("Error when retrieving the name of the Antrea Pod running on Node '%s': %v", nodeName, err)
}
t.Logf("The Antrea Pod for Node '%s' is '%s'", nodeName, antreaPodName)
return antreaPodName
}

antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace)
if err != nil {
t.Fatalf("Failed to detect gateway interface name from ConfigMap: %v", err)
}

getGatewayRoutes := func() (routes []Route, err error) {
var cmd []string
if !isIPv6 {
cmd = []string{"ip", "route", "list", "dev", antreaGWName}
} else {
cmd = []string{"ip", "-6", "route", "list", "dev", antreaGWName}
}
podName := antreaPodName()
stdout, stderr, err := data.runCommandFromPod(antreaNamespace, podName, agentContainerName, cmd)
if err != nil {
return nil, fmt.Errorf("error when running ip command in Pod '%s': %v - stdout: %s - stderr: %s", podName, err, stdout, stderr)
}
re := regexp.MustCompile(`([^\s]+) via ([^\s]+)`)
for _, line := range strings.Split(stdout, "\n") {
var err error
matches := re.FindStringSubmatch(line)
if len(matches) == 0 {
continue
}
route := Route{}
if _, route.peerPodCIDR, err = net.ParseCIDR(matches[1]); err != nil {
return nil, fmt.Errorf("%s is not a valid net CIDR", matches[1])
}
if route.peerPodGW = net.ParseIP(matches[2]); route.peerPodGW == nil {
return nil, fmt.Errorf("%s is not a valid IP", matches[2])
}
routes = append(routes, route)
}
return routes, nil
}

expectedRtNumMin, expectedRtNumMax := clusterInfo.numNodes-1, clusterInfo.numNodes-1
if encapMode == config.TrafficEncapModeNoEncap {
Expand All @@ -376,7 +327,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
t.Logf("Retrieving gateway routes on Node '%s'", nodeName)
var routes []Route
if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (found bool, err error) {
routes, err = getGatewayRoutes()
routes, err = getGatewayRoutes(data, isIPv6)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -410,51 +361,20 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
routeToAdd.peerPodGW = net.ParseIP("fe80::1")
}

// We run the ip command from the antrea-agent container for delete / add since they need to
// be run as root and the antrea-agent container is privileged. If we used RunCommandOnNode,
// we may need to use "sudo" for some providers (e.g. vagrant).
deleteGatewayRoute := func(route *Route) error {
var cmd []string
if !isIPv6 {
cmd = []string{"ip", "route", "del", route.peerPodCIDR.String()}
} else {
cmd = []string{"ip", "-6", "route", "del", route.peerPodCIDR.String()}
}
_, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd)
if err != nil {
return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err)
}
return nil
}

addGatewayRoute := func(route *Route) error {
var cmd []string
if !isIPv6 {
cmd = []string{"ip", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"}
} else {
cmd = []string{"ip", "-6", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"}
}
_, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd)
if err != nil {
return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err)
}
return nil
}

if routeToDelete != nil {
t.Logf("Deleting one actual gateway route and adding a dummy one")
if err := deleteGatewayRoute(routeToDelete); err != nil {
if err := deleteGatewayRoute(routeToDelete, data, isIPv6); err != nil {
t.Fatalf("Error when deleting route: %v", err)
}
}
if err := addGatewayRoute(routeToAdd); err != nil {
if err := addGatewayRoute(routeToAdd, data, isIPv6); err != nil {
t.Fatalf("Error when adding dummy route route: %v", err)
}
defer func() {
// Cleanup the dummy route regardless of whether the test was a success or a
// failure; ignore error (there will be an error if the test is a success since the
// dummy route will no longer exist).
_ = deleteGatewayRoute(routeToAdd)
_ = deleteGatewayRoute(routeToAdd, data, isIPv6)
}()

t.Logf("Restarting antrea-agent on Node '%s'", nodeName)
Expand All @@ -470,7 +390,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
// We expect the agent to delete the extra route we added and add back the route we deleted
t.Logf("Waiting for gateway routes to converge")
if err := wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) {
newRoutes, err := getGatewayRoutes()
newRoutes, err := getGatewayRoutes(data, isIPv6)
if err != nil {
return false, err
}
Expand Down
112 changes: 112 additions & 0 deletions test/e2e/route_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2021 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
"fmt"
"net"
"regexp"
"strings"
)

// Route is one gateway route as parsed from a line of `ip route list` output
// of the form "<destination> via <gateway> ...".
type Route struct {
// peerPodCIDR is the destination network of the route (the token before "via").
peerPodCIDR *net.IPNet
// peerPodGW is the next-hop address of the route (the token after "via").
peerPodGW net.IP
}

// gatewayRouteRegexp captures the destination and the next hop from one line
// of `ip route list` output ("<destination> via <gateway> ...").
var gatewayRouteRegexp = regexp.MustCompile(`([^\s]+) via ([^\s]+)`)

// getGatewayRoutes lists the routes installed on the Antrea gateway interface
// by running `ip route list dev <gateway>` (with `-6` when isIPv6 is true) in
// the antrea-agent container of the Antrea Pod on Node 0.
//
// Only lines containing a "via" clause are returned; all other lines are
// ignored. It returns an error if the gateway name or the Antrea Pod cannot be
// determined, if the command fails, or if a matched destination or next hop
// cannot be parsed.
func getGatewayRoutes(data *TestData, isIPv6 bool) (routes []Route, err error) {
	antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace)
	if err != nil {
		return nil, err
	}
	antreaPodName, err := data.getAntreaPodOnNode(nodeName(0))
	if err != nil {
		return nil, err
	}

	cmd := []string{"ip"}
	if isIPv6 {
		cmd = append(cmd, "-6")
	}
	cmd = append(cmd, "route", "list", "dev", antreaGWName)

	stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, agentContainerName, cmd)
	if err != nil {
		return nil, fmt.Errorf("error when running ip command in Pod '%s': %v - stdout: %s - stderr: %s", antreaPodName, err, stdout, stderr)
	}

	for _, line := range strings.Split(stdout, "\n") {
		matches := gatewayRouteRegexp.FindStringSubmatch(line)
		if len(matches) == 0 {
			// Not a "<destination> via <gateway>" line.
			continue
		}
		_, podCIDR, parseErr := net.ParseCIDR(matches[1])
		if parseErr != nil {
			// Keep the underlying parse error so the failure is diagnosable.
			return nil, fmt.Errorf("%s is not a valid net CIDR: %w", matches[1], parseErr)
		}
		podGW := net.ParseIP(matches[2])
		if podGW == nil {
			return nil, fmt.Errorf("%s is not a valid IP", matches[2])
		}
		routes = append(routes, Route{peerPodCIDR: podCIDR, peerPodGW: podGW})
	}
	return routes, nil
}

// deleteGatewayRoute removes the route to route.peerPodCIDR by running
// `ip route del` (with `-6` when isIPv6 is true) in the antrea-agent container
// of the Antrea Pod on Node 0. It returns an error if the Pod cannot be found
// or if the command fails.
func deleteGatewayRoute(route *Route, data *TestData, isIPv6 bool) error {
	node := nodeName(0)
	agentPod, err := data.getAntreaPodOnNode(node)
	if err != nil {
		return err
	}

	// Build the argument list incrementally; only the address-family flag
	// differs between IPv4 and IPv6.
	args := []string{"ip"}
	if isIPv6 {
		args = append(args, "-6")
	}
	args = append(args, "route", "del", route.peerPodCIDR.String())

	if _, _, err := data.runCommandFromPod(antreaNamespace, agentPod, agentContainerName, args); err != nil {
		return fmt.Errorf("error when running ip command on Node '%s': %v", node, err)
	}
	return nil
}

// addGatewayRoute installs a route to route.peerPodCIDR via route.peerPodGW on
// the Antrea gateway interface by running `ip route add ... onlink` (with `-6`
// when isIPv6 is true) in the antrea-agent container of the Antrea Pod on
// Node 0. It returns an error if the gateway name or the Pod cannot be
// determined, or if the command fails.
func addGatewayRoute(route *Route, data *TestData, isIPv6 bool) error {
	gwName, err := data.GetGatewayInterfaceName(antreaNamespace)
	if err != nil {
		return err
	}
	node := nodeName(0)
	agentPod, err := data.getAntreaPodOnNode(node)
	if err != nil {
		return err
	}

	// Build the argument list incrementally; only the address-family flag
	// differs between IPv4 and IPv6.
	args := []string{"ip"}
	if isIPv6 {
		args = append(args, "-6")
	}
	args = append(args,
		"route", "add", route.peerPodCIDR.String(),
		"via", route.peerPodGW.String(),
		"dev", gwName,
		"onlink",
	)

	if _, _, err := data.runCommandFromPod(antreaNamespace, agentPod, agentContainerName, args); err != nil {
		return fmt.Errorf("error when running ip command on Node '%s': %v", node, err)
	}
	return nil
}
Loading

0 comments on commit 91165d5

Please sign in to comment.