Skip to content

Commit

Permalink
Verify the status of required routes and IP configuration of gateway …
Browse files Browse the repository at this point in the history
…periodically

Add checks to the routeClient. The required routes will be added back if they were
deleted unexpectedly. The client also checks if the IP configuration of the gateway
is correct. An e2e test is added to verify that the route will be added back correctly.

Fixes #627
  • Loading branch information
hty690 committed Apr 14, 2021
1 parent b88f39e commit d1f2b60
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 87 deletions.
69 changes: 68 additions & 1 deletion pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,74 @@ func (c *Client) syncIPInfra() {
klog.Errorf("Failed to sync iptables: %v", err)
return
}
klog.V(3).Infof("Successfully synced node iptables")
if err := c.syncRoutes(); err != nil {
klog.Errorf("Failed to sync routes: %v", err)
}
if err := c.syncGwIp(); err != nil {
klog.Errorf("Failed to sync Ip configuration of gateway: %v", err)
}
klog.V(3).Infof("Successfully synced node iptables and routes")
}

// syncRoutes re-installs every route tracked in c.nodeRoutes that is missing
// from the list returned by netlink.RouteList. It stops at, and returns, the
// first error hit while listing or adding routes.
func (c *Client) syncRoutes() error {
	installed, err := netlink.RouteList(nil, netlink.FAMILY_ALL)
	if err != nil {
		return err
	}

	// Flatten the per-key route slices into a single list of desired routes
	// before comparing, so no route is added from inside the Range callback.
	var desired []*netlink.Route
	c.nodeRoutes.Range(func(_, value interface{}) bool {
		desired = append(desired, value.([]*netlink.Route)...)
		return true
	})

	// isInstalled reports whether want matches any route currently listed.
	isInstalled := func(want *netlink.Route) bool {
		for i := range installed {
			if routeEqual(want, &installed[i]) {
				return true
			}
		}
		return false
	}

	for _, want := range desired {
		if isInstalled(want) {
			continue
		}
		if err := netlink.RouteAdd(want); err != nil {
			return err
		}
	}
	return nil
}

// syncGwIp brings the gateway link up and (re)configures its addresses. For
// each non-nil Pod CIDR of the Node (IPv4 and IPv6), the expected gateway
// address is ip.NextIP applied to the CIDR's network address (presumably the
// first host address of the subnet), combined with the CIDR's own mask.
// It returns the first error reported by the util helpers.
func (c *Client) syncGwIp() error {
	if _, _, err := util.SetLinkUp(c.nodeConfig.GatewayConfig.Name); err != nil {
		return err
	}

	candidates := [...]*net.IPNet{c.nodeConfig.PodIPv4CIDR, c.nodeConfig.PodIPv6CIDR}
	gwAddrs := make([]*net.IPNet, 0, len(candidates))
	for _, cidr := range candidates {
		if cidr != nil {
			// Masking the CIDR IP yields the network address of the subnet.
			networkAddr := cidr.IP.Mask(cidr.Mask)
			gwAddrs = append(gwAddrs, &net.IPNet{IP: ip.NextIP(networkAddr), Mask: cidr.Mask})
		}
	}

	return util.ConfigureLinkAddresses(c.nodeConfig.GatewayConfig.LinkIndex, gwAddrs)
}

// routeEqual reports whether x and y describe the same route. Two routes are
// considered equal when they share the same link index, gateway and
// destination network (IP and mask). It returns false if either route is nil.
//
// A route can carry a nil Dst (NOTE(review): netlink is understood to report
// the default route this way in some versions — confirm for the vendored
// one), so Dst is checked before it is dereferenced: two nil destinations
// match each other, while a nil and a non-nil destination never match.
func routeEqual(x, y *netlink.Route) bool {
	if x == nil || y == nil {
		return false
	}
	if x.LinkIndex != y.LinkIndex || !x.Gw.Equal(y.Gw) {
		return false
	}
	if x.Dst == nil || y.Dst == nil {
		// At least one side is nil here, so this is true only if both are.
		return x.Dst == y.Dst
	}
	return x.Dst.IP.Equal(y.Dst.IP) && bytes.Equal(x.Dst.Mask, y.Dst.Mask)
}

// syncIPSet ensures that the required ipset exists and it has the initial members.
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/util/net_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func GetNetLink(dev string) netlink.Link {
return link
}

// GetPeerLinkBridge returns peer device and its attached bridge (if applicable)
// GetNSPeerDevBridge returns peer device and its attached bridge (if applicable)
// for device dev in network space indicated by nsPath
func GetNSPeerDevBridge(nsPath, dev string) (*net.Interface, string, error) {
var peerIdx int
Expand Down
189 changes: 104 additions & 85 deletions test/e2e/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,56 +314,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
t.Fatalf(" failed to get encap mode, err %v", err)
}

