Skip to content

Commit

Permalink
[Windows] NoEncap support
Browse files Browse the repository at this point in the history
Co-authored-by: Zhecheng Li <[email protected]>
  • Loading branch information
lzhecheng committed May 19, 2021
1 parent 75bfa56 commit 6933039
Show file tree
Hide file tree
Showing 14 changed files with 198 additions and 38 deletions.
14 changes: 12 additions & 2 deletions build/yamls/antrea-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ data:
# the flow collector.
# Flow export frequency should be greater than or equal to 1.
#flowExportFrequency: 12
# Determines how traffic is encapsulated. It has the following options:
# encap(default): Inter-node Pod traffic is always encapsulated and Pod to external network
# traffic is SNAT'd.
# noEncap: Inter-node Pod traffic is not encapsulated; Pod to external network traffic is
# SNAT'd if noSNAT is not set to true. Underlying network must be capable of
# supporting Pod traffic across IP subnets.
# hybrid: noEncap if source and destination Nodes are on the same subnet, otherwise encap.
#
#trafficEncapMode: encap
antrea-cni.conflist: |
{
"cniVersion":"0.3.0",
Expand All @@ -89,7 +99,7 @@ kind: ConfigMap
metadata:
labels:
app: antrea
name: antrea-windows-config-kc6bfhk4mg
name: antrea-windows-config-4kcddbh9tt
namespace: kube-system
---
apiVersion: apps/v1
Expand Down Expand Up @@ -177,7 +187,7 @@ spec:
operator: Exists
volumes:
- configMap:
name: antrea-windows-config-kc6bfhk4mg
name: antrea-windows-config-4kcddbh9tt
name: antrea-windows-config
- configMap:
defaultMode: 420
Expand Down
10 changes: 10 additions & 0 deletions build/yamls/windows/base/conf/antrea-agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,13 @@ featureGates:
# the flow collector.
# Flow export frequency should be greater than or equal to 1.
#flowExportFrequency: 12

# Determines how traffic is encapsulated. It has the following options:
# encap(default): Inter-node Pod traffic is always encapsulated and Pod to external network
# traffic is SNAT'd.
# noEncap: Inter-node Pod traffic is not encapsulated; Pod to external network traffic is
# SNAT'd if noSNAT is not set to true. Underlying network must be capable of
# supporting Pod traffic across IP subnets.
# hybrid: noEncap if source and destination Nodes are on the same subnet, otherwise encap.
#
#trafficEncapMode: encap
2 changes: 1 addition & 1 deletion cmd/antrea-agent/options_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (o *Options) checkUnsupportedFeatures() error {
unsupported = append(unsupported, "OVSDatapathType: "+o.config.OVSDatapathType)
}
_, encapMode := config.GetTrafficEncapModeFromStr(o.config.TrafficEncapMode)
if encapMode != config.TrafficEncapModeEncap {
if encapMode == config.TrafficEncapModeNetworkPolicyOnly {
unsupported = append(unsupported, "TrafficEncapMode: "+encapMode.String())
}
if o.config.TunnelType == ovsconfig.GRETunnel {
Expand Down
2 changes: 1 addition & 1 deletion cmd/antrea-agent/options_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestCheckUnsupportedFeatures(t *testing.T) {
{
"noEncap mode",
AgentConfig{TrafficEncapMode: config.TrafficEncapModeNoEncap.String()},
false,
true,
},
{
"GRE tunnel",
Expand Down
35 changes: 29 additions & 6 deletions pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/ovs/ovsconfig"
utilip "antrea.io/antrea/pkg/util/ip"
Expand Down Expand Up @@ -135,6 +136,7 @@ type nodeRouteInfo struct {
podCIDRs []*net.IPNet
nodeIP net.IP
gatewayIP []net.IP
nodeMAC net.HardwareAddr
}

// enqueueNode adds an object to the controller work queue
Expand Down Expand Up @@ -406,8 +408,14 @@ func (c *Controller) deleteNodeRoute(nodeName string) error {
}

func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
if _, installed, _ := c.installedNodes.GetByKey(nodeName); installed {
// Route is already added for this Node.
peerNodeMAC, err := getNodeMAC(node)
if err != nil {
klog.Errorf("Error when retrieving MAC of Node %s: %v", nodeName, err)
}

nrInfo, installed, _ := c.installedNodes.GetByKey(nodeName)
if installed && nrInfo != nil && nrInfo.(*nodeRouteInfo).nodeMAC != nil && nrInfo.(*nodeRouteInfo).nodeMAC.String() == peerNodeMAC.String() {
// Route is already added for this Node and Node MAC isn't changed.
return nil
}

Expand All @@ -431,7 +439,7 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
return nil
}

nodesHaveSamePodCIDR, _ := c.installedNodes.ByIndex(nodeRouteInfoPodCIDRIndexName, podCIDR)
nodesHaveSamePodCIDR, _ := c.installedNodes.IndexKeys(nodeRouteInfoPodCIDRIndexName, podCIDR)
// PodCIDRs can be released from deleted Nodes and allocated to new Nodes. For server side, it won't happen that a
// PodCIDR is allocated to more than one Node at any point. However, for client side, if a resync happens to occur
// when there are Node creation and deletion events, the informer will generate the events in a way that all
Expand All @@ -441,9 +449,9 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
// stale routes, flows, and relevant cache of this podCIDR are removed appropriately, we wait for the Node deletion
// event to be processed before proceeding, or the route installation and uninstallation operations may override or
// conflict with each other.
if len(nodesHaveSamePodCIDR) > 0 {
if len(nodesHaveSamePodCIDR) > 0 && (len(nodesHaveSamePodCIDR) != 1 || nodesHaveSamePodCIDR[0] != nodeName) {
// Return an error so that the Node will be put back to the workqueue and will be retried later.
return fmt.Errorf("skipping addNodeRoute for Node %s because podCIDR %s is duplicate with Node %s, will retry later", nodeName, podCIDR, nodesHaveSamePodCIDR[0].(*nodeRouteInfo).nodeName)
return fmt.Errorf("skipping addNodeRoute for Node %s because podCIDR %s is duplicate with Node %s, will retry later", nodeName, podCIDR, nodesHaveSamePodCIDR[0])
}

peerPodCIDRAddr, peerPodCIDR, err := net.ParseCIDR(podCIDR)
Expand Down Expand Up @@ -476,7 +484,8 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
nodeName,
peerConfig,
peerNodeIP,
uint32(ipsecTunOFPort))
uint32(ipsecTunOFPort),
peerNodeMAC)
if err != nil {
return fmt.Errorf("failed to install flows to Node %s: %v", nodeName, err)
}
Expand All @@ -493,6 +502,7 @@ func (c *Controller) addNodeRoute(nodeName string, node *corev1.Node) error {
podCIDRs: podCIDRs,
nodeIP: peerNodeIP,
gatewayIP: peerGatewayIPs,
nodeMAC: peerNodeMAC,
})
return err
}
Expand Down Expand Up @@ -653,3 +663,16 @@ func (c *Controller) IPInPodSubnets(ip net.IP) bool {
nodeInCluster, _ := c.installedNodes.ByIndex(nodeRouteInfoPodCIDRIndexName, ipCIDRStr)
return len(nodeInCluster) > 0 || ipCIDRStr == curNodeCIDRStr
}

// getNodeMAC gets Node's br-int MAC from its annotation. It is for Windows Noencap mode only.
func getNodeMAC(node *corev1.Node) (net.HardwareAddr, error) {
macStr := node.Annotations[types.NodeMACAddressAnnotationKey]
if macStr == "" {
return nil, nil
}
mac, err := net.ParseMAC(macStr)
if err != nil {
return nil, err
}
return mac, nil
}
8 changes: 4 additions & 4 deletions pkg/agent/controller/noderoute/node_route_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) {
c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{})
// The 2nd argument is Any() because the argument is unpredictable when it uses pointer as the key of map.
// The argument type is map[*net.IPNet]net.IP.
c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), nodeIP1, uint32(0)).Times(1)
c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), nodeIP1, uint32(0), nil).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1)
c.processNextWorkItem()

