Skip to content

Commit

Permalink
[Windows] NoEncap support (#2160)
Browse files Browse the repository at this point in the history
This PR implements support for Windows Noencap mode. It also includes flow optimization to improve
Pod to Pod performance.
Noencap: If same subnet, destination MAC in OVS is set with peer Node annotation. If not, host routing will
be used to forward traffic.
Hybrid: If same subnet, same as Noencap with same subnet. If not, same as Encap mode.

Supersedes #1901
Fixes #1632

Signed-off-by: Zhecheng Li <[email protected]>
  • Loading branch information
lzhecheng authored May 27, 2021
1 parent b8e9157 commit 9547582
Show file tree
Hide file tree
Showing 15 changed files with 237 additions and 70 deletions.
14 changes: 12 additions & 2 deletions build/yamls/antrea-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ data:
# Enable TLS communication from flow exporter to flow aggregator.
#enableTLSToFlowAggregator: true
# Determines how traffic is encapsulated. It has the following options:
# encap(default): Inter-node Pod traffic is always encapsulated and Pod to external network
# traffic is SNAT'd.
# noEncap: Inter-node Pod traffic is not encapsulated; Pod to external network traffic is
# SNAT'd if noSNAT is not set to true. Underlying network must be capable of
# supporting Pod traffic across IP subnets.
# hybrid: noEncap if source and destination Nodes are on the same subnet, otherwise encap.
#
#trafficEncapMode: encap
antrea-cni.conflist: |
{
"cniVersion":"0.3.0",
Expand All @@ -104,7 +114,7 @@ kind: ConfigMap
metadata:
labels:
app: antrea
name: antrea-windows-config-6cmd972m6b
name: antrea-windows-config-g5hmmkbh55
namespace: kube-system
---
apiVersion: apps/v1
Expand Down Expand Up @@ -192,7 +202,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-windows-config-6cmd972m6b
name: antrea-windows-config-g5hmmkbh55
name: antrea-windows-config
- configMap:
defaultMode: 420
Expand Down
10 changes: 10 additions & 0 deletions build/yamls/windows/base/conf/antrea-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,13 @@ featureGates:

# Enable TLS communication from flow exporter to flow aggregator.
#enableTLSToFlowAggregator: true

# Determines how traffic is encapsulated. It has the following options:
# encap(default): Inter-node Pod traffic is always encapsulated and Pod to external network
# traffic is SNAT'd.
# noEncap: Inter-node Pod traffic is not encapsulated; Pod to external network traffic is
# SNAT'd if noSNAT is not set to true. Underlying network must be capable of
# supporting Pod traffic across IP subnets.
# hybrid: noEncap if source and destination Nodes are on the same subnet, otherwise encap.
#
#trafficEncapMode: encap
2 changes: 1 addition & 1 deletion cmd/antrea-agent/options_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (o *Options) checkUnsupportedFeatures() error {
unsupported = append(unsupported, "OVSDatapathType: "+o.config.OVSDatapathType)
}
_, encapMode := config.GetTrafficEncapModeFromStr(o.config.TrafficEncapMode)
if encapMode != config.TrafficEncapModeEncap {
if encapMode == config.TrafficEncapModeNetworkPolicyOnly {
unsupported = append(unsupported, "TrafficEncapMode: "+encapMode.String())
}
if o.config.TunnelType == ovsconfig.GRETunnel {
Expand Down
2 changes: 1 addition & 1 deletion cmd/antrea-agent/options_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestCheckUnsupportedFeatures(t *testing.T) {
{
"noEncap mode",
AgentConfig{TrafficEncapMode: config.TrafficEncapModeNoEncap.String()},
false,
true,
},
{
"GRE tunnel",
Expand Down
40 changes: 34 additions & 6 deletions pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/ovs/ovsconfig"
utilip "antrea.io/antrea/pkg/util/ip"
Expand Down Expand Up @@ -136,6 +137,7 @@ type nodeRouteInfo struct {
podCIDRs []*net.IPNet
nodeIP net.IP
gatewayIP []net.IP
nodeMAC net.HardwareAddr
}

// enqueueNode adds an object to the controller work queue
Expand Down Expand Up @@ -407,8 +409,16 @@ func (c *Controller) deleteNodeRoute(nodeName string) error {
}

func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
if _, installed, _ := c.installedNodes.GetByKey(nodeName); installed {
// Route is already added for this Node.
// It is only for Windows Noencap mode to get Node MAC.
peerNodeMAC, err := getNodeMAC(node)
if err != nil {
return fmt.Errorf("error when retrieving MAC of Node %s: %v", nodeName, err)
}

nrInfo, installed, _ := c.installedNodes.GetByKey(nodeName)

if installed && nrInfo.(*nodeRouteInfo).nodeMAC.String() == peerNodeMAC.String() {
// Route is already added for this Node and Node MAC isn't changed.
return nil
}

Expand All @@ -432,7 +442,7 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
return nil
}

nodesHaveSamePodCIDR, _ := c.installedNodes.ByIndex(nodeRouteInfoPodCIDRIndexName, podCIDR)
nodesHaveSamePodCIDR, _ := c.installedNodes.IndexKeys(nodeRouteInfoPodCIDRIndexName, podCIDR)
// PodCIDRs can be released from deleted Nodes and allocated to new Nodes. For server side, it won't happen that a
// PodCIDR is allocated to more than one Node at any point. However, for client side, if a resync happens to occur
// when there are Node creation and deletion events, the informer will generate the events in a way that all
Expand All @@ -442,9 +452,12 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
// stale routes, flows, and relevant cache of this podCIDR are removed appropriately, we wait for the Node deletion
// event to be processed before proceeding, or the route installation and uninstallation operations may override or
// conflict with each other.
if len(nodesHaveSamePodCIDR) > 0 {
// For Windows Noencap case, it is possible that nodesHaveSamePodCIDR is the Node itself because the Node
// MAC annotation was not set yet when the Node was initially installed. Then it is processed for the second
// time when its MAC annotation is updated.
if len(nodesHaveSamePodCIDR) > 0 && (len(nodesHaveSamePodCIDR) != 1 || nodesHaveSamePodCIDR[0] != nodeName) {
// Return an error so that the Node will be put back to the workqueue and will be retried later.
return fmt.Errorf("skipping addNodeRoute for Node %s because podCIDR %s is duplicate with Node %s, will retry later", nodeName, podCIDR, nodesHaveSamePodCIDR[0].(*nodeRouteInfo).nodeName)
return fmt.Errorf("skipping addNodeRoute for Node %s because podCIDR %s is duplicate with Node %s, will retry later", nodeName, podCIDR, nodesHaveSamePodCIDR[0])
}

peerPodCIDRAddr, peerPodCIDR, err := net.ParseCIDR(podCIDR)
Expand Down Expand Up @@ -477,7 +490,8 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
nodeName,
peerConfig,
peerNodeIP,
uint32(ipsecTunOFPort))
uint32(ipsecTunOFPort),
peerNodeMAC)
if err != nil {
return fmt.Errorf("failed to install flows to Node %s: %v", nodeName, err)
}
Expand All @@ -494,6 +508,7 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
podCIDRs: podCIDRs,
nodeIP: peerNodeIP,
gatewayIP: peerGatewayIPs,
nodeMAC: peerNodeMAC,
})
return err
}
Expand Down Expand Up @@ -629,3 +644,16 @@ func (c *Controller) IPInPodSubnets(ip net.IP) bool {
nodeInCluster, _ := c.installedNodes.ByIndex(nodeRouteInfoPodCIDRIndexName, ipCIDRStr)
return len(nodeInCluster) > 0 || ipCIDRStr == curNodeCIDRStr
}

// getNodeMAC gets Node's br-int MAC from its annotation. It is only for Windows Noencap mode.
func getNodeMAC(node *corev1.Node) (net.HardwareAddr, error) {
macStr := node.Annotations[types.NodeMACAddressAnnotationKey]
if macStr == "" {
return nil, nil
}
mac, err := net.ParseMAC(macStr)
if err != nil {
return nil, fmt.Errorf("failed to parse MAC `%s`: %v", macStr, err)
}
return mac, nil
}
8 changes: 4 additions & 4 deletions pkg/agent/controller/noderoute/node_route_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) {
c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{})
// The 2nd argument is Any() because the argument is unpredictable when it uses pointer as the key of map.
// The argument type is map[*net.IPNet]net.IP.
c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), nodeIP1, uint32(0)).Times(1)
c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), nodeIP1, uint32(0), nil).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1)
c.processNextWorkItem()

