Skip to content

Commit

Permalink
Verify the status of required routes and IP configuration of gateway …
Browse files Browse the repository at this point in the history
…periodically

Add checks to the routeClient. The required routes will be added back if they were
deleted unexpectedly. Add IP configuration check of the gateway to the agent.
An integration test is added to verify that the route will be added back correctly.

Fixes antrea-io#627
  • Loading branch information
hty690 committed Apr 21, 2021
1 parent b88f39e commit 784eb16
Show file tree
Hide file tree
Showing 10 changed files with 253 additions and 102 deletions.
9 changes: 5 additions & 4 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ func run(o *Options) error {
// networkReadyCh is used to notify that the Node's network is ready.
// Functions that rely on the Node's network should wait for the channel to close.
networkReadyCh := make(chan struct{})
// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
stopCh := signals.RegisterSignalHandlers()
// Initialize agent and node network.
agentInitializer := agent.NewInitializer(
k8sClient,
Expand All @@ -141,6 +145,7 @@ func run(o *Options) error {
serviceCIDRNetv6,
networkConfig,
networkReadyCh,
stopCh,
features.DefaultFeatureGate.Enabled(features.AntreaProxy))
err = agentInitializer.Initialize()
if err != nil {
Expand Down Expand Up @@ -256,10 +261,6 @@ func run(o *Options) error {
if err := antreaClientProvider.RunOnce(); err != nil {
return err
}
// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
stopCh := signals.RegisterSignalHandlers()

// Start the NPL agent.
if features.DefaultFeatureGate.Enabled(features.NodePortLocal) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Initializer struct {
// networkReadyCh should be closed once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
networkReadyCh chan<- struct{}
stopCh <-chan struct{}
}

func NewInitializer(
Expand All @@ -86,6 +87,7 @@ func NewInitializer(
serviceCIDRv6 *net.IPNet,
networkConfig *config.NetworkConfig,
networkReadyCh chan<- struct{},
stopCh <-chan struct{},
enableProxy bool) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
Expand All @@ -100,6 +102,7 @@ func NewInitializer(
serviceCIDRv6: serviceCIDRv6,
networkConfig: networkConfig,
networkReadyCh: networkReadyCh,
stopCh: stopCh,
enableProxy: enableProxy,
}
}
Expand Down Expand Up @@ -845,6 +848,13 @@ func (i *Initializer) allocateGatewayAddresses(localSubnets []*net.IPNet, gatewa
if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil {
return err
}
// Periodically check whether IP configuration of the gateway is correct.
// Terminated when stopCh is closed.
go wait.Until(func() {
if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil {
klog.Errorf("Failed to check IP configuration of the gateway: %v", err)
}
}, 60*time.Second, i.stopCh)

for _, gwIP := range gwIPs {
if gwIP.IP.To4() != nil {
Expand Down
46 changes: 44 additions & 2 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,49 @@ func (c *Client) syncIPInfra() {
klog.Errorf("Failed to sync iptables: %v", err)
return
}
klog.V(3).Infof("Successfully synced node iptables")
if err := c.syncRoutes(); err != nil {
klog.Errorf("Failed to sync routes: %v", err)
}
klog.V(3).Infof("Successfully synced node iptables and routes")
}

func (c *Client) syncRoutes() error {
routeList, err := netlink.RouteList(nil, netlink.FAMILY_ALL)
if err != nil {
return err
}
routeMap := make(map[string]*netlink.Route)
for i := range routeList {
r := &routeList[i]
if r.Dst == nil {
continue
}
routeMap[r.Dst.String()] = r
}
c.nodeRoutes.Range(func(_, v interface{}) bool {
for _, route := range v.([]*netlink.Route) {
r, ok := routeMap[route.Dst.String()]
if ok && routeEqual(route, r) {
continue
}
if err := netlink.RouteAdd(route); err != nil {
klog.Errorf("Failed to add route to the gateway: %v", err)
return false
}
}
return true
})
return nil
}

func routeEqual(x, y *netlink.Route) bool {
if x == nil || y == nil {
return false
}
return x.LinkIndex == y.LinkIndex &&
x.Dst.IP.Equal(y.Dst.IP) &&
bytes.Equal(x.Dst.Mask, y.Dst.Mask) &&
x.Gw.Equal(y.Gw)
}

// syncIPSet ensures that the required ipset exists and it has the initial members.
Expand Down Expand Up @@ -578,13 +620,13 @@ func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error {

routes, exists := c.nodeRoutes.Load(podCIDRStr)
if exists {
c.nodeRoutes.Delete(podCIDRStr)
for _, r := range routes.([]*netlink.Route) {
klog.V(4).Infof("Deleting route %v", r)
if err := netlink.RouteDel(r); err != nil && err != unix.ESRCH {
return err
}
}
c.nodeRoutes.Delete(podCIDRStr)
}
if podCIDR.IP.To4() == nil {
neigh, exists := c.nodeNeighbors.Load(podCIDRStr)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/util/net_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func GetNetLink(dev string) netlink.Link {
return link
}

// GetPeerLinkBridge returns peer device and its attached bridge (if applicable)
// GetNSPeerDevBridge returns peer device and its attached bridge (if applicable)
// for device dev in network space indicated by nsPath
func GetNSPeerDevBridge(nsPath, dev string) (*net.Interface, string, error) {
var peerIdx int
Expand Down
4 changes: 2 additions & 2 deletions pkg/antctl/raw/traceflow/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var protocolTCP = int32(6)

// TestGetPortFields tests if a flow can be turned into a map.
func TestGetPortFields(t *testing.T) {
tcs := []struct {
tcs := []*struct {
flow string
success bool
expected map[string]int
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestGetPortFields(t *testing.T) {

// TestParseFlow tests if a flow can be parsed correctly.
func TestParseFlow(t *testing.T) {
tcs := []struct {
tcs := []*struct {
flow string
success bool
expected *v1alpha1.Traceflow
Expand Down
90 changes: 5 additions & 85 deletions test/e2e/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,56 +314,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
t.Fatalf(" failed to get encap mode, err %v", err)
}

type Route struct {
peerPodCIDR *net.IPNet
peerPodGW net.IP
}

nodeName := nodeName(0)
antreaPodName := func() string {
antreaPodName, err := data.getAntreaPodOnNode(nodeName)
if err != nil {
t.Fatalf("Error when retrieving the name of the Antrea Pod running on Node '%s': %v", nodeName, err)
}
t.Logf("The Antrea Pod for Node '%s' is '%s'", nodeName, antreaPodName)
return antreaPodName
}

antreaGWName, err := data.GetGatewayInterfaceName(antreaNamespace)
if err != nil {
t.Fatalf("Failed to detect gateway interface name from ConfigMap: %v", err)
}

getGatewayRoutes := func() (routes []Route, err error) {
var cmd []string
if !isIPv6 {
cmd = []string{"ip", "route", "list", "dev", antreaGWName}
} else {
cmd = []string{"ip", "-6", "route", "list", "dev", antreaGWName}
}
podName := antreaPodName()
stdout, stderr, err := data.runCommandFromPod(antreaNamespace, podName, agentContainerName, cmd)
if err != nil {
return nil, fmt.Errorf("error when running ip command in Pod '%s': %v - stdout: %s - stderr: %s", podName, err, stdout, stderr)
}
re := regexp.MustCompile(`([^\s]+) via ([^\s]+)`)
for _, line := range strings.Split(stdout, "\n") {
var err error
matches := re.FindStringSubmatch(line)
if len(matches) == 0 {
continue
}
route := Route{}
if _, route.peerPodCIDR, err = net.ParseCIDR(matches[1]); err != nil {
return nil, fmt.Errorf("%s is not a valid net CIDR", matches[1])
}
if route.peerPodGW = net.ParseIP(matches[2]); route.peerPodGW == nil {
return nil, fmt.Errorf("%s is not a valid IP", matches[2])
}
routes = append(routes, route)
}
return routes, nil
}

expectedRtNumMin, expectedRtNumMax := clusterInfo.numNodes-1, clusterInfo.numNodes-1
if encapMode == config.TrafficEncapModeNoEncap {
Expand All @@ -376,7 +327,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
t.Logf("Retrieving gateway routes on Node '%s'", nodeName)
var routes []Route
if err := wait.PollImmediate(defaultInterval, defaultTimeout, func() (found bool, err error) {
routes, err = getGatewayRoutes()
routes, err = getGatewayRoutes(data, isIPv6)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -410,51 +361,20 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
routeToAdd.peerPodGW = net.ParseIP("fe80::1")
}

// We run the ip command from the antrea-agent container for delete / add since they need to
// be run as root and the antrea-agent container is privileged. If we used RunCommandOnNode,
// we may need to use "sudo" for some providers (e.g. vagrant).
deleteGatewayRoute := func(route *Route) error {
var cmd []string
if !isIPv6 {
cmd = []string{"ip", "route", "del", route.peerPodCIDR.String()}
} else {
cmd = []string{"ip", "-6", "route", "del", route.peerPodCIDR.String()}
}
_, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd)
if err != nil {
return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err)
}
return nil
}

addGatewayRoute := func(route *Route) error {
var cmd []string
if !isIPv6 {
cmd = []string{"ip", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"}
} else {
cmd = []string{"ip", "-6", "route", "add", route.peerPodCIDR.String(), "via", route.peerPodGW.String(), "dev", antreaGWName, "onlink"}
}
_, _, err := data.runCommandFromPod(antreaNamespace, antreaPodName(), agentContainerName, cmd)
if err != nil {
return fmt.Errorf("error when running ip command on Node '%s': %v", nodeName, err)
}
return nil
}

if routeToDelete != nil {
t.Logf("Deleting one actual gateway route and adding a dummy one")
if err := deleteGatewayRoute(routeToDelete); err != nil {
if err := deleteGatewayRoute(routeToDelete, data, isIPv6); err != nil {
t.Fatalf("Error when deleting route: %v", err)
}
}
if err := addGatewayRoute(routeToAdd); err != nil {
if err := addGatewayRoute(routeToAdd, data, isIPv6); err != nil {
t.Fatalf("Error when adding dummy route route: %v", err)
}
defer func() {
// Cleanup the dummy route regardless of whether the test was a success or a
// failure; ignore error (there will be an error if the test is a success since the
// dummy route will no longer exist).
_ = deleteGatewayRoute(routeToAdd)
_ = deleteGatewayRoute(routeToAdd, data, isIPv6)
}()

t.Logf("Restarting antrea-agent on Node '%s'", nodeName)
Expand All @@ -470,7 +390,7 @@ func testReconcileGatewayRoutesOnStartup(t *testing.T, data *TestData, isIPv6 bo
// We expect the agent to delete the extra route we added and add back the route we deleted
t.Logf("Waiting for gateway routes to converge")
if err := wait.Poll(defaultInterval, defaultTimeout, func() (bool, error) {
newRoutes, err := getGatewayRoutes()
newRoutes, err := getGatewayRoutes(data, isIPv6)
if err != nil {
return false, err
}
Expand Down
Loading

0 comments on commit 784eb16

Please sign in to comment.