Expand All @@ -150,7 +150,7 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) {
// After node1 is deleted, routes and flows should be installed for node2 successfully.
// The 2nd argument is Any() because the argument is unpredictable when it uses pointer as the key of map.
// The argument type is map[*net.IPNet]net.IP.
c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), nodeIP2, uint32(0)).Times(1)
c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), nodeIP2, uint32(0), nil).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR, "node2", nodeIP2, podCIDRGateway).Times(1)
c.processNextWorkItem()
}()
Expand Down Expand Up @@ -214,12 +214,12 @@ func TestIPInPodSubnets(t *testing.T) {
c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{})
// The 2nd argument is Any() because the argument is unpredictable when it uses pointer as the key of map.
// The argument type is map[*net.IPNet]net.IP.
c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), nodeIP1, uint32(0)).Times(1)
c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), nodeIP1, uint32(0), nil).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1)
c.processNextWorkItem()

c.clientset.CoreV1().Nodes().Create(context.TODO(), node2, metav1.CreateOptions{})
c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), nodeIP2, uint32(0)).Times(1)
c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), nodeIP2, uint32(0), nil).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR2, "node2", nodeIP2, podCIDR2Gateway).Times(1)
c.processNextWorkItem()

Expand Down
39 changes: 36 additions & 3 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/util/runtime"
"antrea.io/antrea/third_party/proxy"
)

