diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 6def7e95eb6..370796394a9 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -127,6 +127,7 @@ func run(o *Options) error { // networkReadyCh is used to notify that the Node's network is ready. // Functions that rely on the Node's network should wait for the channel to close. networkReadyCh := make(chan struct{}) + stopCh := signals.RegisterSignalHandlers() // Initialize agent and node network. agentInitializer := agent.NewInitializer( k8sClient, @@ -141,6 +142,7 @@ func run(o *Options) error { serviceCIDRNetv6, networkConfig, networkReadyCh, + stopCh, features.DefaultFeatureGate.Enabled(features.AntreaProxy)) err = agentInitializer.Initialize() if err != nil { @@ -259,7 +261,6 @@ func run(o *Options) error { // set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will // cause the stopCh channel to be closed; if another signal is received before the program // exits, we will force exit. - stopCh := signals.RegisterSignalHandlers() // Start the NPL agent. if features.DefaultFeatureGate.Enabled(features.NodePortLocal) { diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index d00042a2638..c19816bfc08 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -71,6 +71,7 @@ type Initializer struct { // networkReadyCh should be closed once the Node's network is ready. // The CNI server will wait for it before handling any CNI Add requests. 
networkReadyCh chan<- struct{} + stopCh <-chan struct{} } func NewInitializer( @@ -86,6 +87,7 @@ func NewInitializer( serviceCIDRv6 *net.IPNet, networkConfig *config.NetworkConfig, networkReadyCh chan<- struct{}, + stopCh <-chan struct{}, enableProxy bool) *Initializer { return &Initializer{ ovsBridgeClient: ovsBridgeClient, @@ -100,6 +102,7 @@ func NewInitializer( serviceCIDRv6: serviceCIDRv6, networkConfig: networkConfig, networkReadyCh: networkReadyCh, + stopCh: stopCh, enableProxy: enableProxy, } }
@@ -222,6 +225,28 @@ func (i *Initializer) Initialize() error {
 		return err
 	}
 
+	// Periodically ensure that the gateway interface is up and has the expected IP
+	// addresses. Terminated when stopCh is closed.
+	const gatewaySyncInterval = 60 * time.Second
+	// The gateway addresses only depend on the Node's Pod CIDRs, so they are computed once.
+	gwIPs := make([]*net.IPNet, 0, 2)
+	for _, podCIDR := range []*net.IPNet{i.nodeConfig.PodIPv4CIDR, i.nodeConfig.PodIPv6CIDR} {
+		if podCIDR == nil {
+			continue
+		}
+		subnetID := podCIDR.IP.Mask(podCIDR.Mask)
+		gwIPs = append(gwIPs, &net.IPNet{IP: ip.NextIP(subnetID), Mask: podCIDR.Mask})
+	}
+	go wait.Until(func() {
+		if _, _, err := util.SetLinkUp(i.hostGateway); err != nil {
+			klog.Errorf("Failed to set up gateway: %v", err)
+			return
+		}
+		if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil {
+			klog.Errorf("Failed to check IP configuration of the gateway: %v", err)
+		}
+	}, gatewaySyncInterval, i.stopCh)
+
 	wg.Add(1)
 	// routeClient.Initialize() should be after i.setupOVSBridge() which
 	// creates the host gateway interface.
diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go
index 67983dd5932..038e88da0a6 100644
--- a/pkg/agent/route/route_linux.go
+++ b/pkg/agent/route/route_linux.go
@@ -152,7 +152,57 @@ func (c *Client) syncIPInfra() {
 		klog.Errorf("Failed to sync iptables: %v", err)
 		return
 	}
-	klog.V(3).Infof("Successfully synced node iptables")
+	if err := c.syncRoutes(); err != nil {
+		klog.Errorf("Failed to sync routes: %v", err)
+		return
+	}
+	klog.V(3).Infof("Successfully synced node iptables and routes")
+}
+
+// syncRoutes compares the routes recorded in c.nodeRoutes with the routes
+// currently installed on the Node, and installs every recorded route that is
+// missing or that differs from the installed one.
+func (c *Client) syncRoutes() error {
+	routeList, err := netlink.RouteList(nil, netlink.FAMILY_ALL)
+	if err != nil {
+		return err
+	}
+	routeMap := make(map[string]*netlink.Route, len(routeList))
+	for i := range routeList {
+		r := &routeList[i]
+		if r.Dst == nil {
+			continue
+		}
+		routeMap[r.Dst.String()] = r
+	}
+	var syncErr error
+	c.nodeRoutes.Range(func(_, v interface{}) bool {
+		for _, route := range v.([]*netlink.Route) {
+			if r, ok := routeMap[route.Dst.String()]; ok && routeEqual(route, r) {
+				continue
+			}
+			// RouteReplace creates the route when it is missing and overwrites it
+			// when it differs; RouteAdd would fail with EEXIST in the latter case.
+			if err := netlink.RouteReplace(route); err != nil {
+				syncErr = err
+				return false
+			}
+		}
+		return true
+	})
+	return syncErr
+}
+
+// routeEqual reports whether x and y have the same link index, destination and
+// gateway. A nil route, or a route without a destination, never compares equal.
+func routeEqual(x, y *netlink.Route) bool {
+	if x == nil || y == nil || x.Dst == nil || y.Dst == nil {
+		return false
+	}
+	return x.LinkIndex == y.LinkIndex &&
+		x.Dst.IP.Equal(y.Dst.IP) &&
+		bytes.Equal(x.Dst.Mask, y.Dst.Mask) &&
+		x.Gw.Equal(y.Gw)
 }
 
 // syncIPSet ensures that the required ipset exists and it has the initial members.
diff --git a/pkg/agent/util/net_linux.go b/pkg/agent/util/net_linux.go index f26e711f351..28a8e97a80f 100644 --- a/pkg/agent/util/net_linux.go +++ b/pkg/agent/util/net_linux.go @@ -38,7 +38,7 @@ func GetNetLink(dev string) netlink.Link { return link } -// GetPeerLinkBridge returns peer device and its attached bridge (if applicable) +// GetNSPeerDevBridge returns peer device and its attached bridge (if applicable) // for device dev in network space indicated by nsPath func GetNSPeerDevBridge(nsPath, dev string) (*net.Interface, string, error) { var peerIdx int diff --git a/test/e2e/basic_test.go b/test/e2e/basic_test.go index caceb7f2bd5..a27934176a7 100644 --- a/test/e2e/basic_test.go +++ b/test/e2e/basic_test.go @@ -314,56 +314,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo t.Fatalf(" failed to get encap mode, err %v", err) } - type Route struct { - peerPodCIDR *net.IPNet - peerPodGW net.IP - } - nodeName := nodeName(0) - antreaPodName := func() string { - antreaPodName, err := data.getAntreaPodOnNode(nodeName) - if err != nil { - t.Fatalf("Error when retrieving the name of the Antrea Pod running on Node '%s': %v", nodeName, err) - } - t.Logf("The Antrea Pod for Node '%s' is '%s'", nodeName, antreaPodName) - return antreaPodName - } - - antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace) - if err != nil { - t.Fatalf("Failed to detect gateway interface name from ConfigMap: %v", err) - } - - getGatewayRoutes := func() (routes []Route, err error) { - var cmd []string - if !isIPv6 { - cmd = []string{"ip", "route", "list", "dev", antreaGWName} - } else { - cmd = []string{"ip", "-6", "route", "list", "dev", antreaGWName} - } - podName := antreaPodName() - stdout,
stderr, err := data.runCommandFromPod(antreaNamespace, podName, agentContainerName, cmd) - if err != nil { - return nil, fmt.Errorf("error when running ip command in Pod '%s': %v - stdout: %s - stderr: %s", podName, err, stdout, stderr) - } - re := regexp.MustCompile(`([^\s]+) via ([^\s]+)`) - for _, line := range strings.Split(stdout, "\n") { - var err error - matches := re.FindStringSubmatch(line) - if len(matches) == 0 { - continue - } - route := Route{} - if _, route.peerPodCIDR, err = net.ParseCIDR(matches[1]); err != nil { - return nil, fmt.Errorf("%s is not a valid net CIDR", matches[1]) - } - if route.peerPodGW = net.ParseIP(matches[2]); route.peerPodGW == nil { - return nil, fmt.Errorf("%s is not a valid IP", matches[2]) - } - routes = append(routes, route) - } - return routes, nil - } expectedRtNumMin, expectedRtNumMax := clusterInfo.numNodes-1, clusterInfo.numNodes-1 if encapMode == config.TrafficEncapModeNoEncap { @@ -376,7 +327,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo t.Logf("Retrieving gateway routes on Node '%s'", nodeName) var routes []Route if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (found bool, err error) { - routes, err = getGatewayRoutes() + routes, err = getGatewayRoutes(data, isIPv6) if err != nil { return false, err } @@ -410,51 +361,20 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo routeToAdd.peerPodGW = net.ParseIP("fe80::1") } - // We run the ip command from the antrea-agent container for delete / add since they need to - // be run as root and the antrea-agent container is privileged. If we used RunCommandOnNode, - // we may need to use "sudo" for some providers (e.g. vagrant). 
- deleteGatewayRoute := func(route *Route) error { - var cmd []string - if !isIPv6 { - cmd = []string{"ip", "route", "del", route.peerPodCIDR.String()} - } else { - cmd = []string{"ip", "-6", "route", "del", route.peerPodCIDR.String()} - } - _, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd) - if err != nil { - return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err) - } - return nil - } - - addGatewayRoute := func(route *Route) error { - var cmd []string - if !isIPv6 { - cmd = []string{"ip", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"} - } else { - cmd = []string{"ip", "-6", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"} - } - _, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd) - if err != nil { - return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err) - } - return nil - } - if routeToDelete != nil { t.Logf("Deleting one actual gateway route and adding a dummy one") - if err := deleteGatewayRoute(routeToDelete); err != nil { + if err := deleteGatewayRoute(routeToDelete, data, isIPv6); err != nil { t.Fatalf("Error when deleting route: %v", err) } } - if err := addGatewayRoute(routeToAdd); err != nil { + if err := addGatewayRoute(routeToAdd, data, isIPv6); err != nil { t.Fatalf("Error when adding dummy route route: %v", err) } defer func() { // Cleanup the dummy route regardless of whether the test was a success or a // failure; ignore error (there will be an error if the test is a success since the // dummy route will no longer exist). 
- _ = deleteGatewayRoute(routeToAdd) + _ = deleteGatewayRoute(routeToAdd, data, isIPv6) }() t.Logf("Restarting antrea-agent on Node '%s'", nodeName) @@ -470,7 +390,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo // We expect the agent to delete the extra route we added and add back the route we deleted t.Logf("Waiting for gateway routes to converge") if err := wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) { - newRoutes, err := getGatewayRoutes() + newRoutes, err := getGatewayRoutes(data, isIPv6) if err != nil { return false, err }
@@ -558,6 +478,106 @@ func getRoundNumber(data *TestData, podName string) (uint64, error) {
 	return 0, fmt.Errorf("did not find roundNum in OVSDB result")
 }
 
+// TestSyncRoutes checks that when some routes are removed, the Antrea Agent will synchronize the
+// state of routes and add missing routes back.
+func TestSyncRoutes(t *testing.T) {
+	skipIfNumNodesLessThan(t, 2)
+	data, err := setupTest(t)
+	if err != nil {
+		t.Fatalf("Error when setting up test: %v", err)
+	}
+	defer teardownTest(t, data)
+
+	if len(clusterInfo.podV4NetworkCIDR) != 0 {
+		t.Logf("Running IPv4 test")
+		testSyncRoutes(t, data, false)
+	}
+	if len(clusterInfo.podV6NetworkCIDR) != 0 {
+		t.Logf("Running IPv6 test")
+		testSyncRoutes(t, data, true)
+	}
+}
+
+// testSyncRoutes deletes one gateway route on the Node with index 0 and waits for the Antrea
+// Agent to add it back. When the encap mode does not support encap, only the number of gateway
+// routes is checked since no route is deleted.
+func testSyncRoutes(t *testing.T, data *TestData, isIPv6 bool) {
+	// Interval and timeout used when waiting for the deleted route to be added back.
+	// NOTE(review): presumably sized to outlast the Agent's periodic sync; confirm.
+	const (
+		convergeInterval = 30 * time.Second
+		convergeTimeout  = 3 * time.Minute
+	)
+
+	encapMode, err := data.GetEncapMode()
+	if err != nil {
+		t.Fatalf(" failed to get encap mode, err %v", err)
+	}
+
+	expectedRtNumMin, expectedRtNumMax := clusterInfo.numNodes-1, clusterInfo.numNodes-1
+	if encapMode == config.TrafficEncapModeNoEncap {
+		expectedRtNumMin, expectedRtNumMax = 0, 0
+	} else if encapMode == config.TrafficEncapModeHybrid {
+		expectedRtNumMin = 1
+	}
+	var routes []Route
+	if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (found bool, err error) {
+		routes, err = getGatewayRoutes(data, isIPv6)
+		if err != nil {
+			return false, err
+		}
+
+		if len(routes) < expectedRtNumMin {
+			// Not enough routes, keep trying
+			return false, nil
+		} else if len(routes) > expectedRtNumMax {
+			return false, fmt.Errorf("found too many gateway routes, expected %d but got %d", expectedRtNumMax, len(routes))
+		}
+		return true, nil
+	}); err == wait.ErrWaitTimeout {
+		t.Fatalf("Not enough gateway routes after %v", defaultTimeout)
+	} else if err != nil {
+		t.Fatalf("Error while waiting for gateway routes: %v", err)
+	} else {
+		t.Logf("Found all expected gateway routes")
+	}
+
+	if !encapMode.SupportsEncap() || len(routes) == 0 {
+		// Nothing to delete, hence nothing to wait for.
+		t.Logf("No gateway route to delete, skipping the convergence check")
+		return
+	}
+	routeToDelete := &routes[0]
+	t.Logf("Deleting one actual gateway route")
+	if err := deleteGatewayRoute(routeToDelete, data, isIPv6); err != nil {
+		t.Fatalf("Error when deleting route: %v", err)
+	}
+
+	if err := wait.Poll(convergeInterval, convergeTimeout, func() (bool, error) {
+		newRoutes, err := getGatewayRoutes(data, isIPv6)
+		if err != nil {
+			return false, err
+		}
+		if len(newRoutes) != len(routes) {
+			return false, nil
+		}
+		for _, route := range newRoutes {
+			if route.peerPodGW.Equal(routeToDelete.peerPodGW) {
+				// The deleted route was added back, success!
+				return true, nil
+			}
+		}
+		// We haven't found the deleted route, keep trying
+		return false, nil
+	}); err == wait.ErrWaitTimeout {
+		t.Errorf("Gateway routes did not converge after %v", convergeTimeout)
+	} else if err != nil {
+		t.Fatalf("Error while waiting for gateway routes to converge: %v", err)
+	} else {
+		t.Logf("Gateway routes successfully converged")
+	}
+}
+
 // TestDeletePreviousRoundFlowsOnStartup checks that when the Antrea agent is restarted, flows from
 // the previous "round" which are no longer needed (e.g. in case of changes to the cluster / to
 // Network Policies) are removed correctly.
diff --git a/test/e2e/route_util.go b/test/e2e/route_util.go
new file mode 100644
index 00000000000..488443cadb5
--- /dev/null
+++ b/test/e2e/route_util.go
@@ -0,0 +1,100 @@
+// Copyright 2021 Antrea Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package e2e
+
+import (
+	"fmt"
+	"net"
+	"regexp"
+	"strings"
+)
+
+// gatewayRouteRegexp matches the "<destination> via <gateway>" part of a line printed by
+// "ip route list". It is compiled once since getGatewayRoutes is called from polling loops.
+var gatewayRouteRegexp = regexp.MustCompile(`([^\s]+) via ([^\s]+)`)
+
+// Route is a gateway route: the destination CIDR and the next hop used to reach it.
+type Route struct {
+	peerPodCIDR *net.IPNet
+	peerPodGW   net.IP
+}
+
+// runIPRoute runs "ip [-6] route <args...>" in the antrea-agent container of the Antrea Pod
+// running on the Node with index 0, and returns the standard output of the command.
+func runIPRoute(data *TestData, isIPv6 bool, args ...string) (string, error) {
+	node := nodeName(0)
+	antreaPodName, err := data.getAntreaPodOnNode(node)
+	if err != nil {
+		return "", err
+	}
+	cmd := []string{"ip"}
+	if isIPv6 {
+		cmd = append(cmd, "-6")
+	}
+	cmd = append(cmd, "route")
+	cmd = append(cmd, args...)
+	stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, agentContainerName, cmd)
+	if err != nil {
+		return "", fmt.Errorf("error when running ip command in Pod '%s' on Node '%s': %v - stdout: %s - stderr: %s", antreaPodName, node, err, stdout, stderr)
+	}
+	return stdout, nil
+}
+
+// getGatewayRoutes lists the routes on the Antrea gateway interface of the Node with index 0 and
+// returns those which have a next hop ("<destination> via <gateway>").
+func getGatewayRoutes(data *TestData, isIPv6 bool) ([]Route, error) {
+	antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace)
+	if err != nil {
+		return nil, err
+	}
+	stdout, err := runIPRoute(data, isIPv6, "list", "dev", antreaGWName)
+	if err != nil {
+		return nil, err
+	}
+	var routes []Route
+	for _, line := range strings.Split(stdout, "\n") {
+		matches := gatewayRouteRegexp.FindStringSubmatch(line)
+		if len(matches) == 0 {
+			continue
+		}
+		_, peerPodCIDR, err := net.ParseCIDR(matches[1])
+		if err != nil {
+			return nil, fmt.Errorf("%s is not a valid net CIDR", matches[1])
+		}
+		peerPodGW := net.ParseIP(matches[2])
+		if peerPodGW == nil {
+			return nil, fmt.Errorf("%s is not a valid IP", matches[2])
+		}
+		routes = append(routes, Route{peerPodCIDR: peerPodCIDR, peerPodGW: peerPodGW})
+	}
+	return routes, nil
+}
+
+// deleteGatewayRoute deletes the route to route.peerPodCIDR on the Node with index 0.
+func deleteGatewayRoute(route *Route, data *TestData, isIPv6 bool) error {
+	_, err := runIPRoute(data, isIPv6, "del", route.peerPodCIDR.String())
+	return err
+}
+
+// addGatewayRoute adds an onlink route to route.peerPodCIDR via route.peerPodGW on the Antrea
+// gateway interface of the Node with index 0.
+func addGatewayRoute(route *Route, data *TestData, isIPv6 bool) error {
+	antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace)
+	if err != nil {
+		return err
+	}
+	_, err = runIPRoute(data, isIPv6, "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink")
+	return err
+}
diff --git a/test/e2e/util.go b/test/e2e/util.go index 3d1102c7151..20891eb6b53 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -1,3 +1,17 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the
License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package e2e import (