Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify the status of required routes gateway periodically #2091

Merged
merged 1 commit into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ func run(o *Options) error {
// networkReadyCh is used to notify that the Node's network is ready.
// Functions that rely on the Node's network should wait for the channel to close.
networkReadyCh := make(chan struct{})
// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
stopCh := signals.RegisterSignalHandlers()
// Initialize agent and node network.
agentInitializer := agent.NewInitializer(
k8sClient,
Expand All @@ -141,6 +145,7 @@ func run(o *Options) error {
serviceCIDRNetv6,
networkConfig,
networkReadyCh,
stopCh,
features.DefaultFeatureGate.Enabled(features.AntreaProxy))
err = agentInitializer.Initialize()
if err != nil {
Expand Down Expand Up @@ -256,10 +261,6 @@ func run(o *Options) error {
if err := antreaClientProvider.RunOnce(); err != nil {
return err
}
// set up signal capture: the first SIGTERM / SIGINT signal is handled gracefully and will
// cause the stopCh channel to be closed; if another signal is received before the program
// exits, we will force exit.
stopCh := signals.RegisterSignalHandlers()
tnqn marked this conversation as resolved.
Show resolved Hide resolved

// Start the NPL agent.
if features.DefaultFeatureGate.Enabled(features.NodePortLocal) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Initializer struct {
// networkReadyCh should be closed once the Node's network is ready.
// The CNI server will wait for it before handling any CNI Add requests.
networkReadyCh chan<- struct{}
stopCh <-chan struct{}
}

func NewInitializer(
Expand All @@ -86,6 +87,7 @@ func NewInitializer(
serviceCIDRv6 *net.IPNet,
networkConfig *config.NetworkConfig,
networkReadyCh chan<- struct{},
stopCh <-chan struct{},
enableProxy bool) *Initializer {
return &Initializer{
ovsBridgeClient: ovsBridgeClient,
Expand All @@ -100,6 +102,7 @@ func NewInitializer(
serviceCIDRv6: serviceCIDRv6,
networkConfig: networkConfig,
networkReadyCh: networkReadyCh,
stopCh: stopCh,
enableProxy: enableProxy,
}
}
Expand Down Expand Up @@ -845,6 +848,13 @@ func (i *Initializer) allocateGatewayAddresses(localSubnets []*net.IPNet, gatewa
if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil {
return err
}
// Periodically check whether IP configuration of the gateway is correct.
// Terminate when stopCh is closed.
go wait.Until(func() {
if err := util.ConfigureLinkAddresses(i.nodeConfig.GatewayConfig.LinkIndex, gwIPs); err != nil {
klog.Errorf("Failed to check IP configuration of the gateway: %v", err)
}
}, 60*time.Second, i.stopCh)

for _, gwIP := range gwIPs {
if gwIP.IP.To4() != nil {
Expand Down
47 changes: 45 additions & 2 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,49 @@ func (c *Client) syncIPInfra() {
klog.Errorf("Failed to sync iptables: %v", err)
return
}
klog.V(3).Infof("Successfully synced node iptables")
if err := c.syncRoutes(); err != nil {
klog.Errorf("Failed to sync routes: %v", err)
}
klog.V(3).Infof("Successfully synced node iptables and routes")
}

func (c *Client) syncRoutes() error {
routeList, err := netlink.RouteList(nil, netlink.FAMILY_ALL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be race condition between syncRoutes and DeleteRoutes:
if DeleteRoutes has deleted actual routes and is about to delete the desired routes, syncRoutes may see that there are desired rules but no actual routes and add the routes back. Then the routes will going to be stale.

if err != nil {
return err
}
routeMap := make(map[string]*netlink.Route)
for i := range routeList {
r := &routeList[i]
if r.Dst == nil {
continue
}
routeMap[r.Dst.String()] = r
}
c.nodeRoutes.Range(func(_, v interface{}) bool {
for _, route := range v.([]*netlink.Route) {
r, ok := routeMap[route.Dst.String()]
if ok && routeEqual(route, r) {
continue
}
if err := netlink.RouteReplace(route); err != nil {
klog.Errorf("Failed to add route to the gateway: %v", err)
return false
}
}
return true
})
return nil
}

func routeEqual(x, y *netlink.Route) bool {
if x == nil || y == nil {
return false
}
return x.LinkIndex == y.LinkIndex &&
x.Dst.IP.Equal(y.Dst.IP) &&
bytes.Equal(x.Dst.Mask, y.Dst.Mask) &&
x.Gw.Equal(y.Gw)
}

// syncIPSet ensures that the required ipset exists and it has the initial members.
Expand Down Expand Up @@ -578,13 +620,14 @@ func (c *Client) DeleteRoutes(podCIDR *net.IPNet) error {

routes, exists := c.nodeRoutes.Load(podCIDRStr)
if exists {
c.nodeRoutes.Delete(podCIDRStr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would introduce new issue that the routes won't be removed if the first attempt fails.
I think it can add the remaining routes back to the cache when it fails, so it can retry to remove them later.

for _, r := range routes.([]*netlink.Route) {
klog.V(4).Infof("Deleting route %v", r)
if err := netlink.RouteDel(r); err != nil && err != unix.ESRCH {
c.nodeRoutes.Store(podCIDRStr, routes)
return err
}
}
c.nodeRoutes.Delete(podCIDRStr)
}
if podCIDR.IP.To4() == nil {
neigh, exists := c.nodeNeighbors.Load(podCIDRStr)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/util/net_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func GetNetLink(dev string) netlink.Link {
return link
}

// GetPeerLinkBridge returns peer device and its attached bridge (if applicable)
// GetNSPeerDevBridge returns peer device and its attached bridge (if applicable)
// for device dev in network space indicated by nsPath
func GetNSPeerDevBridge(nsPath, dev string) (*net.Interface, string, error) {
var peerIdx int
Expand Down
14 changes: 14 additions & 0 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2021 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package e2e

import (
Expand Down
57 changes: 55 additions & 2 deletions test/integration/agent/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func createDummyGW(t *testing.T) netlink.Link {

func skipIfNotInContainer(t *testing.T) {
if _, incontainer := os.LookupEnv("INCONTAINER"); !incontainer {
// test changes file system, routing table. Run in contain only
// test changes file system, routing table. Run in container only
t.Skipf("Skipping test which is run only in container")
}
}
Expand Down Expand Up @@ -380,13 +380,66 @@ func TestAddAndDeleteRoutes(t *testing.T) {
output, err := ExecOutputTrim(fmt.Sprintf("ip route show table 0 exact %s", peerCIDR))
assert.NoError(t, err)
assert.Equal(t, "", output, "expected no routes to %s", peerCIDR)

entries, err = ipset.ListEntries("ANTREA-POD-IP")
assert.NoError(t, err, "list ipset entries failed")
assert.NotContains(t, entries, tc.peerCIDR, "entry should not be in ipset")
}
}

func TestSyncRoutes(t *testing.T) {
skipIfNotInContainer(t)
gwLink := createDummyGW(t)
defer netlink.LinkDel(gwLink)

tcs := []struct {
// variations
mode config.TrafficEncapModeType
nodeName string
peerCIDR string
peerIP net.IP
// expectations
uplink netlink.Link // indicates outbound of the route.
}{
{mode: config.TrafficEncapModeEncap, nodeName: "node0", peerCIDR: "10.10.20.0/24", peerIP: localPeerIP, uplink: gwLink},
{mode: config.TrafficEncapModeNoEncap, nodeName: "node1", peerCIDR: "10.10.30.0/24", peerIP: localPeerIP, uplink: nodeLink},
{mode: config.TrafficEncapModeNoEncap, nodeName: "node2", peerCIDR: "10.10.40.0/24", peerIP: remotePeerIP, uplink: nil},
{mode: config.TrafficEncapModeHybrid, nodeName: "node3", peerCIDR: "10.10.50.0/24", peerIP: localPeerIP, uplink: nodeLink},
{mode: config.TrafficEncapModeHybrid, nodeName: "node4", peerCIDR: "10.10.60.0/24", peerIP: remotePeerIP, uplink: gwLink},
}

for _, tc := range tcs {
t.Logf("Running test with mode %s peer cidr %s peer ip %s node config %s", tc.mode, tc.peerCIDR, tc.peerIP, nodeConfig)
routeClient, err := route.NewClient(serviceCIDR, &config.NetworkConfig{TrafficEncapMode: tc.mode}, false)
assert.NoError(t, err)
err = routeClient.Initialize(nodeConfig, func() {})
assert.NoError(t, err)

_, peerCIDR, _ := net.ParseCIDR(tc.peerCIDR)
nhCIDRIP := ip.NextIP(peerCIDR.IP)
assert.NoError(t, routeClient.AddRoutes(peerCIDR, tc.nodeName, tc.peerIP, nhCIDRIP), "adding routes failed")

listCmd := fmt.Sprintf("ip route show table 0 exact %s", peerCIDR)
expOutput, err := exec.Command("bash", "-c", listCmd).Output()
assert.NoError(t, err, "error executing ip route command: %s", listCmd)

if len(expOutput) > 0 {
delCmd := fmt.Sprintf("ip route del %s", peerCIDR.String())
_, err = exec.Command("bash", "-c", delCmd).Output()
assert.NoError(t, err, "error executing ip route command: %s", delCmd)
}

stopCh := make(chan struct{})
defer close(stopCh)
route.IPTablesSyncInterval = 2 * time.Second
go routeClient.Run(stopCh)
time.Sleep(route.IPTablesSyncInterval) // wait for one iteration of sync operation.

output, err := exec.Command("bash", "-c", listCmd).Output()
assert.NoError(t, err, "error executing ip route command: %s", listCmd)
assert.Equal(t, expOutput, output, "error syncing route")
}
}

func TestReconcile(t *testing.T) {
skipIfNotInContainer(t)

Expand Down