Expand Down Expand Up @@ -68,7 +69,8 @@ type Client interface {
hostname string,
peerConfigs map[*net.IPNet]net.IP,
tunnelPeerIP net.IP,
ipsecTunOFPort uint32) error
ipsecTunOFPort uint32,
peerNodeMAC net.HardwareAddr) error

// UninstallNodeFlows removes the connection to the remote Node specified with the
// hostname. UninstallNodeFlows will do nothing if no connection to the host was established.
Expand Down Expand Up @@ -290,7 +292,8 @@ func (c *client) IsConnected() bool {

// addFlows installs the flows on the OVS bridge and then add them into the flow cache. If the flow cache exists,
// it will return immediately, otherwise it will use Bundle to add all flows, and then add them into the flow cache.
// If it fails to add the flows with Bundle, it will return the error and no flow cache is created.
// If it fails to add the flows with Bundle, it will return the error and no flow cache is created. If the force parameter
// is true, flows will be added regardless of flow cache.
func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow) error {
_, ok := cache.Load(flowCacheKey)
// If a flow cache entry already exists for the key, return immediately. Otherwise, add the flows to the switch
Expand All @@ -312,6 +315,20 @@ func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows [
return nil
}

func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow) error {
err := c.ofEntryOperations.ModifyAll(flows)
if err != nil {
return err
}
fCache := flowCache{}
// Modify the flows in the flow cache.
for _, flow := range flows {
fCache[flow.MatchString()] = flow
}
cache.Store(flowCacheKey, fCache)
return nil
}

// deleteFlows deletes all the flows in the flow cache indexed by the provided flowCacheKey.
func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) error {
fCacheI, ok := cache.Load(flowCacheKey)
Expand All @@ -335,13 +352,15 @@ func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) erro
func (c *client) InstallNodeFlows(hostname string,
peerConfigs map[*net.IPNet]net.IP,
tunnelPeerIP net.IP,
ipsecTunOFPort uint32) error {
ipsecTunOFPort uint32,
remoteGatewayMAC net.HardwareAddr) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

var flows []binding.Flow
localGatewayMAC := c.nodeConfig.GatewayConfig.MAC

