diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go
index 67983dd5932..00c1c11ac9b 100644
--- a/pkg/agent/route/route_linux.go
+++ b/pkg/agent/route/route_linux.go
@@ -152,7 +152,80 @@ func (c *Client) syncIPInfra() {
 		klog.Errorf("Failed to sync iptables: %v", err)
 		return
 	}
-	klog.V(3).Infof("Successfully synced node iptables")
+	if err := c.syncRoutes(); err != nil {
+		klog.Errorf("Failed to sync routes: %v", err)
+	}
+	if err := c.syncGwIp(); err != nil {
+		klog.Errorf("Failed to sync Ip configuration of gateway: %v", err)
+	}
+	klog.V(3).Infof("Successfully synced node iptables and routes")
+}
+
+// syncRoutes re-adds every route cached in c.nodeRoutes that is missing from the
+// routes currently reported by netlink. It returns the first error hit while
+// listing or adding routes.
+func (c *Client) syncRoutes() error {
+	routeList, err := netlink.RouteList(nil, netlink.FAMILY_ALL)
+	if err != nil {
+		return err
+	}
+	var routes []*netlink.Route
+	c.nodeRoutes.Range(func(_, v interface{}) bool {
+		routes = append(routes, v.([]*netlink.Route)...)
+		return true
+	})
+	for _, route := range routes {
+		exist := false
+		// Index into routeList rather than taking the address of a range variable.
+		for i := range routeList {
+			if routeEqual(route, &routeList[i]) {
+				exist = true
+				break
+			}
+		}
+		if exist {
+			continue
+		}
+		if err := netlink.RouteAdd(route); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// syncGwIp sets the gateway link up, derives one gateway address from each
+// configured Pod CIDR (ip.NextIP of the network address, with the CIDR's mask)
+// and passes the resulting addresses to util.ConfigureLinkAddresses.
+func (c *Client) syncGwIp() error {
+	if _, _, err := util.SetLinkUp(c.nodeConfig.GatewayConfig.Name); err != nil {
+		return err
+	}
+	gwIPs := make([]*net.IPNet, 0, 2)
+	for _, podCIDR := range []*net.IPNet{c.nodeConfig.PodIPv4CIDR, c.nodeConfig.PodIPv6CIDR} {
+		if podCIDR == nil {
+			continue
+		}
+		subnetID := podCIDR.IP.Mask(podCIDR.Mask)
+		gwIPs = append(gwIPs, &net.IPNet{IP: ip.NextIP(subnetID), Mask: podCIDR.Mask})
+	}
+	return util.ConfigureLinkAddresses(c.nodeConfig.GatewayConfig.LinkIndex, gwIPs)
+}
+
+// routeEqual reports whether x and y have the same link index, destination and
+// gateway. A nil route never equals anything.
+func routeEqual(x, y *netlink.Route) bool {
+	if x == nil || y == nil {
+		return false
+	}
+	if x.LinkIndex != y.LinkIndex || !x.Gw.Equal(y.Gw) {
+		return false
+	}
+	// Routes listed by netlink can have a nil Dst (e.g. the default route), so
+	// guard before dereferencing: such routes only match each other.
+	if x.Dst == nil || y.Dst == nil {
+		return x.Dst == nil && y.Dst == nil
+	}
+	return x.Dst.IP.Equal(y.Dst.IP) && bytes.Equal(x.Dst.Mask, y.Dst.Mask)
 }
 
 // syncIPSet ensures that the required ipset exists and it has the initial members.
diff --git a/pkg/agent/util/net_linux.go b/pkg/agent/util/net_linux.go
index f26e711f351..28a8e97a80f 100644
--- a/pkg/agent/util/net_linux.go
+++ b/pkg/agent/util/net_linux.go
@@ -38,7 +38,7 @@ func GetNetLink(dev string) netlink.Link {
 	return link
 }
 
-// GetPeerLinkBridge returns peer device and its attached bridge (if applicable)
+// GetNSPeerDevBridge returns peer device and its attached bridge (if applicable)
 // for device dev in network space indicated by nsPath
 func GetNSPeerDevBridge(nsPath, dev string) (*net.Interface, string, error) {
 	var peerIdx int
diff --git a/test/e2e/basic_test.go b/test/e2e/basic_test.go
index caceb7f2bd5..a27934176a7 100644
--- a/test/e2e/basic_test.go
+++ b/test/e2e/basic_test.go
@@ -314,56 +314,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
 		t.Fatalf(" failed to get encap mode, err %v", err)
 	}
 
-	type Route struct {
-		peerPodCIDR *net.IPNet
-		peerPodGW   net.IP
-	}
-
 	nodeName := nodeName(0)
-	antreaPodName := func() string {
-		antreaPodName, err := data.getAntreaPodOnNode(nodeName)
-		if err != nil {
-			t.Fatalf("Error when retrieving the name of the Antrea Pod running on Node '%s': %v", nodeName, err)
-		}
-		t.Logf("The Antrea Pod for Node '%s' is '%s'", nodeName, antreaPodName)
-		return antreaPodName
-	}
-
-	antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace)
-	if err != nil {
-		t.Fatalf("Failed to detect gateway interface name from ConfigMap: %v", err)
-	}
-
-	getGatewayRoutes := func() (routes []Route, err error) {
-		var cmd []string
-		if !isIPv6 {
-			cmd = []string{"ip", "route", "list", "dev", antreaGWName}
-		} else {
-			cmd = []string{"ip", "-6", "route", "list", "dev", antreaGWName}
-		}
-		podName := antreaPodName()
-		stdout, stderr, err := data.runCommandFromPod(antreaNamespace, podName, agentContainerName, cmd)
-		if err != nil {
-			return nil, fmt.Errorf("error when running ip command in Pod '%s': %v - stdout: %s - stderr: %s", podName, err, stdout, stderr)
-		}
-		re := regexp.MustCompile(`([^\s]+) via ([^\s]+)`)
-		for _, line := range strings.Split(stdout, "\n") {
-			var err error
-			matches := re.FindStringSubmatch(line)
-			if len(matches) == 0 {
-				continue
-			}
-			route := Route{}
-			if _, route.peerPodCIDR, err = net.ParseCIDR(matches[1]); err != nil {
-				return nil, fmt.Errorf("%s is not a valid net CIDR", matches[1])
-			}
-			if route.peerPodGW = net.ParseIP(matches[2]); route.peerPodGW == nil {
-				return nil, fmt.Errorf("%s is not a valid IP", matches[2])
-			}
-			routes = append(routes, route)
-		}
-		return routes, nil
-	}
 
 	expectedRtNumMin, expectedRtNumMax := clusterInfo.numNodes-1, clusterInfo.numNodes-1
 	if encapMode == config.TrafficEncapModeNoEncap {
@@ -376,7 +327,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
 	t.Logf("Retrieving gateway routes on Node '%s'", nodeName)
 	var routes []Route
 	if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (found bool, err error) {
-		routes, err = getGatewayRoutes()
+		routes, err = getGatewayRoutes(data, isIPv6)
 		if err != nil {
 			return false, err
 		}
@@ -410,51 +361,20 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
 		routeToAdd.peerPodGW = net.ParseIP("fe80::1")
 	}
 
-	// We run the ip command from the antrea-agent container for delete / add since they need to
-	// be run as root and the antrea-agent container is privileged. If we used RunCommandOnNode,
-	// we may need to use "sudo" for some providers (e.g. vagrant).
-	deleteGatewayRoute := func(route *Route) error {
-		var cmd []string
-		if !isIPv6 {
-			cmd = []string{"ip", "route", "del", route.peerPodCIDR.String()}
-		} else {
-			cmd = []string{"ip", "-6", "route", "del", route.peerPodCIDR.String()}
-		}
-		_, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd)
-		if err != nil {
-			return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err)
-		}
-		return nil
-	}
-
-	addGatewayRoute := func(route *Route) error {
-		var cmd []string
-		if !isIPv6 {
-			cmd = []string{"ip", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"}
-		} else {
-			cmd = []string{"ip", "-6", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"}
-		}
-		_, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd)
-		if err != nil {
-			return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err)
-		}
-		return nil
-	}
-
 	if routeToDelete != nil {
 		t.Logf("Deleting one actual gateway route and adding a dummy one")
-		if err := deleteGatewayRoute(routeToDelete); err != nil {
+		if err := deleteGatewayRoute(routeToDelete, data, isIPv6); err != nil {
 			t.Fatalf("Error when deleting route: %v", err)
 		}
 	}
-	if err := addGatewayRoute(routeToAdd); err != nil {
+	if err := addGatewayRoute(routeToAdd, data, isIPv6); err != nil {
 		t.Fatalf("Error when adding dummy route route: %v", err)
 	}
 	defer func() {
 		// Cleanup the dummy route regardless of whether the test was a success or a
 		// failure; ignore error (there will be an error if the test is a success since the
 		// dummy route will no longer exist).
-		_ = deleteGatewayRoute(routeToAdd)
+		_ = deleteGatewayRoute(routeToAdd, data, isIPv6)
 	}()
 
 	t.Logf("Restarting antrea-agent on Node '%s'", nodeName)
@@ -470,7 +390,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
 	// We expect the agent to delete the extra route we added and add back the route we deleted
 	t.Logf("Waiting for gateway routes to converge")
 	if err := wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) {
-		newRoutes, err := getGatewayRoutes()
+		newRoutes, err := getGatewayRoutes(data, isIPv6)
 		if err != nil {
 			return false, err
 		}
@@ -558,6 +478,109 @@ func getRoundNumber(data *TestData, podName string) (uint64, error) {
 	return 0, fmt.Errorf("did not find roundNum in OVSDB result")
 }
 
+// TestSyncRoutes checks that when some routes are removed, the Antrea Agent will synchronize the
+// state of routes and add missing routes back.
+func TestSyncRoutes(t *testing.T) {
+	skipIfNumNodesLessThan(t, 2)
+	data, err := setupTest(t)
+	if err != nil {
+		t.Fatalf("Error when setting up test: %v", err)
+	}
+	defer teardownTest(t, data)
+
+	if len(clusterInfo.podV4NetworkCIDR) != 0 {
+		t.Logf("Running IPv4 test")
+		testSyncRoutes(t, data, false)
+	}
+	if len(clusterInfo.podV6NetworkCIDR) != 0 {
+		t.Logf("Running IPv6 test")
+		testSyncRoutes(t, data, true)
+	}
+}
+
+// testSyncRoutes waits for the expected gateway routes to be present, deletes one of them when
+// the encap mode supports encap, and then polls until the deleted route is back.
+func testSyncRoutes(t *testing.T, data *TestData, isIPv6 bool) {
+	// Polling parameters used while waiting for the agent to restore the deleted route.
+	// NOTE(review): presumably sized to the agent's periodic sync interval - confirm.
+	const (
+		syncPollInterval = 30 * time.Second
+		syncPollTimeout  = 3 * time.Minute
+	)
+
+	encapMode, err := data.GetEncapMode()
+	if err != nil {
+		t.Fatalf(" failed to get encap mode, err %v", err)
+	}
+
+	expectedRtNumMin, expectedRtNumMax := clusterInfo.numNodes-1, clusterInfo.numNodes-1
+	if encapMode == config.TrafficEncapModeNoEncap {
+		expectedRtNumMin, expectedRtNumMax = 0, 0
+	} else if encapMode == config.TrafficEncapModeHybrid {
+		expectedRtNumMin = 1
+	}
+	var routes []Route
+	if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (found bool, err error) {
+		routes, err = getGatewayRoutes(data, isIPv6)
+		if err != nil {
+			return false, err
+		}
+
+		if len(routes) < expectedRtNumMin {
+			// Not enough routes, keep trying
+			return false, nil
+		} else if len(routes) > expectedRtNumMax {
+			return false, fmt.Errorf("found too many gateway routes, expected %d but got %d", expectedRtNumMax, len(routes))
+		}
+		return true, nil
+	}); err == wait.ErrWaitTimeout {
+		t.Fatalf("Not enough gateway routes after %v", defaultTimeout)
+	} else if err != nil {
+		t.Fatalf("Error while waiting for gateway routes: %v", err)
+	} else {
+		t.Logf("Found all expected gateway routes")
+	}
+
+	var routeToDelete *Route
+	if encapMode.SupportsEncap() && len(routes) > 0 {
+		routeToDelete = &routes[0]
+	}
+	if routeToDelete != nil {
+		t.Logf("Deleting one actual gateway route")
+		if err := deleteGatewayRoute(routeToDelete, data, isIPv6); err != nil {
+			t.Fatalf("Error when deleting route: %v", err)
+		}
+	}
+
+	if err := wait.Poll(syncPollInterval, syncPollTimeout, func() (bool, error) {
+		newRoutes, err := getGatewayRoutes(data, isIPv6)
+		if err != nil {
+			return false, err
+		}
+		if len(newRoutes) != len(routes) {
+			return false, nil
+		}
+		if routeToDelete == nil {
+			// Nothing was deleted, so a matching route count is all there is to check.
+			return true, nil
+		}
+		for _, route := range newRoutes {
+			if route.peerPodGW.Equal(routeToDelete.peerPodGW) {
+				// The deleted route was added back, success!
+				return true, nil
+			}
+		}
+		// We haven't found the deleted route, keep trying
+		return false, nil
+	}); err == wait.ErrWaitTimeout {
+		t.Errorf("Gateway routes did not converge after %v", syncPollTimeout)
+	} else if err != nil {
+		t.Fatalf("Error while waiting for gateway routes to converge: %v", err)
+	} else {
+		t.Logf("Gateway routes successfully converged")
+	}
+}
+
 // TestDeletePreviousRoundFlowsOnStartup checks that when the Antrea agent is restarted, flows from
 // the previous "round" which are no longer needed (e.g. in case of changes to the cluster / to
 // Network Policies) are removed correctly.
diff --git a/test/e2e/route_util.go b/test/e2e/route_util.go
new file mode 100644
index 00000000000..7654d54b913
--- /dev/null
+++ b/test/e2e/route_util.go
@@ -0,0 +1,106 @@
+package e2e
+
+import (
+	"fmt"
+	"net"
+	"regexp"
+	"strings"
+)
+
+// gatewayRouteRegexp captures the two whitespace-free tokens around " via " in a line of
+// "ip route list" output: the route destination and its next hop.
+var gatewayRouteRegexp = regexp.MustCompile(`([^\s]+) via ([^\s]+)`)
+
+// Route is a gateway route parsed from "ip route list" output: a destination CIDR and the
+// next-hop IP it is routed via.
+type Route struct {
+	peerPodCIDR *net.IPNet
+	peerPodGW   net.IP
+}
+
+// ipCommand returns the "ip" command line for args, with "-6" inserted right after "ip" when
+// isIPv6 is true.
+func ipCommand(isIPv6 bool, args ...string) []string {
+	cmd := []string{"ip"}
+	if isIPv6 {
+		cmd = append(cmd, "-6")
+	}
+	return append(cmd, args...)
+}
+
+// getGatewayRoutes runs "ip route list" for the Antrea gateway interface in the antrea-agent
+// container of the Antrea Pod on Node nodeName(0), and returns every "via" route found in the
+// output. The "-6" flag is added when isIPv6 is true.
+func getGatewayRoutes(data *TestData, isIPv6 bool) (routes []Route, err error) {
+	antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace)
+	if err != nil {
+		return nil, err
+	}
+	antreaPodName, err := data.getAntreaPodOnNode(nodeName(0))
+	if err != nil {
+		return nil, err
+	}
+
+	cmd := ipCommand(isIPv6, "route", "list", "dev", antreaGWName)
+	stdout, stderr, err := data.runCommandFromPod(antreaNamespace, antreaPodName, agentContainerName, cmd)
+	if err != nil {
+		return nil, fmt.Errorf("error when running ip command in Pod '%s': %v - stdout: %s - stderr: %s", antreaPodName, err, stdout, stderr)
+	}
+
+	for _, line := range strings.Split(stdout, "\n") {
+		matches := gatewayRouteRegexp.FindStringSubmatch(line)
+		if len(matches) == 0 {
+			continue
+		}
+		route := Route{}
+		if _, route.peerPodCIDR, err = net.ParseCIDR(matches[1]); err != nil {
+			return nil, fmt.Errorf("%s is not a valid net CIDR", matches[1])
+		}
+		if route.peerPodGW = net.ParseIP(matches[2]); route.peerPodGW == nil {
+			return nil, fmt.Errorf("%s is not a valid IP", matches[2])
+		}
+		routes = append(routes, route)
+	}
+	return routes, nil
+}
+
+// deleteGatewayRoute deletes the route to route.peerPodCIDR by running "ip route del" in the
+// antrea-agent container of the Antrea Pod on Node nodeName(0).
+//
+// We run the ip command from the antrea-agent container for delete / add since they need to
+// be run as root and the antrea-agent container is privileged. If we used RunCommandOnNode,
+// we may need to use "sudo" for some providers (e.g. vagrant).
+func deleteGatewayRoute(route *Route, data *TestData, isIPv6 bool) error {
+	node := nodeName(0)
+	antreaPodName, err := data.getAntreaPodOnNode(node)
+	if err != nil {
+		return err
+	}
+
+	cmd := ipCommand(isIPv6, "route", "del", route.peerPodCIDR.String())
+	if _, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName, agentContainerName, cmd); err != nil {
+		return fmt.Errorf("error when running ip command on Node '%s': %v", node, err)
+	}
+	return nil
+}
+
+// addGatewayRoute adds an "onlink" route to route.peerPodCIDR via route.peerPodGW on the Antrea
+// gateway interface. Like deleteGatewayRoute, it runs the ip command in the antrea-agent
+// container of the Antrea Pod on Node nodeName(0).
+func addGatewayRoute(route *Route, data *TestData, isIPv6 bool) error {
+	antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace)
+	if err != nil {
+		return err
+	}
+	node := nodeName(0)
+	antreaPodName, err := data.getAntreaPodOnNode(node)
+	if err != nil {
+		return err
+	}
+
+	cmd := ipCommand(isIPv6, "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink")
+	if _, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName, agentContainerName, cmd); err != nil {
+		return fmt.Errorf("error when running ip command on Node '%s': %v", node, err)
+	}
+	return nil
+}