type Route struct {
peerPodCIDR *net.IPNet
peerPodGW net.IP
}

nodeName := nodeName(0)
antreaPodName := func() string {
antreaPodName, err := data.getAntreaPodOnNode(nodeName)
if err != nil {
t.Fatalf("Error when retrieving the name of the Antrea Pod running on Node '%s': %v", nodeName, err)
}
t.Logf("The Antrea Pod for Node '%s' is '%s'", nodeName, antreaPodName)
return antreaPodName
}

antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace)
if err != nil {
t.Fatalf("Failed to detect gateway interface name from ConfigMap: %v", err)
}

getGatewayRoutes := func() (routes []Route, err error) {
var cmd []string
if !isIPv6 {
cmd = []string{"ip", "route", "list", "dev", antreaGWName}
} else {
cmd = []string{"ip", "-6", "route", "list", "dev", antreaGWName}
}
podName := antreaPodName()
stdout, stderr, err := data.runCommandFromPod(antreaNamespace, podName, agentContainerName, cmd)
if err != nil {
return nil, fmt.Errorf("error when running ip command in Pod '%s': %v - stdout: %s - stderr: %s", podName, err, stdout, stderr)
}
re := regexp.MustCompile(`([^\s]+) via ([^\s]+)`)
for _, line := range strings.Split(stdout, "\n") {
var err error
matches := re.FindStringSubmatch(line)
if len(matches) == 0 {
continue
}
route := Route{}
if _, route.peerPodCIDR, err = net.ParseCIDR(matches[1]); err != nil {
return nil, fmt.Errorf("%s is not a valid net CIDR", matches[1])
}
if route.peerPodGW = net.ParseIP(matches[2]); route.peerPodGW == nil {
return nil, fmt.Errorf("%s is not a valid IP", matches[2])
}
routes = append(routes, route)
}
return routes, nil
}

expectedRtNumMin, expectedRtNumMax := clusterInfo.numNodes-1, clusterInfo.numNodes-1
if encapMode == config.TrafficEncapModeNoEncap {
Expand All @@ -376,7 +327,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
t.Logf("Retrieving gateway routes on Node '%s'", nodeName)
var routes []Route
if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (found bool, err error) {
routes, err = getGatewayRoutes()
routes, err = getGatewayRoutes(data, isIPv6)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -410,51 +361,20 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
routeToAdd.peerPodGW = net.ParseIP("fe80::1")
}

// We run the ip command from the antrea-agent container for delete / add since they need to
// be run as root and the antrea-agent container is privileged. If we used RunCommandOnNode,
// we may need to use "sudo" for some providers (e.g. vagrant).
deleteGatewayRoute := func(route *Route) error {
var cmd []string
if !isIPv6 {
cmd = []string{"ip", "route", "del", route.peerPodCIDR.String()}
} else {
cmd = []string{"ip", "-6", "route", "del", route.peerPodCIDR.String()}
}
_, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd)
if err != nil {
return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err)
}
return nil
}

addGatewayRoute := func(route *Route) error {
var cmd []string
if !isIPv6 {
cmd = []string{"ip", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"}
} else {
cmd = []string{"ip", "-6", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"}
}
_, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd)
if err != nil {
return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err)
}
return nil
}