modify := false
for peerPodCIDR, peerGatewayIP := range peerConfigs {
if peerGatewayIP.To4() != nil {
// Since broadcast is not supported in IPv6, ARP should happen only with IPv4 address, and ARP responder flows
Expand All @@ -352,6 +371,9 @@ func (c *client) InstallNodeFlows(hostname string,
// tunnelPeerIP is the Node Internal Address. In a dual-stack setup, whether this address is an IPv4 address or an
// IPv6 one is decided by the address family of Node Internal Address.
flows = append(flows, c.l3FwdFlowToRemote(localGatewayMAC, *peerPodCIDR, tunnelPeerIP, cookie.Node))
} else if runtime.IsWindowsPlatform() && !c.encapMode.NeedsRoutingToPeer(tunnelPeerIP, c.nodeConfig.NodeIPAddr) && remoteGatewayMAC != nil {
flows = append(flows, c.l3FwdFlowToRemoteViaRouting(remoteGatewayMAC, *peerPodCIDR, cookie.Node)...)
modify = true
} else {
flows = append(flows, c.l3FwdFlowToRemoteViaGW(localGatewayMAC, *peerPodCIDR, cookie.Node))
}
Expand All @@ -364,6 +386,9 @@ func (c *client) InstallNodeFlows(hostname string,
flows = append(flows, c.tunnelClassifierFlow(ipsecTunOFPort, cookie.Node))
}

if modify {
return c.modifyFlows(c.nodeFlowCache, hostname, flows)
}
return c.addFlows(c.nodeFlowCache, hostname, flows)
}

Expand Down Expand Up @@ -580,6 +605,14 @@ func (c *client) InstallGatewayFlows() error {
if c.encapMode.SupportsEncap() {
flows = append(flows, c.l3FwdFlowToGateway(gatewayIPs, gatewayConfig.MAC, cookie.Default)...)
}
if runtime.IsWindowsPlatform() && c.encapMode.SupportsNoEncap() {
if gatewayConfig.IPv4 != nil {
flows = append(flows, c.l3FwdFlowRouteToWindowsGW(gatewayConfig.MAC, gatewayConfig.IPv4, cookie.Default)...)
}
if gatewayConfig.IPv6 != nil {
flows = append(flows, c.l3FwdFlowRouteToWindowsGW(gatewayConfig.MAC, gatewayConfig.IPv6, cookie.Default)...)
}
}

if err := c.ofEntryOperations.AddAll(flows); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func installNodeFlows(ofClient Client, cacheKey string) (int, error) {
peerConfig := map[*net.IPNet]net.IP{
ipNet: gwIP,
}
err := ofClient.InstallNodeFlows(hostName, peerConfig, peerNodeIP, 0)
err := ofClient.InstallNodeFlows(hostName, peerConfig, peerNodeIP, 0, nil)
client := ofClient.(*client)
fCacheI, ok := client.nodeFlowCache.Load(hostName)
if ok {
Expand Down
39 changes: 38 additions & 1 deletion pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,6 @@ func (c *client) changeOFEntries(ofEntries []binding.OFEntry, action ofAction) e
} else {
return fmt.Errorf("OF Entries Action not exists: %s", action)
}

startTime := time.Now()
defer func() {
d := time.Since(startTime)
Expand Down Expand Up @@ -2165,3 +2164,41 @@ func NewClient(bridgeName, mgmtAddr string, ovsDatapathType ovsconfig.OVSDatapat
c.generatePipeline()
return c
}

// l3FwdFlowRouteToWindowsGW adds a flow table to forward traffic to antrea-gw0 on Windows in NoEncap mode. It is for Windows.
func (c *client) l3FwdFlowRouteToWindowsGW(localGatewayMAC net.HardwareAddr, localGatewayIP net.IP, category cookie.Category) []binding.Flow {
flows := []binding.Flow{
c.pipeline[l3ForwardingTable].BuildFlow(priorityNormal).MatchProtocol(binding.ProtocolIP).
MatchRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
MatchDstIP(localGatewayIP).
Action().SetDstMAC(localGatewayMAC).
Action().GotoTable(l3DecTTLTable).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done(),
}
return flows
}

// l3FwdFlowToRemoteViaRouting enhances Windows Noencap mode performance by bypassing host network.
func (c *client) l3FwdFlowToRemoteViaRouting(
gatewayMAC net.HardwareAddr,
peerSubnet net.IPNet,
category cookie.Category) []binding.Flow {
var flows []binding.Flow
ipProto := getIPProtocol(peerSubnet.IP)
l3FwdTable := c.pipeline[l3ForwardingTable]
flows = append(flows, l3FwdTable.BuildFlow(priorityNormal).MatchProtocol(ipProto).
MatchDstIPNet(peerSubnet).
Action().SetDstMAC(gatewayMAC).
Action().GotoTable(l3FwdTable.GetNext()).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done())
flows = append(flows, c.pipeline[l2ForwardingCalcTable].BuildFlow(priorityNormal).
MatchDstMAC(gatewayMAC).
Action().LoadRegRange(int(PortCacheReg), config.UplinkOFPort, ofPortRegRange).
Action().LoadRegRange(int(marksReg), macRewriteMark, ofPortMarkRange).
Action().GotoTable(conntrackCommitTable).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done())
return flows
}
12 changes: 12 additions & 0 deletions pkg/agent/openflow/pipeline_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,5 +281,17 @@ func (c *client) hostBridgeUplinkFlows(localSubnet net.IPNet, category cookie.Ca
Cookie(c.cookieAllocator.Request(category).Raw()).
Done(),
}
if c.encapMode.SupportsNoEncap() {
// If NoEncap is enabled, the reply packets from remote Pod can be forwarded to local Pod directly.
// by explicitly resubmitting them to endpointDNATTable and marking "macRewriteMark" at same time.
flows = append(flows, c.pipeline[conntrackStateTable].BuildFlow(priorityHigh).MatchProtocol(binding.ProtocolIP).
MatchRegRange(int(marksReg), markTrafficFromUplink, binding.Range{0, 15}).
MatchDstIPNet(localSubnet).
Action().LoadRegRange(int(marksReg), macRewriteMark, macRewriteMarkRange).
Action().GotoTable(endpointDNATTable).
Cookie(c.cookieAllocator.Request(category).Raw()).
Done())

}
return flows
}
8 changes: 4 additions & 4 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6933039

Please sign in to comment.