From 808637fa2c3a5e916f90cfac838e65fea4874460 Mon Sep 17 00:00:00 2001 From: hty Date: Wed, 14 Apr 2021 10:27:47 +0800 Subject: [PATCH] Verify the status of required routes and IP configuration of gateway periodically Add checks to the routeClient. The required routes will be added back if they were deleted unexpectedly. Add IP configuration check of the gateway to the agent. An e2e test is added to verify that the route will be added back correctly. Fixes #627 --- cmd/antrea-agent/agent.go | 9 +- pkg/agent/agent.go | 12 +++ pkg/agent/route/route_linux.go | 50 ++++++++- pkg/agent/util/net_linux.go | 2 +- test/e2e/basic_test.go | 189 ++++++++++++++++++--------------- test/e2e/route_util.go | 112 +++++++++++++++++++ test/e2e/util.go | 14 +++ 7 files changed, 297 insertions(+), 91 deletions(-) create mode 100644 test/e2e/route_util.go diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 6def7e95eb6..5ce68284c13 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -127,6 +127,10 @@ func run(o *Options) error { // networkReadyCh is used to notify that the Node's network is ready. // Functions that rely on the Node's network should wait for the channel to close. networkReadyCh := make(chan struct{}) + // set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will + // cause the stopCh channel to be closed; if another signal is received before the program + // exits, we will force exit. + stopCh := signals.RegisterSignalHandlers() // Initialize agent and node network. agentInitializer := agent.NewInitializer( k8sClient, @@ -141,6 +145,7 @@ func run(o *Options) error { serviceCIDRNetv6, networkConfig, networkReadyCh, + stopCh, features.DefaultFeatureGate.Enabled(features.AntreaProxy)) err = agentInitializer.Initialize() if err != nil { @@ -256,10 +261,6 @@ func run(o *Options) error { if err := antreaClientProvider.RunOnce(); err != nil { return err } - // set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will - // cause the stopCh channel to be closed; if another signal is received before the program - // exits, we will force exit. - stopCh := signals.RegisterSignalHandlers() // Start the NPL agent. if features.DefaultFeatureGate.Enabled(features.NodePortLocal) { diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index d00042a2638..e4f70b77055 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -71,6 +71,7 @@ type Initializer struct { // networkReadyCh should be closed once the Node's network is ready. // The CNI server will wait for it before handling any CNI Add requests. networkReadyCh chan<- struct{} + stopCh <-chan struct{} } func NewInitializer( @@ -86,6 +87,7 @@ func NewInitializer( serviceCIDRv6 *net.IPNet, networkConfig *config.NetworkConfig, networkReadyCh chan<- struct{}, + stopCh <-chan struct{}, enableProxy bool) *Initializer { return &Initializer{ ovsBridgeClient: ovsBridgeClient, @@ -100,6 +102,7 @@ func NewInitializer( serviceCIDRv6: serviceCIDRv6, networkConfig: networkConfig, networkReadyCh: networkReadyCh, + stopCh: stopCh, enableProxy: enableProxy, } } @@ -845,6 +848,15 @@ func (i *Initializer) allocateGatewayAddresses(localSubnets []*net.IPNet, gatewa if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil { return err } + // Periodically check whether IP configuration of the gateway is correct. + // Terminated when stopCh is closed. + if !i.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { + go wait.Until(func() { + if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil { + klog.Errorf("Failed to check IP configuration of the gateway: %v", err) + } + }, 60*time.Second, i.stopCh) + } for _, gwIP := range gwIPs { if gwIP.IP.To4() != nil { diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 67983dd5932..1d7928a1b33 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -152,7 +152,55 @@ func (c *Client) syncIPInfra() { klog.Errorf("Failed to sync iptables: %v", err) return } - klog.V(3).Infof("Successfully synced node iptables") + if err := c.syncRoutes(); err != nil { + klog.Errorf("Failed to sync routes: %v", err) + } + klog.V(3).Infof("Successfully synced node iptables and routes") +} + +func (c *Client) syncRoutes() error { + routeList, err := netlink.RouteList(nil, netlink.FAMILY_ALL) + if err != nil { + return err + } + routeMap := make(map[string]*netlink.Route) + for i := range routeList { + r := &routeList[i] + if r == nil || r.Dst == nil { + continue + } + routeMap[r.Dst.String()] = r + } + c.nodeRoutes.Range(func(_, v interface{}) bool { + for _, route := range v.([]*netlink.Route) { + r, ok := routeMap[route.Dst.String()] + if !ok { + if err := netlink.RouteAdd(route); err != nil { + klog.Errorf("Failed to add route to the gateway: %v", err) + return false + } + continue + } + if !routeEqual(route, r) { + if err := netlink.RouteAdd(route); err != nil { + klog.Errorf("Failed to add route to the gateway: %v", err) + return false + } + } + } + return true + }) + return nil +} + +func routeEqual(x, y *netlink.Route) bool { + if x == nil || y == nil { + return false + } + return x.LinkIndex == y.LinkIndex && + x.Dst.IP.Equal(y.Dst.IP) && + bytes.Equal(x.Dst.Mask, y.Dst.Mask) && + x.Gw.Equal(y.Gw) } // syncIPSet ensures that the required ipset exists and it has the initial members. diff --git a/pkg/agent/util/net_linux.go b/pkg/agent/util/net_linux.go index f26e711f351..28a8e97a80f 100644 --- a/pkg/agent/util/net_linux.go +++ b/pkg/agent/util/net_linux.go @@ -38,7 +38,7 @@ func GetNetLink(dev string) netlink.Link { return link } -// GetPeerLinkBridge returns peer device and its attached bridge (if applicable) +// GetNSPeerDevBridge returns peer device and its attached bridge (if applicable) // for device dev in network space indicated by nsPath func GetNSPeerDevBridge(nsPath, dev string) (*net.Interface, string, error) { var peerIdx int diff --git a/test/e2e/basic_test.go b/test/e2e/basic_test.go index caceb7f2bd5..a27934176a7 100644 --- a/test/e2e/basic_test.go +++ b/test/e2e/basic_test.go @@ -314,56 +314,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo t.Fatalf(" failed to get encap mode, err %v", err) } - type Route struct { - peerPodCIDR *net.IPNet - peerPodGW net.IP - } - nodeName := nodeName(0) - antreaPodName := func() string { - antreaPodName, err := data.getAntreaPodOnNode(nodeName) - if err != nil { - t.Fatalf("Error when retrieving the name of the Antrea Pod running on Node '%s': %v", nodeName, err) - } - t.Logf("The Antrea Pod for Node '%s' is '%s'", nodeName, antreaPodName) - return antreaPodName - } - - antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace) - if err != nil { - t.Fatalf("Failed to detect gateway interface name from ConfigMap: %v", err) - } - - getGatewayRoutes := func() (routes []Route, err error) { - var cmd []string - if !isIPv6 { - cmd = []string{"ip", "route", "list", "dev", antreaGWName} - } else { - cmd = []string{"ip", "-6", "route", "list", "dev", antreaGWName} - } - podName := antreaPodName() - stdout, stderr, err := data.runCommandFromPod(antreaNamespace, podName, agentContainerName, cmd) - if err != nil { - return nil, fmt.Errorf("error when running ip command in Pod '%s': %v - stdout: %s - stderr: %s", podName, err, stdout, stderr) - } - re := regexp.MustCompile(`([^\s]+) via ([^\s]+)`) - for _, line := range strings.Split(stdout, "\n") { - var err error - matches := re.FindStringSubmatch(line) - if len(matches) == 0 { - continue - } - route := Route{} - if _, route.peerPodCIDR, err = net.ParseCIDR(matches[1]); err != nil { - return nil, fmt.Errorf("%s is not a valid net CIDR", matches[1]) - } - if route.peerPodGW = net.ParseIP(matches[2]); route.peerPodGW == nil { - return nil, fmt.Errorf("%s is not a valid IP", matches[2]) - } - routes = append(routes, route) - } - return routes, nil - } expectedRtNumMin, expectedRtNumMax := clusterInfo.numNodes-1, clusterInfo.numNodes-1 if encapMode == config.TrafficEncapModeNoEncap { @@ -376,7 +327,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo t.Logf("Retrieving gateway routes on Node '%s'", nodeName) var routes []Route if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (found bool, err error) { - routes, err = getGatewayRoutes() + routes, err = getGatewayRoutes(data, isIPv6) if err != nil { return false, err } @@ -410,51 +361,20 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo routeToAdd.peerPodGW = net.ParseIP("fe80::1") } - // We run the ip command from the antrea-agent container for delete / add since they need to - // be run as root and the antrea-agent container is privileged. If we used RunCommandOnNode, - // we may need to use "sudo" for some providers (e.g. vagrant). - deleteGatewayRoute := func(route *Route) error { - var cmd []string - if !isIPv6 { - cmd = []string{"ip", "route", "del", route.peerPodCIDR.String()} - } else { - cmd = []string{"ip", "-6", "route", "del", route.peerPodCIDR.String()} - } - _, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd) - if err != nil { - return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err) - } - return nil - } - - addGatewayRoute := func(route *Route) error { - var cmd []string - if !isIPv6 { - cmd = []string{"ip", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"} - } else { - cmd = []string{"ip", "-6", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"} - } - _, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd) - if err != nil { - return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err) - } - return nil - } - if routeToDelete != nil { t.Logf("Deleting one actual gateway route and adding a dummy one") - if err := deleteGatewayRoute(routeToDelete); err != nil { + if err := deleteGatewayRoute(routeToDelete, data, isIPv6); err != nil { t.Fatalf("Error when deleting route: %v", err) } } - if err := addGatewayRoute(routeToAdd); err != nil { + if err := addGatewayRoute(routeToAdd, data, isIPv6); err != nil { t.Fatalf("Error when adding dummy route route: %v", err) } defer func() { // Cleanup the dummy route regardless of whether the test was a success or a // failure; ignore error (there will be an error if the test is a success since the // dummy route will no longer exist). - _ = deleteGatewayRoute(routeToAdd) + _ = deleteGatewayRoute(routeToAdd, data, isIPv6) }() t.Logf("Restarting antrea-agent on Node '%s'", nodeName) @@ -470,7 +390,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo // We expect the agent to delete the extra route we added and add back the route we deleted t.Logf("Waiting for gateway routes to converge") if err := wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) { - newRoutes, err := getGatewayRoutes() + newRoutes, err := getGatewayRoutes(data, isIPv6) if err != nil { return false, err } @@ -558,6 +478,105 @@ func getRoundNumber(data *TestData, podName string) (uint64, error) { return 0, fmt.Errorf("did not find roundNum in OVSDB result") } +// TestSyncRoutes checks that when some routes are removed, the Antrea Agent will synchronize the +// state of routes and add missing routes back. +func TestSyncRoutes(t *testing.T) { + skipIfNumNodesLessThan(t, 2) + data, err := setupTest(t) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + defer teardownTest(t, data) + + if len(clusterInfo.podV4NetworkCIDR) != 0 { + t.Logf("Running IPv4 test") + testSyncRoutes(t, data, false) + } + if len(clusterInfo.podV6NetworkCIDR) != 0 { + t.Logf("Running IPv6 test") + testSyncRoutes(t, data, true) + } +} + +func testSyncRoutes(t *testing.T, data *TestData, isIPv6 bool) { + encapMode, err := data.GetEncapMode() + if err != nil { + t.Fatalf(" failed to get encap mode, err %v", err) + } + + if err != nil { + t.Fatalf("Failed to detect gateway interface name from ConfigMap: %v", err) + } + + expectedRtNumMin, expectedRtNumMax := clusterInfo.numNodes-1, clusterInfo.numNodes-1 + if encapMode == config.TrafficEncapModeNoEncap { + expectedRtNumMin, expectedRtNumMax = 0, 0 + + } else if encapMode == config.TrafficEncapModeHybrid { + expectedRtNumMin = 1 + } + var routes []Route + if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (found bool, err error) { + routes, err = getGatewayRoutes(data, isIPv6) + if err != nil { + return false, err + } + + if len(routes) < expectedRtNumMin { + // Not enough routes, keep trying + return false, nil + } else if len(routes) > expectedRtNumMax { + return false, fmt.Errorf("found too many gateway routes, expected %d but got %d", expectedRtNumMax, len(routes)) + } + return true, nil + }); err == wait.ErrWaitTimeout { + t.Fatalf("Not enough gateway routes after %v", defaultTimeout) + } else if err != nil { + t.Fatalf("Error while waiting for gateway routes: %v", err) + } else { + t.Logf("Found all expected gateway routes") + } + + var routeToDelete *Route + if encapMode.SupportsEncap() { + routeToDelete = &routes[0] + } + if routeToDelete != nil { + t.Logf("Deleting one actual gateway route") + if err := deleteGatewayRoute(routeToDelete, data, isIPv6); err != nil { + t.Fatalf("Error when deleting route: %v", err) + } + } + + if err := wait.Poll(30*time.Second, 3*time.Minute, func() (bool, error) { + newRoutes, err := getGatewayRoutes(data, isIPv6) + if err != nil { + return false, err + } + if len(newRoutes) != len(routes) { + return false, nil + } + if routeToDelete != nil { + for _, route := range newRoutes { + if route.peerPodGW.Equal(routeToDelete.peerPodGW) { + // The deleted route was added back, success! + return true, nil + } + } + } else { + return true, nil + } + // We haven't found the deleted route, keep trying + return false, nil + }); err == wait.ErrWaitTimeout { + t.Errorf("Gateway routes did not converge after %v", defaultTimeout) + } else if err != nil { + t.Fatalf("Error while waiting for gateway routes to converge: %v", err) + } else { + t.Logf("Gateway routes successfully converged") + } +} + // TestDeletePreviousRoundFlowsOnStartup checks that when the Antrea agent is restarted, flows from // the previous "round" which are no longer needed (e.g. in case of changes to the cluster / to // Network Policies) are removed correctly. diff --git a/test/e2e/route_util.go b/test/e2e/route_util.go new file mode 100644 index 00000000000..488443cadb5 --- /dev/null +++ b/test/e2e/route_util.go @@ -0,0 +1,112 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "fmt" + "net" + "regexp" + "strings" +) + +type Route struct { + peerPodCIDR *net.IPNet + peerPodGW net.IP +} + +func getGatewayRoutes(data *TestData, isIPv6 bool) (routes []Route, err error) { + antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace) + if err != nil { + return nil, err + } + var cmd []string + nodeName := nodeName(0) + antreaPodName, err := data.getAntreaPodOnNode(nodeName) + if err != nil { + return nil, err + } + + if !isIPv6 { + cmd = []string{"ip", "route", "list", "dev", antreaGWName} + } else { + cmd = []string{"ip", "-6", "route", "list", "dev", antreaGWName} + } + stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, agentContainerName, cmd) + if err != nil { + return nil, fmt.Errorf("error when running ip command in Pod '%s': %v - stdout: %s - stderr: %s", antreaPodName, err, stdout, stderr) + } + + re := regexp.MustCompile(`([^\s]+) via ([^\s]+)`) + for _, line := range strings.Split(stdout, "\n") { + var err error + matches := re.FindStringSubmatch(line) + if len(matches) == 0 { + continue + } + route := Route{} + if _, route.peerPodCIDR, err = net.ParseCIDR(matches[1]); err != nil { + return nil, fmt.Errorf("%s is not a valid net CIDR", matches[1]) + } + if route.peerPodGW = net.ParseIP(matches[2]); route.peerPodGW == nil { + return nil, fmt.Errorf("%s is not a valid IP", matches[2]) + } + routes = append(routes, route) + } + return routes, nil +} + +func deleteGatewayRoute(route *Route, data *TestData, isIPv6 bool) error { + var cmd []string + nodeName := nodeName(0) + antreaPodName, err := data.getAntreaPodOnNode(nodeName) + if err != nil { + return err + } + + if !isIPv6 { + cmd = []string{"ip", "route", "del", route.peerPodCIDR.String()} + } else { + cmd = []string{"ip", "-6", "route", "del", route.peerPodCIDR.String()} + } + _, _, err = data.runCommandFromPod(antreaNamespace, antreaPodName, agentContainerName, cmd) + if err != nil { + return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err) + } + return nil +} + +func addGatewayRoute(route *Route, data *TestData, isIPv6 bool) error { + antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace) + if err != nil { + return err + } + var cmd []string + nodeName := nodeName(0) + antreaPodName, err := data.getAntreaPodOnNode(nodeName) + if err != nil { + return err + } + + if !isIPv6 { + cmd = []string{"ip", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"} + } else { + cmd = []string{"ip", "-6", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"} + } + _, _, err = data.runCommandFromPod(antreaNamespace, antreaPodName, agentContainerName, cmd) + if err != nil { + return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err) + } + return nil +} diff --git a/test/e2e/util.go b/test/e2e/util.go index 3d1102c7151..20891eb6b53 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -1,3 +1,17 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package e2e import (