if routeToDelete != nil {
t.Logf("Deleting one actual gateway route and adding a dummy one")
if err := deleteGatewayRoute(routeToDelete); err != nil {
if err := deleteGatewayRoute(routeToDelete, data, isIPv6); err != nil {
t.Fatalf("Error when deleting route: %v", err)
}
}
if err := addGatewayRoute(routeToAdd); err != nil {
if err := addGatewayRoute(routeToAdd, data, isIPv6); err != nil {
t.Fatalf("Error when adding dummy route route: %v", err)
}
defer func() {
// Cleanup the dummy route regardless of whether the test was a success or a
// failure; ignore error (there will be an error if the test is a success since the
// dummy route will no longer exist).
_ = deleteGatewayRoute(routeToAdd)
_ = deleteGatewayRoute(routeToAdd, data, isIPv6)
}()

t.Logf("Restarting antrea-agent on Node '%s'", nodeName)
Expand All @@ -470,7 +390,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
// We expect the agent to delete the extra route we added and add back the route we deleted
t.Logf("Waiting for gateway routes to converge")
if err := wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) {
newRoutes, err := getGatewayRoutes()
newRoutes, err := getGatewayRoutes(data, isIPv6)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -558,6 +478,105 @@ func getRoundNumber(data *TestData, podName string) (uint64, error) {
return 0, fmt.Errorf("did not find roundNum in OVSDB result")
}

// TestSyncRoutes verifies that the Antrea Agent restores gateway routes that
// were removed behind its back. The scenario is run once for each IP family
// (IPv4, then IPv6) for which the cluster reports a Pod network CIDR.
func TestSyncRoutes(t *testing.T) {
	skipIfNumNodesLessThan(t, 2)

	data, err := setupTest(t)
	if err != nil {
		t.Fatalf("Error when setting up test: %v", err)
	}
	defer teardownTest(t, data)

	families := []struct {
		enabled bool
		banner  string
		isIPv6  bool
	}{
		{len(clusterInfo.podV4NetworkCIDR) != 0, "Running IPv4 test", false},
		{len(clusterInfo.podV6NetworkCIDR) != 0, "Running IPv6 test", true},
	}
	for _, family := range families {
		if !family.enabled {
			continue
		}
		t.Log(family.banner)
		testSyncRoutes(t, data, family.isIPv6)
	}
}

// testSyncRoutes checks, for one IP family, that a gateway route deleted by
// the test is added back without restarting the Agent.
//
// It first waits until the number of gateway routes is within the range
// expected for the cluster's encap mode, then (when the mode supports encap)
// deletes the first route and polls until the route count is back to its
// initial value and a route with the same peer gateway is present again.
// When the mode does not support encap nothing is deleted and only the route
// count is checked.
func testSyncRoutes(t *testing.T, data *TestData, isIPv6 bool) {
	// Polling parameters for the convergence check. They are deliberately
	// larger than the defaults (presumably sized to the Agent's periodic
	// sync interval — confirm against the Agent configuration).
	const (
		syncPollInterval = 30 * time.Second
		syncPollTimeout  = 3 * time.Minute
	)

	encapMode, err := data.GetEncapMode()
	if err != nil {
		t.Fatalf(" failed to get encap mode, err %v", err)
	}

	expectedRtNumMin, expectedRtNumMax := clusterInfo.numNodes-1, clusterInfo.numNodes-1
	switch encapMode {
	case config.TrafficEncapModeNoEncap:
		expectedRtNumMin, expectedRtNumMax = 0, 0
	case config.TrafficEncapModeHybrid:
		expectedRtNumMin = 1
	}

	var routes []Route
	if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (found bool, err error) {
		routes, err = getGatewayRoutes(data, isIPv6)
		if err != nil {
			return false, err
		}

		if len(routes) < expectedRtNumMin {
			// Not enough routes, keep trying
			return false, nil
		} else if len(routes) > expectedRtNumMax {
			return false, fmt.Errorf("found too many gateway routes, expected %d but got %d", expectedRtNumMax, len(routes))
		}
		return true, nil
	}); err == wait.ErrWaitTimeout {
		t.Fatalf("Not enough gateway routes after %v", defaultTimeout)
	} else if err != nil {
		t.Fatalf("Error while waiting for gateway routes: %v", err)
	} else {
		t.Logf("Found all expected gateway routes")
	}

	// Only modes that support encap are given a route to delete; for the
	// others routeToDelete stays nil and the deletion step is skipped.
	var routeToDelete *Route
	if encapMode.SupportsEncap() {
		routeToDelete = &routes[0]
	}
	if routeToDelete != nil {
		t.Logf("Deleting one actual gateway route")
		if err := deleteGatewayRoute(routeToDelete, data, isIPv6); err != nil {
			t.Fatalf("Error when deleting route: %v", err)
		}
	}

	if err := wait.Poll(syncPollInterval, syncPollTimeout, func() (bool, error) {
		newRoutes, err := getGatewayRoutes(data, isIPv6)
		if err != nil {
			return false, err
		}
		if len(newRoutes) != len(routes) {
			return false, nil
		}
		if routeToDelete == nil {
			// Nothing was deleted: a matching route count is all we need.
			return true, nil
		}
		for _, route := range newRoutes {
			if route.peerPodGW.Equal(routeToDelete.peerPodGW) {
				// The deleted route was added back, success!
				return true, nil
			}
		}
		// We haven't found the deleted route, keep trying
		return false, nil
	}); err == wait.ErrWaitTimeout {
		// Report the timeout that was actually used for this poll.
		t.Errorf("Gateway routes did not converge after %v", syncPollTimeout)
	} else if err != nil {
		t.Fatalf("Error while waiting for gateway routes to converge: %v", err)
	} else {
		t.Logf("Gateway routes successfully converged")
	}
}

// TestDeletePreviousRoundFlowsOnStartup checks that when the Antrea agent is restarted, flows from
// the previous "round" which are no longer needed (e.g. in case of changes to the cluster / to
// Network Policies) are removed correctly.
Expand Down
Loading

0 comments on commit d1f2b60

Please sign in to comment.