Expand All @@ -150,7 +150,7 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) {
// After node1 is deleted, routes and flows should be installed for node2 successfully.
// The 2nd argument is Any() because the argument is unpredictable when it uses pointer as the key of map.
// The argument type is map[*net.IPNet]net.IP.
c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), nodeIP2, uint32(0)).Times(1)
c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), nodeIP2, uint32(0), nil).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR, "node2", nodeIP2, podCIDRGateway).Times(1)
c.processNextWorkItem()
}()
Expand Down Expand Up @@ -214,12 +214,12 @@ func TestIPInPodSubnets(t *testing.T) {
c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{})
// The 2nd argument is Any() because the argument is unpredictable when it uses pointer as the key of map.
// The argument type is map[*net.IPNet]net.IP.
c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), nodeIP1, uint32(0)).Times(1)
c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), nodeIP1, uint32(0), nil).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1)
c.processNextWorkItem()

c.clientset.CoreV1().Nodes().Create(context.TODO(), node2, metav1.CreateOptions{})
c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), nodeIP2, uint32(0)).Times(1)
c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), nodeIP2, uint32(0), nil).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR2, "node2", nodeIP2, podCIDR2Gateway).Times(1)
c.processNextWorkItem()

Expand Down
57 changes: 49 additions & 8 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type Client interface {
hostname string,
peerConfigs map[*net.IPNet]net.IP,
tunnelPeerIP net.IP,
ipsecTunOFPort uint32) error
ipsecTunOFPort uint32,
peerNodeMAC net.HardwareAddr) error

