Skip to content

Commit

Permalink
[IPv6] Implement L3 connectivity for IPv6 traffic (#1011)
Browse files Browse the repository at this point in the history
1. Use IPv6 in iptables and ipset configuration.
2. Identifiy IPv6 address and configure in OpenFlow.
3. Use Node Internal address for tunnel.
  • Loading branch information
wenyingd authored and lzhecheng committed Nov 9, 2020
1 parent 74dca3b commit 89ab17b
Show file tree
Hide file tree
Showing 15 changed files with 571 additions and 166 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ docker-test-integration: .coverage
-v $(DOCKER_CACHE)/gocache:/tmp/gocache \
-v $(CURDIR)/.coverage:/usr/src/github.com/vmware-tanzu/antrea/.coverage \
-v $(CURDIR):/usr/src/github.com/vmware-tanzu/antrea:ro \
-v /lib/modules:/lib/modules \
antrea/test test-integration $(USERID) $(GRPID)

.PHONY: docker-tidy
Expand Down
80 changes: 50 additions & 30 deletions pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,11 @@ func (c *Controller) removeStaleGatewayRoutes() error {
// running, so the route to local Pods will be desired as well.
var desiredPodCIDRs []string
for _, node := range nodes {
// PodCIDR is allocated by K8s NodeIpamController asynchronously so it's possible we see a Node
// with no PodCIDR set when it just joins the cluster.
if node.Spec.PodCIDR == "" {
podCIDRs := getPodCIDRsOnNode(node)
if len(podCIDRs) == 0 {
continue
}
desiredPodCIDRs = append(desiredPodCIDRs, node.Spec.PodCIDR)
desiredPodCIDRs = append(desiredPodCIDRs, podCIDRs...)
}

// routeClient will remove orphaned routes whose destinations are not in desiredPodCIDRs.
Expand Down Expand Up @@ -360,16 +359,17 @@ func (c *Controller) syncNodeRoute(nodeName string) error {
func (c *Controller) deleteNodeRoute(nodeName string) error {
klog.Infof("Deleting routes and flows to Node %s", nodeName)

obj, installed := c.installedNodes.Load(nodeName)
podCIDRs, installed := c.installedNodes.Load(nodeName)
if !installed {
// Route is not added for this Node.
return nil
}
nodeRouteInfo := obj.(*nodeRouteInfo)
if err := c.routeClient.DeleteRoutes(nodeRouteInfo.podCIDR); err != nil {
return fmt.Errorf("failed to delete the route to Node %s: %v", nodeName, err)
}

for _, podCIDR := range podCIDRs.([]*net.IPNet) {
if err := c.routeClient.DeleteRoutes(podCIDR); err != nil {
return fmt.Errorf("failed to delete the route to Node %s: %v", nodeName, err)
}
}
if err := c.ofClient.UninstallNodeFlows(nodeName); err != nil {
return fmt.Errorf("failed to uninstall flows to Node %s: %v", nodeName, err)
}
Expand Down Expand Up @@ -397,25 +397,20 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
return nil
}

klog.Infof("Adding routes and flows to Node %s, podCIDR: %s, addresses: %v",
nodeName, node.Spec.PodCIDR, node.Status.Addresses)

if node.Spec.PodCIDR == "" {
klog.Errorf("PodCIDR is empty for Node %s", nodeName)
// Does not help to return an error and trigger controller retries.
return nil
}
peerPodCIDRAddr, peerPodCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
if err != nil {
klog.Errorf("Failed to parse PodCIDR %s for Node %s", node.Spec.PodCIDR, nodeName)
podCIDRStrs := getPodCIDRsOnNode(node)
if len(podCIDRStrs) == 0 {
// If no valid PodCIDR is configured in Node.Spec, return immediately.
return nil
}

klog.Infof("Adding routes and flows to Node %s, podCIDRs: %v, addresses: %v",
nodeName, podCIDRStrs, node.Status.Addresses)

peerNodeIP, err := GetNodeAddr(node)
if err != nil {
klog.Errorf("Failed to retrieve IP address of Node %s: %v", nodeName, err)
return nil
}
peerGatewayIP := ip.NextIP(peerPodCIDRAddr)

ipsecTunOFPort := int32(0)
if c.networkConfig.EnableIPSecTunnel {
Expand All @@ -427,29 +422,51 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
}
}

var podCIDRs []*net.IPNet
peerConfig := make(map[*net.IPNet]net.IP, len(podCIDRStrs))
for _, podCIDR := range podCIDRStrs {
peerPodCIDRAddr, peerPodCIDR, err := net.ParseCIDR(podCIDR)
if err != nil {
klog.Errorf("Failed to parse PodCIDR %s for Node %s", podCIDR, nodeName)
return nil
}
peerGatewayIP := ip.NextIP(peerPodCIDRAddr)
peerConfig[peerPodCIDR] = peerGatewayIP
podCIDRs = append(podCIDRs, peerPodCIDR)
}
err = c.ofClient.InstallNodeFlows(
nodeName,
c.nodeConfig.GatewayConfig.MAC,
*peerPodCIDR,
peerGatewayIP,
peerConfig,
peerNodeIP,
config.DefaultTunOFPort,
uint32(ipsecTunOFPort))
if err != nil {
return fmt.Errorf("failed to install flows to Node %s: %v", nodeName, err)
}

if err := c.routeClient.AddRoutes(peerPodCIDR, peerNodeIP, peerGatewayIP); err != nil {
return err
for peerPodCIDR, peerGatewayIP := range peerConfig {
if err := c.routeClient.AddRoutes(peerPodCIDR, peerNodeIP, peerGatewayIP); err != nil {
return err
}
}
c.installedNodes.Store(nodeName, &nodeRouteInfo{
podCIDR: peerPodCIDR,
nodeIP: peerNodeIP,
gatewayIP: peerGatewayIP,
})
c.installedNodes.Store(nodeName, podCIDRs)
return err
}

func getPodCIDRsOnNode(node *corev1.Node) []string {
if node.Spec.PodCIDRs != nil {
return node.Spec.PodCIDRs
}

if node.Spec.PodCIDR == "" {
klog.Errorf("PodCIDR is empty for Node %s", node.Name)
// Does not help to return an error and trigger controller retries.
return nil
}
return []string{node.Spec.PodCIDR}
}

// createIPSecTunnelPort creates an IPSec tunnel port for the remote Node if the
// tunnel does not exist, and returns the ofport number.
func (c *Controller) createIPSecTunnelPort(nodeName string, nodeIP net.IP) (int32, error) {
Expand Down Expand Up @@ -535,6 +552,9 @@ func ParseTunnelInterfaceConfig(

// GetNodeAddr gets the available IP address of a Node. GetNodeAddr will first try to get the
// NodeInternalIP, then try to get the NodeExternalIP.
// Note: Although K8s supports dual-stack, there is only a single Internal address per Node because of issue (
// kubernetes/kubernetes#91940 ). The Node might have multiple addresses after the issue is fixed, and one per address
// family. And we should change the return type at that time.
func GetNodeAddr(node *corev1.Node) (net.IP, error) {
addresses := make(map[corev1.NodeAddressType]string)
for _, addr := range node.Status.Addresses {
Expand Down
29 changes: 18 additions & 11 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ type Client interface {
InstallNodeFlows(
hostname string,
localGatewayMAC net.HardwareAddr,
peerPodCIDR net.IPNet,
peerGatewayIP, tunnelPeerIP net.IP,
peerConfigs map[*net.IPNet]net.IP,
tunnelPeerIP net.IP,
tunOFPort, ipsecTunOFPort uint32) error

// UninstallNodeFlows removes the connection to the remote Node specified with the
Expand Down Expand Up @@ -289,20 +289,27 @@ func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) erro

func (c *client) InstallNodeFlows(hostname string,
localGatewayMAC net.HardwareAddr,
peerPodCIDR net.IPNet,
peerGatewayIP, tunnelPeerIP net.IP,
peerConfigs map[*net.IPNet]net.IP,
tunnelPeerIP net.IP,
tunOFPort, ipsecTunOFPort uint32) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

flows := []binding.Flow{
c.arpResponderFlow(peerGatewayIP, cookie.Node),
}
if c.encapMode.NeedsEncapToPeer(tunnelPeerIP, c.nodeConfig.NodeIPAddr) {
flows = append(flows, c.l3FwdFlowToRemote(localGatewayMAC, peerPodCIDR, tunnelPeerIP, tunOFPort, cookie.Node))
} else {
flows = append(flows, c.l3FwdFlowToRemoteViaGW(localGatewayMAC, peerPodCIDR, cookie.Node))
var flows []binding.Flow

for peerPodCIDR, peerGatewayIP := range peerConfigs {
if peerGatewayIP.To4() != nil {
// Since broadcast is not supported in IPv6, ARP should happen only with IPv4 address, and ARP responder flows
// only work for IPv4 addresses.
flows = append(flows, c.arpResponderFlow(peerGatewayIP, cookie.Node))
}
if c.encapMode.NeedsEncapToPeer(tunnelPeerIP, c.nodeConfig.NodeIPAddr) {
flows = append(flows, c.l3FwdFlowToRemote(localGatewayMAC, *peerPodCIDR, tunnelPeerIP, tunOFPort, cookie.Node))
} else {
flows = append(flows, c.l3FwdFlowToRemoteViaGW(localGatewayMAC, *peerPodCIDR, cookie.Node))
}
}

if ipsecTunOFPort != 0 {
// When IPSec tunnel is enabled, packets received from the remote Node are
// input from the Node's IPSec tunnel port, not the default tunnel port. So,
Expand Down
7 changes: 5 additions & 2 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,12 @@ var bridgeMgmtAddr = ofconfig.GetMgmtAddress(ovsconfig.DefaultOVSRunDir, bridgeN
func installNodeFlows(ofClient Client, cacheKey string) (int, error) {
hostName := cacheKey
gwMAC, _ := net.ParseMAC("AA:BB:CC:DD:EE:FF")
gwIP, IPNet, _ := net.ParseCIDR("10.0.1.1/24")
gwIP, ipNet, _ := net.ParseCIDR("10.0.1.1/24")
peerNodeIP := net.ParseIP("192.168.1.1")
err := ofClient.InstallNodeFlows(hostName, gwMAC, *IPNet, gwIP, peerNodeIP, config.DefaultTunOFPort, 0)
peerConfig := map[*net.IPNet]net.IP{
ipNet: gwIP,
}
err := ofClient.InstallNodeFlows(hostName, gwMAC, peerConfig, peerNodeIP, config.DefaultTunOFPort, 0)
client := ofClient.(*client)
fCacheI, ok := client.nodeFlowCache.Load(hostName)
if ok {
Expand Down
6 changes: 4 additions & 2 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,8 @@ func (c *client) l3FwdFlowToRemote(
tunnelPeer net.IP,
tunOFPort uint32,
category cookie.Category) binding.Flow {
return c.pipeline[l3ForwardingTable].BuildFlow(priorityNormal).MatchProtocol(binding.ProtocolIP).
ipProto := parseIPProtocol(peerSubnet.IP)
return c.pipeline[l3ForwardingTable].BuildFlow(priorityNormal).MatchProtocol(ipProto).
MatchDstIPNet(peerSubnet).
Action().DecTTL().
// Rewrite src MAC to local gateway MAC and rewrite dst MAC to virtual MAC.
Expand All @@ -851,8 +852,9 @@ func (c *client) l3FwdFlowToRemoteViaGW(
localGatewayMAC net.HardwareAddr,
peerSubnet net.IPNet,
category cookie.Category) binding.Flow {
ipProto := parseIPProtocol(peerSubnet.IP)
l3FwdTable := c.pipeline[l3ForwardingTable]
return l3FwdTable.BuildFlow(priorityNormal).MatchProtocol(binding.ProtocolIP).
return l3FwdTable.BuildFlow(priorityNormal).MatchProtocol(ipProto).
MatchDstIPNet(peerSubnet).
Action().DecTTL().
Action().SetDstMAC(localGatewayMAC).
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/agent/route/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type Interface interface {
// It should be idempotent and can be safely called on every startup.
Initialize(nodeConfig *config.NodeConfig) error

// Reconcile should remove orphaned routes and related configuration based on the desired podCIDRs.
// Reconcile should remove orphaned routes and related configuration based on the desired podCIDRs. If IPv6 is enabled
// in the cluster, Reconcile should also remove the orphaned IPv6 neighbors.
Reconcile(podCIDRs []string) error

// AddRoutes should add routes to the provided podCIDR.
Expand Down
Loading

0 comments on commit 89ab17b

Please sign in to comment.