// UninstallNodeFlows removes the connection to the remote Node specified with the
// hostname. UninstallNodeFlows will do nothing if no connection to the host was established.
Expand Down Expand Up @@ -312,6 +313,45 @@ func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows [
return nil
}

// modifyFlows sets the flows of flowCategoryCache be exactly same as the provided slice for the given flowCacheKey.
func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow) error {
oldFlowCacheI, ok := cache.Load(flowCacheKey)
fCache := flowCache{}
var err error
if !ok {
for _, flow := range flows {
fCache[flow.MatchString()] = flow
}

err = c.ofEntryOperations.AddAll(flows)
} else {
var adds, mods, dels []binding.Flow
oldFlowCache := oldFlowCacheI.(flowCache)
for _, flow := range flows {
matchString := flow.MatchString()
if _, ok := oldFlowCache[matchString]; ok {
mods = append(mods, flow)
} else {
adds = append(adds, flow)
}
fCache[matchString] = flow
}
for k, v := range oldFlowCache {
if _, ok := fCache[k]; !ok {
dels = append(dels, v)
}
}
err = c.ofEntryOperations.BundleOps(adds, mods, dels)
}
if err != nil {
return err
}

// Modify the flows in the flow cache.
cache.Store(flowCacheKey, fCache)
return nil
}

// deleteFlows deletes all the flows in the flow cache indexed by the provided flowCacheKey.
func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) error {
fCacheI, ok := cache.Load(flowCacheKey)
Expand All @@ -332,10 +372,12 @@ func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) erro
return nil
}

// InstallNodeFlows installs flows for peer Nodes. Parameter remoteGatewayMAC is only for Windows.
func (c *client) InstallNodeFlows(hostname string,
peerConfigs map[*net.IPNet]net.IP,
tunnelPeerIP net.IP,
ipsecTunOFPort uint32) error {
ipsecTunOFPort uint32,
remoteGatewayMAC net.HardwareAddr) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

Expand All @@ -353,7 +395,7 @@ func (c *client) InstallNodeFlows(hostname string,
// IPv6 one is decided by the address family of Node Internal Address.
flows = append(flows, c.l3FwdFlowToRemote(localGatewayMAC, *peerPodCIDR, tunnelPeerIP, cookie.Node))
} else {
flows = append(flows, c.l3FwdFlowToRemoteViaGW(localGatewayMAC, *peerPodCIDR, cookie.Node))
flows = append(flows, c.l3FwdFlowToRemoteViaRouting(localGatewayMAC, remoteGatewayMAC, cookie.Node, tunnelPeerIP, peerPodCIDR)...)
}
}

Expand All @@ -364,7 +406,9 @@ func (c *client) InstallNodeFlows(hostname string,
flows = append(flows, c.tunnelClassifierFlow(ipsecTunOFPort, cookie.Node))
}

return c.addFlows(c.nodeFlowCache, hostname, flows)
// For Windows Noencap Mode, the OVS flows for Node need be be exactly same as the provided 'flows' slice because
// the Node flows may be processed more than once if the MAC annotation is updated.
return c.modifyFlows(c.nodeFlowCache, hostname, flows)
}

func (c *client) UninstallNodeFlows(hostname string) error {
Expand Down Expand Up @@ -576,10 +620,7 @@ func (c *client) InstallGatewayFlows() error {
// Add flow to ensure the liveness check packet could be forwarded correctly.
flows = append(flows, c.localProbeFlow(gatewayIPs, cookie.Default)...)
flows = append(flows, c.ctRewriteDstMACFlows(gatewayConfig.MAC, cookie.Default)...)
// In NoEncap , no traffic from tunnel port
if c.encapMode.SupportsEncap() {
flows = append(flows, c.l3FwdFlowToGateway(gatewayIPs, gatewayConfig.MAC, cookie.Default)...)
}
flows = append(flows, c.l3FwdFlowToGateway(gatewayIPs, gatewayConfig.MAC, cookie.Default)...)

if err := c.ofEntryOperations.AddAll(flows); err != nil {
return err
Expand Down
3 changes: 1 addition & 2 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func installNodeFlows(ofClient Client, cacheKey string) (int, error) {
peerConfig := map[*net.IPNet]net.IP{
ipNet: gwIP,
}
err := ofClient.InstallNodeFlows(hostName, peerConfig, peerNodeIP, 0)
err := ofClient.InstallNodeFlows(hostName, peerConfig, peerNodeIP, 0, nil)
client := ofClient.(*client)
fCacheI, ok := client.nodeFlowCache.Load(hostName)
if ok {
Expand Down Expand Up @@ -90,7 +90,6 @@ func TestIdempotentFlowInstallation(t *testing.T) {
numFlows int
installFn func(ofClient Client, cacheKey string) (int, error)
}{
{"NodeFlows", "host", 2, installNodeFlows},
{"PodFlows", "aaaa-bbbb-cccc-dddd", 5, installPodFlows},
}

Expand Down
Loading

0 comments on commit 9547582

Please sign in to comment.