From 7ecbdda1b90d957582c8946afe845a963a4110f7 Mon Sep 17 00:00:00 2001 From: wenyingd Date: Thu, 30 Mar 2023 13:33:09 +0800 Subject: [PATCH] address comments Signed-off-by: wenyingd --- pkg/agent/openflow/client.go | 48 ++- pkg/agent/openflow/client_test.go | 90 +++-- pkg/agent/openflow/egress.go | 6 +- .../openflow/externalnode_connectivity.go | 12 +- pkg/agent/openflow/framework.go | 6 +- pkg/agent/openflow/multicast.go | 5 +- pkg/agent/openflow/multicluster.go | 8 +- pkg/agent/openflow/network_policy.go | 120 +++---- pkg/agent/openflow/network_policy_test.go | 336 ++++++------------ pkg/agent/openflow/pipeline.go | 77 ++-- pkg/agent/openflow/pod_connectivity.go | 8 +- pkg/agent/openflow/service.go | 6 +- pkg/agent/openflow/testing/mock_openflow.go | 10 +- pkg/agent/openflow/traceflow.go | 10 +- pkg/ovs/openflow/interfaces.go | 6 +- pkg/ovs/openflow/ofctrl_action.go | 1 - pkg/ovs/openflow/ofctrl_bridge.go | 25 +- pkg/ovs/openflow/ofctrl_builder.go | 124 ------- pkg/ovs/openflow/ofctrl_flow.go | 56 ++- pkg/ovs/openflow/ofctrl_flow_test.go | 5 +- pkg/ovs/openflow/testing/mock_openflow.go | 46 +-- pkg/ovs/openflow/{testing => }/utils.go | 18 +- test/integration/ovs/ofctrl_test.go | 7 +- 23 files changed, 402 insertions(+), 628 deletions(-) rename pkg/ovs/openflow/{testing => }/utils.go (98%) diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 2844f086296..a3379f5bee7 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -22,7 +22,6 @@ import ( "antrea.io/libOpenflow/openflow15" "antrea.io/libOpenflow/protocol" ofutil "antrea.io/libOpenflow/util" - "antrea.io/ofnet/ofctrl" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -395,7 +394,7 @@ func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows [ // It will skip flows whose cache already exists. All flows will be installed via a bundle. func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows map[string][]binding.Flow) error { // allMessages keeps the OpenFlow modification messages we will install via a bundle. - var allMessages []ofctrl.OpenFlowModMessage + var allMessages []*openflow15.FlowMod // flowCacheMap keeps the flowMessageCache items we will add to the cache on bundle success. flowCacheMap := map[string]flowMessageCache{} for flowCacheKey, flows := range keyToFlows { @@ -407,9 +406,9 @@ func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows m } fCache := flowMessageCache{} for _, flow := range flows { - cf := getCachedFlow(flow, binding.AddMessage) - allMessages = append(allMessages, cf.message) - fCache[flow.MatchString()] = cf + cf := getFlowModMessage(flow, binding.AddMessage) + allMessages = append(allMessages, cf) + fCache[getFlowKey(cf)] = cf } flowCacheMap[flowCacheKey] = fCache } @@ -433,30 +432,30 @@ func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flow fCache := flowMessageCache{} var err error if !ok { - messages := make([]ofctrl.OpenFlowModMessage, 0) + messages := make([]*openflow15.FlowMod, 0) for _, flow := range flows { - cf := getCachedFlow(flow, binding.AddMessage) - messages = append(messages, cf.message) - fCache[flow.MatchString()] = cf + cf := getFlowModMessage(flow, binding.AddMessage) + messages = append(messages, cf) + fCache[getFlowKey(cf)] = cf } err = c.ofEntryOperations.AddAll(messages) } else { - var adds, mods, dels []ofctrl.OpenFlowModMessage + var adds, mods, dels []*openflow15.FlowMod oldFlowCache := oldFlowCacheI.(flowMessageCache) for _, flow := range flows { matchString := flow.MatchString() - var cachedFlowInfo *cachedFlow + var cachedFlowInfo *openflow15.FlowMod if _, ok := oldFlowCache[matchString]; ok { - cachedFlowInfo = getCachedFlow(flow, binding.ModifyMessage) + cachedFlowInfo = getFlowModMessage(flow, binding.ModifyMessage) } else { - cachedFlowInfo = getCachedFlow(flow, binding.AddMessage) + cachedFlowInfo = getFlowModMessage(flow, binding.AddMessage) } - adds = append(adds, cachedFlowInfo.message) + adds = append(adds, cachedFlowInfo) fCache[matchString] = cachedFlowInfo } for k, v := range oldFlowCache { if _, ok := fCache[k]; !ok { - dels = append(dels, v.message) + dels = append(dels, v) } } err = c.ofEntryOperations.BundleOps(adds, mods, dels) @@ -479,7 +478,7 @@ func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) erro // It will skip the keys which are not in the cache. All flows will be uninstalled via a bundle. func (c *client) deleteFlowsWithMultipleKeys(cache *flowCategoryCache, keys []string) error { // allFlows keeps the flows we will delete via a bundle. - var allFlows []ofctrl.OpenFlowModMessage + var allFlows []*openflow15.FlowMod for _, key := range keys { flows, ok := cache.Load(key) // If a flow cache entry of the key does not exist, skip it. @@ -488,7 +487,7 @@ func (c *client) deleteFlowsWithMultipleKeys(cache *flowCategoryCache, keys []st continue } for _, flow := range flows.(flowMessageCache) { - allFlows = append(allFlows, flow.message) + allFlows = append(allFlows, flow) } } if len(allFlows) == 0 { @@ -658,7 +657,7 @@ func (c *client) getFlowKeysFromCache(cache *flowCategoryCache, cacheKey string) c.replayMutex.RLock() defer c.replayMutex.RUnlock() for _, flow := range fCache { - flowKeys = append(flowKeys, flow.key) + flowKeys = append(flowKeys, getFlowKey(flow)) } return flowKeys } @@ -1532,18 +1531,17 @@ func (c *client) UninstallMulticlusterPodFlows(podIP string) error { return c.deleteFlows(c.featureMulticluster.cachedPodFlows, podIP) } -func getFlowMessages(flows []binding.Flow) []ofctrl.OpenFlowModMessage { - messages := make([]ofctrl.OpenFlowModMessage, 0, len(flows)) +func getFlowMessages(flows []binding.Flow) []*openflow15.FlowMod { + messages := make([]*openflow15.FlowMod, 0, len(flows)) for i := range flows { - cf := getCachedFlow(flows[i], binding.AddMessage) - messages = append(messages, cf.message) + msg := getFlowModMessage(flows[i], binding.AddMessage) + messages = append(messages, msg) } return messages } -func getCachedFlow(flow binding.Flow, entryOper binding.OFOperation) *cachedFlow { +func getFlowModMessage(flow binding.Flow, entryOper binding.OFOperation) *openflow15.FlowMod { bundleMessages, _ := flow.GetBundleMessages(entryOper) - flowKey := flow.MatchString() flowMessage := bundleMessages[0] - return &cachedFlow{key: flowKey, table: flow.GetTable(), isDropFlow: flow.IsDropFlow(), message: flowMessage} + return flowMessage.GetMessage().(*openflow15.FlowMod) } diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 950f9c05f52..7cf93625e5a 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -458,7 +458,7 @@ func newFakeClient(mockOFEntryOperations *oftest.MockOFEntryOperations, client.ipProtocols = ipProtocols client.generatePipelines() client.realizePipelines() - ovsoftest.TableNameCache = getTableNameCache() + binding.TableNameCache = getTableNameCache() client.bridge.(*binding.OFBridge).SetOFSwitch(ofctrl.NewSwitch(&util.MessageStream{}, GlobalVirtualMAC, nil, make(chan int), 32776)) client.bridge.(*binding.OFBridge).Initialize() return client @@ -476,10 +476,9 @@ func getTableNameCache() map[uint8]string { } func getFlowStrings(flows interface{}) []string { - getStrings := func(flowMessage ofctrl.OpenFlowModMessage) []string { + getStrings := func(message *openflow15.FlowMod) []string { var strs []string - message, _ := flowMessage.GetMessage().(*openflow15.FlowMod) - f := ovsoftest.FlowModToString(message) + f := binding.FlowModToString(message) strs = append(strs, f) return strs } @@ -487,16 +486,16 @@ func getFlowStrings(flows interface{}) []string { switch v := flows.(type) { case flowMessageCache: for _, flow := range v { - flowStrings = append(flowStrings, getStrings(flow.message)...) + flowStrings = append(flowStrings, getStrings(flow)...) } case []binding.Flow: for _, flow := range v { messages, _ := flow.GetBundleMessages(binding.AddMessage) for _, msg := range messages { - flowStrings = append(flowStrings, getStrings(msg)...) + flowStrings = append(flowStrings, getStrings(msg.GetMessage().(*openflow15.FlowMod))...) } } - case []ofctrl.OpenFlowModMessage: + case []*openflow15.FlowMod: for _, flow := range v { flowStrings = append(flowStrings, getStrings(flow)...) } @@ -505,10 +504,10 @@ func getFlowStrings(flows interface{}) []string { } func getGroupFromCache(groupCache binding.Group) string { - ovsoftest.TableNameCache = getTableNameCache() + binding.TableNameCache = getTableNameCache() messages, _ := groupCache.GetBundleMessages(binding.AddMessage) - groupString := ovsoftest.GroupModToString(messages[0].GetMessage().(*openflow15.GroupMod)) + groupString := binding.GroupModToString(messages[0].GetMessage().(*openflow15.GroupMod)) return groupString } @@ -838,11 +837,11 @@ func Test_client_GetPodFlowKeys(t *testing.T) { assert.NoError(t, fc.InstallPodFlows(interfaceName, podInterfaceIPs, podMAC, uint32(11), 0, nil)) flowKeys := fc.GetPodFlowKeys(interfaceName) expectedFlowKeys := []string{ - "table=1,arp,in_port=11,arp_sha=00:00:10:10:00:11,arp_spa=10.10.0.11", - "table=3,in_port=11", - "table=4,ip,in_port=11,dl_src=00:00:10:10:00:11,nw_src=10.10.0.11", - "table=17,ip,reg0=0x200/0x200,nw_dst=10.10.0.11", - "table=22,dl_dst=00:00:10:10:00:11", + "table=1,priority=200,arp,in_port=11,arp_spa=10.10.0.11,arp_sha=00:00:10:10:00:11", + "table=3,priority=190,in_port=11", + "table=4,priority=200,ip,in_port=11,dl_src=00:00:10:10:00:11,nw_src=10.10.0.11", + "table=17,priority=200,ip,reg0=0x200/0x200,nw_dst=10.10.0.11", + "table=22,priority=200,dl_dst=00:00:10:10:00:11", } assert.ElementsMatch(t, expectedFlowKeys, flowKeys) } @@ -1225,11 +1224,11 @@ func Test_client_GetServiceFlowKeys(t *testing.T) { assert.NoError(t, fc.InstallEndpointFlows(bindingProtocol, endpoints)) flowKeys := fc.GetServiceFlowKeys(svcIP, svcPort, bindingProtocol, endpoints) expectedFlowKeys := []string{ - "table=11,tcp,tp_dst=0x50,nw_dst=10.96.0.224,reg4=0x10000/0x70000", - "table=11,tcp,reg4=0x30000/0x70000,nw_dst=10.96.0.224,tp_dst=0x50", - "table=12,tcp,reg4=0x20050/0x7ffff,reg3=0xa0a000b", - "table=12,tcp,reg4=0x20050/0x7ffff,reg3=0xa0a000c", - "table=20,ip,nw_src=10.10.0.12,nw_dst=10.10.0.12,ct_state=+new+trk", + "table=11,priority=200,tcp,reg4=0x10000/0x70000,nw_dst=10.96.0.224,tp_dst=80", + "table=11,priority=190,tcp,reg4=0x30000/0x70000,nw_dst=10.96.0.224,tp_dst=80", + "table=12,priority=200,tcp,reg3=0xa0a000b,reg4=0x20050/0x7ffff", + "table=12,priority=200,tcp,reg3=0xa0a000c,reg4=0x20050/0x7ffff", + "table=20,priority=190,ct_state=+new+trk,ip,nw_src=10.10.0.12,nw_dst=10.10.0.12", } assert.ElementsMatch(t, expectedFlowKeys, flowKeys) } @@ -1481,10 +1480,10 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client { m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) _, ipCIDR, _ := net.ParseCIDR("192.168.2.30/32") - flowMsg := &ofctrl.FlowBundleMessage{} - mFlow := &cachedFlow{isDropFlow: true, message: flowMsg, table: EgressDefaultTable.ofTable} + flows, _ := EgressDefaultTable.ofTable.BuildFlow(priority100).Action().Drop().Done().GetBundleMessages(binding.AddMessage) + flowMsg := flows[0].GetMessage().(*openflow15.FlowMod) ctx := &conjMatchFlowContext{ - dropFlow: mFlow, + dropFlow: flowMsg, dropFlowEnableLogging: false, conjunctiveMatch: &conjunctiveMatch{ tableID: 1, @@ -1496,8 +1495,7 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client { }, }} fc.featureNetworkPolicy.globalConjMatchFlowCache["mockContext"] = ctx - EgressMetricTable.ofTable.SetTable() - fc.featureNetworkPolicy.policyCache.Add(&policyRuleConjunction{metricFlows: []*cachedFlow{{isDropFlow: true, message: flowMsg, table: EgressMetricTable.ofTable}}}) + fc.featureNetworkPolicy.policyCache.Add(&policyRuleConjunction{metricFlows: []*openflow15.FlowMod{flowMsg}}) return fc } @@ -2266,7 +2264,7 @@ func Test_client_ReplayFlows(t *testing.T) { addFlowInCache := func(cache *flowCategoryCache, cacheKey string, flows []binding.Flow) { fCache := flowMessageCache{} for _, flow := range flows { - cf := getCachedFlow(flow, binding.AddMessage) + cf := getFlowModMessage(flow, binding.AddMessage) fCache[flow.MatchString()] = cf } cache.Store(cacheKey, fCache) @@ -2302,14 +2300,14 @@ func Test_client_ReplayFlows(t *testing.T) { priority200 = uint16(200) conj := &policyRuleConjunction{ id: ruleID, - actionFlows: []*cachedFlow{getCachedFlow(fc.featureNetworkPolicy.conjunctionActionDenyFlow(ruleID, IngressRuleTable.ofTable, &priority200, DispositionDrop, false), binding.AddMessage)}, - metricFlows: []*cachedFlow{getCachedFlow(fc.featureNetworkPolicy.denyRuleMetricFlow(ruleID, true, IngressMetricTable.GetID()), binding.AddMessage)}, + actionFlows: []*openflow15.FlowMod{getFlowModMessage(fc.featureNetworkPolicy.conjunctionActionDenyFlow(ruleID, IngressRuleTable.ofTable, &priority200, DispositionDrop, false), binding.AddMessage)}, + metricFlows: []*openflow15.FlowMod{getFlowModMessage(fc.featureNetworkPolicy.denyRuleMetricFlow(ruleID, true, IngressMetricTable.GetID()), binding.AddMessage)}, } assert.NoError(t, fc.featureNetworkPolicy.policyCache.Add(conj)) mp := matchPair{matchKey: MatchDstOFPort, matchValue: int32(podOfPort)} context := &conjMatchFlowContext{ - flow: getCachedFlow(fc.featureNetworkPolicy.conjunctiveMatchFlow(IngressRuleTable.GetID(), []matchPair{mp}, &priority200, []*conjunctiveAction{{conjID: ruleID, clauseID: 2, nClause: 2}}), binding.AddMessage), - dropFlow: getCachedFlow(fc.featureNetworkPolicy.defaultDropFlow(IngressDefaultTable.ofTable, []matchPair{mp}, true), binding.AddMessage), + flow: getFlowModMessage(fc.featureNetworkPolicy.conjunctiveMatchFlow(IngressRuleTable.GetID(), []matchPair{mp}, &priority200, []*conjunctiveAction{{conjID: ruleID, clauseID: 2, nClause: 2}}), binding.AddMessage), + dropFlow: getFlowModMessage(fc.featureNetworkPolicy.defaultDropFlow(IngressDefaultTable.ofTable, []matchPair{mp}, true), binding.AddMessage), } fc.featureNetworkPolicy.globalConjMatchFlowCache["npMatch"] = context replayedFlows = append(replayedFlows, @@ -2346,7 +2344,7 @@ func Test_client_ReplayFlows(t *testing.T) { expectedFlows = append(expectedFlows, replayedFlows...) actualFlows := make([]string, 0) - m.EXPECT().AddAll(gomock.Any()).Do(func(flowMessages []ofctrl.OpenFlowModMessage) { + m.EXPECT().AddAll(gomock.Any()).Do(func(flowMessages []*openflow15.FlowMod) { flowStrings := getFlowStrings(flowMessages) actualFlows = append(actualFlows, flowStrings...) }).Return(nil).AnyTimes() @@ -2354,3 +2352,35 @@ func Test_client_ReplayFlows(t *testing.T) { fc.ReplayFlows() assert.ElementsMatch(t, expectedFlows, actualFlows) } + +func TestCachedFlowIsDrop(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + m := oftest.NewMockOFEntryOperations(ctrl) + fc := newFakeClient(m, true, false, config.K8sNode, config.TrafficEncapModeEncap) + defer resetPipelines() + fc.bridge = ovsoftest.NewMockBridge(ctrl) + + _, ipCIDR, _ := net.ParseCIDR("192.168.2.30/32") + flows, err := EgressDefaultTable.ofTable. + BuildFlow(priority100). + MatchDstIPNet(*ipCIDR). + Action().Drop(). + Done(). + GetBundleMessages(binding.AddMessage) + assert.NoError(t, err) + require.Equal(t, 1, len(flows)) + msg := flows[0].GetMessage().(*openflow15.FlowMod) + assert.True(t, isDropFlow(msg)) + + flows, err = EgressDefaultTable.ofTable. + BuildFlow(priority100). + MatchDstIPNet(*ipCIDR). + Action().GotoTable(1). + Done(). + GetBundleMessages(binding.AddMessage) + assert.NoError(t, err) + require.Equal(t, 1, len(flows)) + msg = flows[0].GetMessage().(*openflow15.FlowMod) + assert.False(t, isDropFlow(msg)) +} diff --git a/pkg/agent/openflow/egress.go b/pkg/agent/openflow/egress.go index acc51c25fa2..9666377dc0a 100644 --- a/pkg/agent/openflow/egress.go +++ b/pkg/agent/openflow/egress.go @@ -17,7 +17,7 @@ package openflow import ( "net" - "antrea.io/ofnet/ofctrl" + "antrea.io/libOpenflow/openflow15" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow/cookie" @@ -73,12 +73,12 @@ func newFeatureEgress(cookieAllocator cookie.Allocator, } } -func (f *featureEgress) initFlows() []ofctrl.OpenFlowModMessage { +func (f *featureEgress) initFlows() []*openflow15.FlowMod { // This installs the flows to enable Pods to communicate to the external IP addresses. The flows identify the packets // from local Pods to the external IP address, and mark the packets to be SNAT'd with the configured SNAT IPs. return getFlowMessages(f.externalFlows()) } -func (f *featureEgress) replayFlows() []ofctrl.OpenFlowModMessage { +func (f *featureEgress) replayFlows() []*openflow15.FlowMod { return getCachedFlowMessages(f.cachedFlows) } diff --git a/pkg/agent/openflow/externalnode_connectivity.go b/pkg/agent/openflow/externalnode_connectivity.go index d06bae69846..5889324946d 100644 --- a/pkg/agent/openflow/externalnode_connectivity.go +++ b/pkg/agent/openflow/externalnode_connectivity.go @@ -17,7 +17,7 @@ package openflow import ( "net" - "antrea.io/ofnet/ofctrl" + "antrea.io/libOpenflow/openflow15" "antrea.io/antrea/pkg/agent/openflow/cookie" binding "antrea.io/antrea/pkg/ovs/openflow" @@ -101,7 +101,7 @@ func (f *featureExternalNodeConnectivity) vmUplinkFlows(hostOFPort, uplinkOFPort } } -func (f *featureExternalNodeConnectivity) initFlows() []ofctrl.OpenFlowModMessage { +func (f *featureExternalNodeConnectivity) initFlows() []*openflow15.FlowMod { cookieID := f.cookieAllocator.Request(f.category).Raw() flows := []binding.Flow{ L2ForwardingOutTable.ofTable.BuildFlow(priorityNormal). @@ -140,12 +140,12 @@ func (f *featureExternalNodeConnectivity) initFlows() []ofctrl.OpenFlowModMessag return getFlowMessages(flows) } -func (f *featureExternalNodeConnectivity) replayFlows() []ofctrl.OpenFlowModMessage { - var flows []ofctrl.OpenFlowModMessage +func (f *featureExternalNodeConnectivity) replayFlows() []*openflow15.FlowMod { + var flows []*openflow15.FlowMod rangeFunc := func(key, value interface{}) bool { - cachedFlows := value.([]cachedFlow) + cachedFlows := value.([]*openflow15.FlowMod) for _, flow := range cachedFlows { - flows = append(flows, flow.message) + flows = append(flows, flow) } return true } diff --git a/pkg/agent/openflow/framework.go b/pkg/agent/openflow/framework.go index c09d4f8c833..03b25925266 100644 --- a/pkg/agent/openflow/framework.go +++ b/pkg/agent/openflow/framework.go @@ -15,7 +15,7 @@ package openflow import ( - "antrea.io/ofnet/ofctrl" + "antrea.io/libOpenflow/openflow15" "antrea.io/antrea/pkg/agent/config" binding "antrea.io/antrea/pkg/ovs/openflow" @@ -58,9 +58,9 @@ type feature interface { // getRequiredTables returns a slice of required tables of the feature. getRequiredTables() []*Table // initFlows returns the Openflow messages of initial flows of the feature. - initFlows() []ofctrl.OpenFlowModMessage + initFlows() []*openflow15.FlowMod // replayFlows returns the Openflow messages of fixed and cached flows that need to be replayed after OVS is reconnected. - replayFlows() []ofctrl.OpenFlowModMessage + replayFlows() []*openflow15.FlowMod } const ( diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index f4aac62e05f..740c6411ee7 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -19,7 +19,6 @@ import ( "sync" "antrea.io/libOpenflow/openflow15" - "antrea.io/ofnet/ofctrl" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/config" @@ -72,7 +71,7 @@ func multicastPipelineClassifyFlow(cookieID uint64, pipeline binding.Pipeline) b Done() } -func (f *featureMulticast) initFlows() []ofctrl.OpenFlowModMessage { +func (f *featureMulticast) initFlows() []*openflow15.FlowMod { // Install flows to send the IGMP report messages to Antrea Agent. flows := f.igmpPktInFlows(uint8(PacketInReasonMC)) // Install flow to forward the IGMP query messages to all local Pods. @@ -88,7 +87,7 @@ func (f *featureMulticast) initFlows() []ofctrl.OpenFlowModMessage { return getFlowMessages(flows) } -func (f *featureMulticast) replayFlows() []ofctrl.OpenFlowModMessage { +func (f *featureMulticast) replayFlows() []*openflow15.FlowMod { // Get cached flows. return getCachedFlowMessages(f.cachedFlows) } diff --git a/pkg/agent/openflow/multicluster.go b/pkg/agent/openflow/multicluster.go index 6c69ed6b366..60e573f8fbf 100644 --- a/pkg/agent/openflow/multicluster.go +++ b/pkg/agent/openflow/multicluster.go @@ -17,7 +17,7 @@ package openflow import ( "net" - "antrea.io/ofnet/ofctrl" + "antrea.io/libOpenflow/openflow15" "antrea.io/antrea/pkg/agent/openflow/cookie" binding "antrea.io/antrea/pkg/ovs/openflow" @@ -62,11 +62,11 @@ func newFeatureMulticluster(cookieAllocator cookie.Allocator, ipProtocols []bind } } -func (f *featureMulticluster) initFlows() []ofctrl.OpenFlowModMessage { - return []ofctrl.OpenFlowModMessage{} +func (f *featureMulticluster) initFlows() []*openflow15.FlowMod { + return []*openflow15.FlowMod{} } -func (f *featureMulticluster) replayFlows() []ofctrl.OpenFlowModMessage { +func (f *featureMulticluster) replayFlows() []*openflow15.FlowMod { return getCachedFlowMessages(f.cachedFlows) } diff --git a/pkg/agent/openflow/network_policy.go b/pkg/agent/openflow/network_policy.go index e7e009eeb81..4c7396ee59a 100644 --- a/pkg/agent/openflow/network_policy.go +++ b/pkg/agent/openflow/network_policy.go @@ -22,7 +22,6 @@ import ( "sync" "antrea.io/libOpenflow/openflow15" - "antrea.io/ofnet/ofctrl" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -413,7 +412,7 @@ const ( // DENY-ALL rule is required by a policyRuleConjunction, the flowChange will update the in-memory cache, but will not // change on OVS. type flowChange struct { - flow *cachedFlow + flow *openflow15.FlowMod changeType changeType } @@ -447,12 +446,12 @@ type conjMatchFlowContext struct { denyAllRules map[uint32]bool featureNetworkPolicy *featureNetworkPolicy // flow is the conjunctive match flow built from this context. flow needs to be updated if actions are changed. - flow *cachedFlow + flow *openflow15.FlowMod // dropflow is the default drop flow built from this context to drop packets in the AppliedToGroup but not pass the // NetworkPolicy rule. dropFlow is installed on the switch as long as either actions or denyAllRules is not // empty, and uninstalled when both two are empty. When the dropFlow is uninstalled from the switch, the // conjMatchFlowContext is removed from the cache. - dropFlow *cachedFlow + dropFlow *openflow15.FlowMod // dropFlowEnableLogging describes the logging requirement of the dropFlow. dropFlowEnableLogging bool } @@ -471,7 +470,7 @@ func (ctx *conjMatchFlowContext) createOrUpdateConjunctiveMatchFlow(actions []*c // Create the conjunctive match flow entry. The actions here should not be empty for either add or update case. // The expected operation for a new Openflow entry should be "insertion". flow := ctx.featureNetworkPolicy.conjunctiveMatchFlow(ctx.tableID, ctx.matchPairs, ctx.priority, actions) - cf := getCachedFlow(flow, binding.AddMessage) + cf := getFlowModMessage(flow, binding.AddMessage) return &flowChange{ flow: cf, changeType: insertion, @@ -480,7 +479,7 @@ func (ctx *conjMatchFlowContext) createOrUpdateConjunctiveMatchFlow(actions []*c // Modify the existing Openflow entry and reset the actions. flow := ctx.featureNetworkPolicy.conjunctiveMatchFlow(ctx.tableID, ctx.matchPairs, ctx.priority, actions) - cf := getCachedFlow(flow, binding.AddMessage) + cf := getFlowModMessage(flow, binding.AddMessage) // The expected operation for an existing Openflow entry should be "modification". return &flowChange{ flow: cf, @@ -663,8 +662,8 @@ type policyRuleConjunction struct { fromClause *clause toClause *clause serviceClause *clause - actionFlows []*cachedFlow - metricFlows []*cachedFlow + actionFlows []*openflow15.FlowMod + metricFlows []*openflow15.FlowMod // NetworkPolicy reference information for debugging usage, its value can be nil // for conjunctions that are not built for a specific NetworkPolicy, e.g. DNS packetin Conjunction. npRef *v1beta2.NetworkPolicyReference @@ -696,23 +695,21 @@ func (c *client) NewDNSPacketInConjunction(id uint32) error { klog.InfoS("DNS Conjunction has already been added to cache", "id", id) return nil } - var cachedFlows []*cachedFlow - var flowMessages []ofctrl.OpenFlowModMessage + var flowMsgs []*openflow15.FlowMod for _, flow := range []binding.Flow{ c.featureNetworkPolicy.dnsPacketInFlow(id), c.featureNetworkPolicy.dnsResponseBypassPacketInFlow(), c.featureNetworkPolicy.dnsResponseBypassConntrackFlow(c.pipelines[pipelineIP].GetFirstTableInStage(stageConntrackState)), } { - cf := getCachedFlow(flow, binding.AddMessage) - cachedFlows = append(cachedFlows, cf) - flowMessages = append(flowMessages, cf.message) + cf := getFlowModMessage(flow, binding.AddMessage) + flowMsgs = append(flowMsgs, cf) } conj := &policyRuleConjunction{ id: id, ruleTableID: AntreaPolicyIngressRuleTable.ofTable.GetID(), - actionFlows: cachedFlows, + actionFlows: flowMsgs, } - if err := c.AddAll(flowMessages); err != nil { + if err := c.AddAll(flowMsgs); err != nil { return fmt.Errorf("error when adding action flows for the DNS conjunction: %w", err) } udpService := v1beta2.Service{ @@ -797,12 +794,12 @@ func (c *clause) addConjunctiveMatchFlow(featureNetworkPolicy *featureNetworkPol if c.dropTable != nil && context.dropFlow == nil { if isMCNPRule { dropFlow = &flowChange{ - flow: getCachedFlow(context.featureNetworkPolicy.multiClusterNetworkPolicySecurityDropFlow(c.dropTable, match.matchPairs), binding.AddMessage), + flow: getFlowModMessage(context.featureNetworkPolicy.multiClusterNetworkPolicySecurityDropFlow(c.dropTable, match.matchPairs), binding.AddMessage), changeType: insertion, } } else { dropFlow = &flowChange{ - flow: getCachedFlow(context.featureNetworkPolicy.defaultDropFlow(c.dropTable, match.matchPairs, enableLogging), binding.AddMessage), + flow: getFlowModMessage(context.featureNetworkPolicy.defaultDropFlow(c.dropTable, match.matchPairs, enableLogging), binding.AddMessage), changeType: insertion, } } @@ -812,7 +809,7 @@ func (c *clause) addConjunctiveMatchFlow(featureNetworkPolicy *featureNetworkPol context.dropFlowEnableLogging = enableLogging if c.dropTable != nil && context.dropFlow != nil { dropFlow = &flowChange{ - flow: getCachedFlow(context.featureNetworkPolicy.defaultDropFlow(c.dropTable, match.matchPairs, enableLogging), binding.AddMessage), + flow: getFlowModMessage(context.featureNetworkPolicy.defaultDropFlow(c.dropTable, match.matchPairs, enableLogging), binding.AddMessage), changeType: modification, } } @@ -1157,9 +1154,9 @@ func (c *client) InstallPolicyRuleFlows(rule *types.PolicyRule) error { defer c.featureNetworkPolicy.conjMatchFlowLock.Unlock() ctxChanges := c.featureNetworkPolicy.calculateMatchFlowChangesForRule(conj, rule) - var flowMessages []ofctrl.OpenFlowModMessage - for _, cf := range append(conj.metricFlows, conj.actionFlows...) { - flowMessages = append(flowMessages, cf.message) + var flowMessages []*openflow15.FlowMod + for _, fm := range append(conj.metricFlows, conj.actionFlows...) { + flowMessages = append(flowMessages, fm) } if err := c.AddAll(flowMessages); err != nil { return err @@ -1211,10 +1208,10 @@ func (f *featureNetworkPolicy) calculateActionFlowChangesForRule(rule *types.Pol actionFlows = append(actionFlows, f.conjunctionActionFlow(ruleOfID, ruleTable, dropTable.GetNext(), rule.Priority, rule.EnableLogging, rule.L7RuleVlanID)...) } for _, flow := range actionFlows { - conj.actionFlows = append(conj.actionFlows, getCachedFlow(flow, binding.AddMessage)) + conj.actionFlows = append(conj.actionFlows, getFlowModMessage(flow, binding.AddMessage)) } for _, flow := range metricFlows { - conj.metricFlows = append(conj.metricFlows, getCachedFlow(flow, binding.AddMessage)) + conj.metricFlows = append(conj.metricFlows, getFlowModMessage(flow, binding.AddMessage)) } } return conj @@ -1278,9 +1275,9 @@ func (f *featureNetworkPolicy) addActionToConjunctiveMatch(clause *clause, match // Generate the default drop flow if dropTable is not nil. if clause.dropTable != nil { if isMCNPRule { - context.dropFlow = getCachedFlow(context.featureNetworkPolicy.multiClusterNetworkPolicySecurityDropFlow(clause.dropTable, match.matchPairs), binding.AddMessage) + context.dropFlow = getFlowModMessage(context.featureNetworkPolicy.multiClusterNetworkPolicySecurityDropFlow(clause.dropTable, match.matchPairs), binding.AddMessage) } else { - context.dropFlow = getCachedFlow(context.featureNetworkPolicy.defaultDropFlow(clause.dropTable, match.matchPairs, enableLogging), binding.AddMessage) + context.dropFlow = getFlowModMessage(context.featureNetworkPolicy.defaultDropFlow(clause.dropTable, match.matchPairs, enableLogging), binding.AddMessage) } } f.globalConjMatchFlowCache[matcherKey] = context @@ -1304,14 +1301,14 @@ func (c *client) BatchInstallPolicyRuleFlows(ofPolicyRules []*types.PolicyRule) c.replayMutex.RLock() defer c.replayMutex.RUnlock() - var allFlowMessages []ofctrl.OpenFlowModMessage + var allFlowMessages []*openflow15.FlowMod var conjunctions []*policyRuleConjunction for _, rule := range ofPolicyRules { conj := c.featureNetworkPolicy.calculateActionFlowChangesForRule(rule) c.featureNetworkPolicy.addRuleToConjunctiveMatch(conj, rule) - for _, cf := range append(conj.actionFlows, conj.metricFlows...) { - allFlowMessages = append(allFlowMessages, cf.message) + for _, msg := range append(conj.actionFlows, conj.metricFlows...) { + allFlowMessages = append(allFlowMessages, msg) } conjunctions = append(conjunctions, conj) } @@ -1325,11 +1322,11 @@ func (c *client) BatchInstallPolicyRuleFlows(ofPolicyRules []*types.PolicyRule) for _, action := range ctx.actions { actions = append(actions, action) } - ctx.flow = getCachedFlow(c.featureNetworkPolicy.conjunctiveMatchFlow(ctx.tableID, ctx.matchPairs, ctx.priority, actions), binding.AddMessage) - allFlowMessages = append(allFlowMessages, ctx.flow.message) + ctx.flow = getFlowModMessage(c.featureNetworkPolicy.conjunctiveMatchFlow(ctx.tableID, ctx.matchPairs, ctx.priority, actions), binding.AddMessage) + allFlowMessages = append(allFlowMessages, ctx.flow) } if ctx.dropFlow != nil { - allFlowMessages = append(allFlowMessages, ctx.dropFlow.message) + allFlowMessages = append(allFlowMessages, ctx.dropFlow) } } @@ -1363,7 +1360,7 @@ func (f *featureNetworkPolicy) applyConjunctiveMatchFlows(flowChanges []*conjMat // sendConjunctiveFlows sends all the changed OpenFlow entries to the OVS bridge in a single Bundle. func (f *featureNetworkPolicy) sendConjunctiveFlows(changes []*conjMatchFlowContextChange) error { - var addFlows, modifyFlows, deleteFlows []ofctrl.OpenFlowModMessage + var addFlows, modifyFlows, deleteFlows []*openflow15.FlowMod var flowChanges []*flowChange for _, change := range changes { if change.matchFlow != nil && change.matchFlow.flow != nil { @@ -1375,7 +1372,7 @@ func (f *featureNetworkPolicy) sendConjunctiveFlows(changes []*conjMatchFlowCont } // Retrieve the OpenFlow entries from the flowChanges. for _, fc := range flowChanges { - flowInfo := fc.flow.message + flowInfo := fc.flow switch fc.changeType { case insertion: addFlows = append(addFlows, flowInfo) @@ -1392,7 +1389,7 @@ func (f *featureNetworkPolicy) sendConjunctiveFlows(changes []*conjMatchFlowCont func (c *policyRuleConjunction) ActionFlowPriorities() []string { priorities := make([]string, 0, len(c.actionFlows)) for _, flow := range c.actionFlows { - priorityStr := strconv.Itoa(int(flow.flowPriority())) + priorityStr := strconv.Itoa(int(flow.Priority)) priorities = append(priorities, priorityStr) } return priorities @@ -1514,7 +1511,7 @@ func (c *policyRuleConjunction) getAllFlowKeys() []string { flowKeys := []string{} dropFlowKeys := []string{} for _, flow := range c.actionFlows { - flowKeys = append(flowKeys, flow.key) + flowKeys = append(flowKeys, getFlowKey(flow)) } addClauseFlowKeys := func(clause *clause) { @@ -1523,10 +1520,10 @@ func (c *policyRuleConjunction) getAllFlowKeys() []string { } for _, ctx := range clause.matches { if ctx.flow != nil { - flowKeys = append(flowKeys, ctx.flow.key) + flowKeys = append(flowKeys, getFlowKey(ctx.flow)) } if ctx.dropFlow != nil { - dropFlowKeys = append(dropFlowKeys, ctx.dropFlow.key) + dropFlowKeys = append(dropFlowKeys, getFlowKey(ctx.dropFlow)) } } } @@ -1572,11 +1569,11 @@ func (c *client) UninstallPolicyRuleFlows(ruleID uint32) ([]string, error) { } staleOFPriorities := c.featureNetworkPolicy.getStalePriorities(conj) // Delete action flows from the OVS bridge. - var flowMessages []ofctrl.OpenFlowModMessage - for _, flow := range append(conj.actionFlows, conj.metricFlows...) { - flowMessages = append(flowMessages, flow.message) + var flowMessages []*openflow15.FlowMod + for _, msg := range append(conj.actionFlows, conj.metricFlows...) { + flowMessages = append(flowMessages, msg) } - if err := c.changeAll(map[ofAction][]ofctrl.OpenFlowModMessage{del: flowMessages}); err != nil { + if err := c.changeAll(map[ofAction][]*openflow15.FlowMod{del: flowMessages}); err != nil { return nil, err } c.featureNetworkPolicy.conjMatchFlowLock.Lock() @@ -1621,16 +1618,16 @@ func (f *featureNetworkPolicy) getStalePriorities(conj *policyRuleConjunction) ( return staleOFPriorities } -func (f *featureNetworkPolicy) replayFlows() []ofctrl.OpenFlowModMessage { - var flows []ofctrl.OpenFlowModMessage +func (f *featureNetworkPolicy) replayFlows() []*openflow15.FlowMod { + var flows []*openflow15.FlowMod addActionFlows := func(conj *policyRuleConjunction) { for _, flow := range conj.actionFlows { - flows = append(flows, flow.message) + flows = append(flows, flow) } } addMetricFlows := func(conj *policyRuleConjunction) { for _, flow := range conj.metricFlows { - flows = append(flows, flow.message) + flows = append(flows, flow) } } @@ -1641,10 +1638,10 @@ func (f *featureNetworkPolicy) replayFlows() []ofctrl.OpenFlowModMessage { addMatchFlows := func(ctx *conjMatchFlowContext) { if ctx.dropFlow != nil { - flows = append(flows, ctx.dropFlow.message) + flows = append(flows, ctx.dropFlow) } if ctx.flow != nil { - flows = append(flows, ctx.flow.message) + flows = append(flows, ctx.flow) } } @@ -1735,13 +1732,13 @@ func (c *client) GetNetworkPolicyFlowKeys(npName, npNamespace string) []string { // flowUpdates stores updates to the actionFlows and matchFlows in a policyRuleConjunction. type flowUpdates struct { - newActionFlows []*cachedFlow + newActionFlows []*openflow15.FlowMod newPriority uint16 } // getMatchFlowUpdates calculates the update for conjuctiveMatchFlows in a policyRuleConjunction to be // installed on a new priority. -func getMatchFlowUpdates(conj *policyRuleConjunction, newPriority uint16) (add, del []*cachedFlow) { +func getMatchFlowUpdates(conj *policyRuleConjunction, newPriority uint16) (add, del []*openflow15.FlowMod) { allClause := []*clause{conj.fromClause, conj.toClause, conj.serviceClause} for _, c := range allClause { if c == nil { @@ -1749,7 +1746,7 @@ func getMatchFlowUpdates(conj *policyRuleConjunction, newPriority uint16) (add, } for _, ctx := range c.matches { f := ctx.flow - updatedFlow := f.copyFlowByNewPriority(newPriority) + updatedFlow := copyFlowByNewPriority(f, newPriority) add = append(add, updatedFlow) del = append(del, f) } @@ -1767,14 +1764,14 @@ func getMatchFlowUpdates(conj *policyRuleConjunction, newPriority uint16) (add, // would essentially void the add flow for conj=1. // // In this case, we remove the conflicting delFlow and set addFlow as a modifyFlow. -func (f *featureNetworkPolicy) processFlowUpdates(addFlows, delFlows []*cachedFlow) (add, update, del []ofctrl.OpenFlowModMessage) { +func (f *featureNetworkPolicy) processFlowUpdates(addFlows, delFlows []*openflow15.FlowMod) (add, update, del []*openflow15.FlowMod) { for _, a := range addFlows { matched := false for i := 0; i < len(delFlows); i++ { - if a.matched(delFlows[i]) { + if flowMessageMatched(a, delFlows[i]) { matched = true // treat the addFlow as update - update = append(update, a.message) + update = append(update, a) // remove the delFlow from the list delFlows = append(delFlows[:i], delFlows[i+1:]...) // reset list index as delFlows[i] is removed @@ -1782,11 +1779,11 @@ func (f *featureNetworkPolicy) processFlowUpdates(addFlows, delFlows []*cachedFl } } if !matched { - add = append(add, a.message) + add = append(add, a) } } for _, flow := range delFlows { - del = append(del, flow.message) + del = append(del, flow) } return add, update, del } @@ -1794,7 +1791,7 @@ func (f *featureNetworkPolicy) processFlowUpdates(addFlows, delFlows []*cachedFl // updateConjunctionActionFlows constructs a new policyRuleConjunction with actionFlows updated to be // stored in the policyCache. func (f *featureNetworkPolicy) updateConjunctionActionFlows(conj *policyRuleConjunction, updates flowUpdates) *policyRuleConjunction { - newActionFlows := make([]*cachedFlow, len(conj.actionFlows)) + newActionFlows := make([]*openflow15.FlowMod, len(conj.actionFlows)) copy(newActionFlows, updates.newActionFlows) newConj := &policyRuleConjunction{ id: conj.id, @@ -1818,8 +1815,7 @@ func (f *featureNetworkPolicy) updateConjunctionMatchFlows(conj *policyRuleConju } for i, ctx := range cl.matches { delete(f.globalConjMatchFlowCache, ctx.generateGlobalMapKey()) - f := ctx.flow - updatedFlow := f.copyFlowByNewPriority(newPriority) + updatedFlow := copyFlowByNewPriority(ctx.flow, newPriority) cl.matches[i].flow = updatedFlow cl.matches[i].priority = &newPriority } @@ -1831,7 +1827,7 @@ func (f *featureNetworkPolicy) updateConjunctionMatchFlows(conj *policyRuleConju } // calculateFlowUpdates calculates the flow updates required for the priority re-assignments specified in the input map. -func (f *featureNetworkPolicy) calculateFlowUpdates(updates map[uint16]uint16, table uint8) (addFlows, delFlows []*cachedFlow, +func (f *featureNetworkPolicy) calculateFlowUpdates(updates map[uint16]uint16, table uint8) (addFlows, delFlows []*openflow15.FlowMod, conjFlowUpdates map[uint32]flowUpdates) { conjFlowUpdates = map[uint32]flowUpdates{} for original, newPriority := range updates { @@ -1845,11 +1841,11 @@ func (f *featureNetworkPolicy) calculateFlowUpdates(updates map[uint16]uint16, t continue } for _, actionFlow := range conj.actionFlows { - flowPriority := actionFlow.flowPriority() + flowPriority := actionFlow.Priority if flowPriority == original { // The OF flow was created at the priority which need to be re-installed // at the NewPriority now - updatedFlow := actionFlow.copyFlowByNewPriority(newPriority) + updatedFlow := copyFlowByNewPriority(actionFlow, newPriority) addFlows = append(addFlows, updatedFlow) delFlows = append(delFlows, actionFlow) // Store the actionFlow update to the policyRuleConjunction and update all @@ -2132,7 +2128,7 @@ func newFeatureNetworkPolicy( } } -func (f *featureNetworkPolicy) initFlows() []ofctrl.OpenFlowModMessage { +func (f *featureNetworkPolicy) initFlows() []*openflow15.FlowMod { f.egressTables = map[uint8]struct{}{EgressRuleTable.GetID(): {}, EgressDefaultTable.GetID(): {}} if f.enableAntreaPolicy { f.egressTables[AntreaPolicyEgressRuleTable.GetID()] = struct{}{} diff --git a/pkg/agent/openflow/network_policy_test.go b/pkg/agent/openflow/network_policy_test.go index e589629c641..0766a0c9259 100644 --- a/pkg/agent/openflow/network_policy_test.go +++ b/pkg/agent/openflow/network_policy_test.go @@ -18,16 +18,19 @@ import ( "errors" "fmt" "net" + "sort" "strconv" "strings" "testing" + "antrea.io/libOpenflow/openflow15" "antrea.io/ofnet/ofctrl" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" "antrea.io/antrea/pkg/agent/config" @@ -39,7 +42,6 @@ import ( binding "antrea.io/antrea/pkg/ovs/openflow" mocks "antrea.io/antrea/pkg/ovs/openflow/testing" ovsctltest "antrea.io/antrea/pkg/ovs/ovsctl/testing" - "antrea.io/antrea/pkg/util/ip" ) var ( @@ -310,12 +312,10 @@ func TestInstallPolicyRuleFlows(t *testing.T) { } func TestBatchInstallPolicyRuleFlows(t *testing.T) { - preparePipelines() - defer resetPipelines() - tests := []struct { - name string - rules []*types.PolicyRule - expectedFlowsFn func(c *client) []binding.Flow + for _, tt := range []struct { + name string + rules []*types.PolicyRule + expectedFlows []string }{ { name: "multiple K8s NetworkPolicy rules", @@ -325,7 +325,6 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { From: parseAddresses([]string{"192.168.1.40", "192.168.1.50"}), To: parseAddresses([]string{"0.0.0.0/0"}), FlowID: uint32(10), - TableID: EgressRuleTable.GetID(), PolicyRef: &v1beta2.NetworkPolicyReference{ Type: v1beta2.K8sNetworkPolicy, Namespace: "ns1", @@ -341,7 +340,6 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { To: parseAddresses([]string{"0.0.0.0/0"}), Service: []v1beta2.Service{{Protocol: &protocolTCP, Port: &port8080}}, FlowID: uint32(11), - TableID: EgressRuleTable.GetID(), PolicyRef: &v1beta2.NetworkPolicyReference{ Type: v1beta2.K8sNetworkPolicy, Namespace: "ns1", @@ -350,56 +348,21 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { }, }, }, - expectedFlowsFn: func(c *client) []binding.Flow { - cookiePolicy := c.cookieAllocator.Request(cookie.NetworkPolicy).Raw() - return []binding.Flow{ - EgressRuleTable.ofTable.BuildFlow(priorityLow).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchConjID(10). - Action().LoadToRegField(TFEgressConjIDField, 10). - Action().CT(true, EgressMetricTable.GetID(), CtZone, nil).LoadToLabelField(10, EgressRuleCTLabel).CTDone().Done(), - EgressRuleTable.ofTable.BuildFlow(priorityLow).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchConjID(11). - Action().LoadToRegField(TFEgressConjIDField, 11). - Action().CT(true, EgressMetricTable.GetID(), CtZone, nil).LoadToLabelField(11, EgressRuleCTLabel).CTDone().Done(), - EgressRuleTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchSrcIP(net.ParseIP("192.168.1.40")). - Action().Conjunction(10, 1, 2). - Action().Conjunction(11, 1, 3).Done(), - EgressRuleTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchSrcIP(net.ParseIP("192.168.1.50")). - Action().Conjunction(10, 1, 2).Done(), - EgressRuleTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchSrcIP(net.ParseIP("192.168.1.51")). - Action().Conjunction(11, 1, 3).Done(), - EgressRuleTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchDstIPNet(*ip.MustParseCIDR("0.0.0.0/0")). - Action().Conjunction(10, 2, 2). - Action().Conjunction(11, 2, 3).Done(), - EgressRuleTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolTCP).MatchDstPort(8080, nil). - Action().Conjunction(11, 3, 3).Done(), - EgressDefaultTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchSrcIP(net.ParseIP("192.168.1.40")). - Action().Drop().Done(), - EgressDefaultTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchSrcIP(net.ParseIP("192.168.1.50")). - Action().Drop().Done(), - EgressDefaultTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchSrcIP(net.ParseIP("192.168.1.51")). - Action().Drop().Done(), - EgressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchCTStateNew(true).MatchCTLabelField(0, uint64(10)<<32, EgressRuleCTLabel). - Action().NextTable().Done(), - EgressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchCTStateNew(false).MatchCTLabelField(0, uint64(10)<<32, EgressRuleCTLabel). - Action().NextTable().Done(), - EgressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchCTStateNew(true).MatchCTLabelField(0, uint64(11)<<32, EgressRuleCTLabel). - Action().NextTable().Done(), - EgressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchCTStateNew(false).MatchCTLabelField(0, uint64(11)<<32, EgressRuleCTLabel). - Action().NextTable().Done(), - } + expectedFlows: []string{ + "cookie=0x1020000000000, table=EgressRule, priority=190,conj_id=10,ip actions=set_field:0xa->reg5,ct(commit,table=EgressMetric,zone=65520,exec(set_field:0xa00000000/0xffffffff00000000->ct_label))", + "cookie=0x1020000000000, table=EgressRule, priority=190,conj_id=11,ip actions=set_field:0xb->reg5,ct(commit,table=EgressMetric,zone=65520,exec(set_field:0xb00000000/0xffffffff00000000->ct_label))", + "cookie=0x1020000000000, table=EgressRule, priority=200,ip,nw_src=192.168.1.40 actions=conjunction(10,1/2),conjunction(11,1/3)", + "cookie=0x1020000000000, table=EgressRule, priority=200,ip,nw_src=192.168.1.50 actions=conjunction(10,1/2)", + "cookie=0x1020000000000, table=EgressRule, priority=200,ip,nw_src=192.168.1.51 actions=conjunction(11,1/3)", + "cookie=0x1020000000000, table=EgressRule, priority=200,ip,nw_dst=0.0.0.0/0 actions=conjunction(10,2/2),conjunction(11,2/3)", + "cookie=0x1020000000000, table=EgressRule, priority=200,tcp,tp_dst=8080 actions=conjunction(11,3/3)", + "cookie=0x1020000000000, table=EgressDefaultRule, priority=200,ip,nw_src=192.168.1.40 actions=drop", + "cookie=0x1020000000000, table=EgressDefaultRule, priority=200,ip,nw_src=192.168.1.50 actions=drop", + "cookie=0x1020000000000, table=EgressDefaultRule, priority=200,ip,nw_src=192.168.1.51 actions=drop", + "cookie=0x1020000000000, table=EgressMetric, priority=200,ct_state=+new,ct_label=0xa00000000/0xffffffff00000000,ip actions=goto_table:L3Forwarding", + "cookie=0x1020000000000, table=EgressMetric, priority=200,ct_state=-new,ct_label=0xa00000000/0xffffffff00000000,ip actions=goto_table:L3Forwarding", + "cookie=0x1020000000000, table=EgressMetric, priority=200,ct_state=+new,ct_label=0xb00000000/0xffffffff00000000,ip actions=goto_table:L3Forwarding", + "cookie=0x1020000000000, table=EgressMetric, priority=200,ct_state=-new,ct_label=0xb00000000/0xffffffff00000000,ip actions=goto_table:L3Forwarding", }, }, { @@ -412,7 +375,6 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { Priority: &priority100, To: []types.Address{NewOFPortAddress(1), NewOFPortAddress(2)}, FlowID: uint32(10), - TableID: AntreaPolicyIngressRuleTable.GetID(), PolicyRef: &v1beta2.NetworkPolicyReference{ Type: v1beta2.AntreaNetworkPolicy, Namespace: "ns1", @@ -429,7 +391,6 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { To: []types.Address{NewOFPortAddress(1), NewOFPortAddress(3)}, Service: []v1beta2.Service{{Protocol: &protocolTCP, Port: &port8080}}, FlowID: uint32(11), - TableID: AntreaPolicyIngressRuleTable.GetID(), PolicyRef: &v1beta2.NetworkPolicyReference{ Type: v1beta2.AntreaNetworkPolicy, Namespace: "ns1", @@ -446,7 +407,6 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { To: []types.Address{NewOFPortAddress(1)}, Service: []v1beta2.Service{{Protocol: &protocolTCP, Port: &port8080}, {Protocol: &protocolICMP, ICMPType: &icmpType8, ICMPCode: &icmpCode0}}, FlowID: uint32(12), - TableID: AntreaPolicyIngressRuleTable.GetID(), PolicyRef: &v1beta2.NetworkPolicyReference{ Type: v1beta2.AntreaNetworkPolicy, Namespace: "ns1", @@ -462,7 +422,6 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { To: []types.Address{NewOFPortAddress(1), NewOFPortAddress(2)}, Service: []v1beta2.Service{}, FlowID: uint32(13), - TableID: AntreaPolicyIngressRuleTable.GetID(), PolicyRef: &v1beta2.NetworkPolicyReference{ Type: v1beta2.AntreaNetworkPolicy, Namespace: "ns1", @@ -471,149 +430,72 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { }, }, }, - expectedFlowsFn: func(c *client) []binding.Flow { - cookiePolicy := c.cookieAllocator.Request(cookie.NetworkPolicy).Raw() - return []binding.Flow{ - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchConjID(10). - Action().LoadToRegField(TFIngressConjIDField, 10). - Action().CT(true, IngressMetricTable.GetID(), CtZone, nil).LoadToLabelField(10, IngressRuleCTLabel).CTDone().Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). - MatchConjID(11). - Action().LoadToRegField(CNPConjIDField, 11). - Action().LoadRegMark(CnpDenyRegMark). - Action().GotoTable(IngressMetricTable.GetID()).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority200).Cookie(cookiePolicy). - MatchConjID(12). - Action().LoadToRegField(CNPConjIDField, 12). - Action().LoadRegMark(CnpDenyRegMark). - Action().GotoTable(IngressMetricTable.GetID()).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority201).Cookie(cookiePolicy). - MatchConjID(13). - Action().LoadToRegField(CNPConjIDField, 13). - Action().LoadRegMark(CnpDenyRegMark). - Action().GotoTable(IngressMetricTable.GetID()).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchSrcIP(net.ParseIP("192.168.1.40")). - Action().Conjunction(10, 1, 2). - Action().Conjunction(11, 1, 3).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority200).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchSrcIP(net.ParseIP("192.168.1.40")). - Action().Conjunction(12, 1, 3).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchSrcIP(net.ParseIP("192.168.1.50")). - Action().Conjunction(10, 1, 2).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchSrcIP(net.ParseIP("192.168.1.51")). - Action().Conjunction(11, 1, 3).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority201).Cookie(cookiePolicy). - MatchTunnelID(1). - Action().Conjunction(13, 1, 3).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority201).Cookie(cookiePolicy). - MatchTunnelID(2). - Action().Conjunction(13, 1, 3).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). - MatchRegFieldWithValue(TargetOFPortField, uint32(1)). - Action().Conjunction(10, 2, 2). - Action().Conjunction(11, 2, 3).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority200).Cookie(cookiePolicy). - MatchRegFieldWithValue(TargetOFPortField, uint32(1)). - Action().Conjunction(12, 2, 3).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). - MatchRegFieldWithValue(TargetOFPortField, uint32(2)). - Action().Conjunction(10, 2, 2).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). - MatchRegFieldWithValue(TargetOFPortField, uint32(3)). - Action().Conjunction(11, 2, 3).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority201).Cookie(cookiePolicy). - MatchRegFieldWithValue(TargetOFPortField, uint32(1)). - Action().Conjunction(13, 2, 3).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority201).Cookie(cookiePolicy). - MatchRegFieldWithValue(TargetOFPortField, uint32(2)). - Action().Conjunction(13, 2, 3).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority100).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolTCP).MatchDstPort(8080, nil). - Action().Conjunction(11, 3, 3).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority200).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolTCP).MatchDstPort(8080, nil). - Action().Conjunction(12, 3, 3).Done(), - AntreaPolicyIngressRuleTable.ofTable.BuildFlow(priority200).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolICMP).MatchICMPType(byte(icmpType8)).MatchICMPCode(byte(icmpCode0)). - Action().Conjunction(12, 3, 3).Done(), - IngressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchCTStateNew(true).MatchCTLabelField(0, 10, IngressRuleCTLabel). - Action().NextTable().Done(), - IngressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchProtocol(binding.ProtocolIP).MatchCTStateNew(false).MatchCTLabelField(0, 10, IngressRuleCTLabel). - Action().NextTable().Done(), - IngressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchRegMark(CnpDenyRegMark).MatchRegFieldWithValue(CNPConjIDField, 11). - Action().Drop().Done(), - IngressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchRegMark(CnpDenyRegMark).MatchRegFieldWithValue(CNPConjIDField, 12). - Action().Drop().Done(), - IngressMetricTable.ofTable.BuildFlow(priorityNormal).Cookie(cookiePolicy). - MatchRegMark(CnpDenyRegMark).MatchRegFieldWithValue(CNPConjIDField, 13). - Action().Drop().Done(), - IngressDefaultTable.ofTable.BuildFlow(priority200).Cookie(cookiePolicy). - MatchTunnelID(uint64(UnknownLabelIdentity)). - MatchRegFieldWithValue(TargetOFPortField, uint32(1)). - Action().Drop().Done(), - IngressDefaultTable.ofTable.BuildFlow(priority200).Cookie(cookiePolicy). - MatchTunnelID(uint64(UnknownLabelIdentity)). - MatchRegFieldWithValue(TargetOFPortField, uint32(2)). - Action().Drop().Done(), - } + expectedFlows: []string{ + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=100,conj_id=10,ip actions=set_field:0xa->reg6,ct(commit,table=IngressMetric,zone=65520,exec(set_field:0xa/0xffffffff->ct_label))", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=100,conj_id=11 actions=set_field:0xb->reg3,set_field:0x400/0x400->reg0,goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=200,conj_id=12 actions=set_field:0xc->reg3,set_field:0x400/0x400->reg0,goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=201,conj_id=13 actions=set_field:0xd->reg3,set_field:0x400/0x400->reg0,goto_table:IngressMetric", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=100,ip,nw_src=192.168.1.40 actions=conjunction(10,1/2),conjunction(11,1/3)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=100,ip,nw_src=192.168.1.50 actions=conjunction(10,1/2)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=100,ip,nw_src=192.168.1.51 actions=conjunction(11,1/3)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=201,tun_id=1 actions=conjunction(13,1/3)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=201,tun_id=2 actions=conjunction(13,1/3)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=201,reg1=0x1 actions=conjunction(13,2/3)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=100,reg1=0x1 actions=conjunction(10,2/2),conjunction(11,2/3)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=100,tcp,tp_dst=8080 actions=conjunction(11,3/3)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=200,reg1=0x1 actions=conjunction(12,2/3)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=100,reg1=0x2 actions=conjunction(10,2/2)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=200,tcp,tp_dst=8080 actions=conjunction(12,3/3)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=200,icmp,icmp_type=8,icmp_code=0 actions=conjunction(12,3/3)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=201,reg1=0x2 actions=conjunction(13,2/3)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=100,reg1=0x3 actions=conjunction(11,2/3)", + "cookie=0x1020000000000, table=AntreaPolicyIngressRule, priority=200,ip,nw_src=192.168.1.40 actions=conjunction(12,1/3)", + "cookie=0x1020000000000, table=IngressDefaultRule, priority=200,reg1=0x1,tun_id=16777215 actions=drop", + "cookie=0x1020000000000, table=IngressDefaultRule, priority=200,reg1=0x2,tun_id=16777215 actions=drop", + "cookie=0x1020000000000, table=IngressMetric, priority=200,ct_state=+new,ct_label=0xa/0xffffffff,ip actions=goto_table:ConntrackCommit", + "cookie=0x1020000000000, table=IngressMetric, priority=200,ct_state=-new,ct_label=0xa/0xffffffff,ip actions=goto_table:ConntrackCommit", + "cookie=0x1020000000000, table=IngressMetric, priority=200,reg0=0x400/0x400,reg3=0xb actions=drop", + "cookie=0x1020000000000, table=IngressMetric, priority=200,reg0=0x400/0x400,reg3=0xc actions=drop", + "cookie=0x1020000000000, table=IngressMetric, priority=200,reg0=0x400/0x400,reg3=0xd actions=drop", }, }, - } - for _, tt := range tests { + } { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockOperations := oftest.NewMockOFEntryOperations(ctrl) - ofClient := NewClient(bridgeName, bridgeMgmtAddr, false, true, false, false, false, false, false, false, false, false) - c = ofClient.(*client) - c.cookieAllocator = cookie.NewAllocator(0) - c.ofEntryOperations = mockOperations - c.nodeConfig = &config.NodeConfig{PodIPv4CIDR: podIPv4CIDR, PodIPv6CIDR: nil, Type: config.K8sNode} - c.networkConfig = &config.NetworkConfig{IPv4Enabled: true} - c.ipProtocols = []binding.Protocol{binding.ProtocolIP} - mockFeaturePodConnectivity.cookieAllocator = c.cookieAllocator - mockFeaturePodConnectivity.ipProtocols = c.ipProtocols - mockFeatureNetworkPolicy.cookieAllocator = c.cookieAllocator - mockFeatureNetworkPolicy.ipProtocols = c.ipProtocols - mockFeatureNetworkPolicy.bridge = c.bridge - c.featurePodConnectivity = &mockFeaturePodConnectivity - c.featureNetworkPolicy = &mockFeatureNetworkPolicy - c.featureNetworkPolicy.deterministic = true - c.featureNetworkPolicy.policyCache = cache.NewIndexer(policyConjKeyFunc, cache.Indexers{priorityIndex: priorityIndexFunc}) - c.featureNetworkPolicy.globalConjMatchFlowCache = map[string]*conjMatchFlowContext{} - - bridge := mocks.NewMockBridge(ctrl) - bridge.EXPECT().CreateTable(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - c.bridge = bridge - c.pipelines = pipelineMap - c.realizePipelines() - - expectedFlows := func(flows []binding.Flow) []ofctrl.OpenFlowModMessage { - flowMods := make([]ofctrl.OpenFlowModMessage, 0, len(flows)) - for _, f := range flows { - messages, _ := f.GetBundleMessages(binding.AddMessage) - flowMods = append(flowMods, messages[0]) + + c := newFakeClient(mockOperations, true, false, config.K8sNode, config.TrafficEncapModeEncap) + defer resetPipelines() + c.featureNetworkPolicy.egressTables = map[uint8]struct{}{EgressRuleTable.GetID(): {}, EgressDefaultTable.GetID(): {}, AntreaPolicyEgressRuleTable.GetID(): {}} + + for _, r := range tt.rules { + if r.Direction == v1beta2.DirectionOut { + if r.IsAntreaNetworkPolicyRule() { + r.TableID = AntreaPolicyEgressRuleTable.GetID() + } else { + r.TableID = EgressRuleTable.GetID() + } + } else { + if r.IsAntreaNetworkPolicyRule() { + r.TableID = AntreaPolicyIngressRuleTable.GetID() + } else { + r.TableID = IngressRuleTable.GetID() + } } - return flowMods - }(tt.expectedFlowsFn(c)) - expectedFlowStr := dumpFlows(expectedFlows) + } + c.featureNetworkPolicy.globalConjMatchFlowCache = make(map[string]*conjMatchFlowContext) + c.featureNetworkPolicy.policyCache = cache.NewIndexer(policyConjKeyFunc, cache.Indexers{priorityIndex: priorityIndexFunc}) + expectedFlowStr := strings.Join(tt.expectedFlows, "; ") // For better readability when debugging failure. eq := gomock.GotFormatterAdapter( gomock.GotFormatterFunc( func(i interface{}) string { - return dumpFlows(i.([]ofctrl.OpenFlowModMessage)) + return dumpFlows(i.([]*openflow15.FlowMod)) }), gomock.WantFormatter( gomock.StringerFunc(func() string { return expectedFlowStr }), - newFlowModIgnoreTxIDMatcher(expectedFlows), + newFlowModIgnoreTxIDMatcher(tt.expectedFlows), ), ) mockOperations.EXPECT().AddAll(eq).Return(nil).Times(1) @@ -624,15 +506,15 @@ func TestBatchInstallPolicyRuleFlows(t *testing.T) { } type flowModIgnoreTxIDMatcher struct { - flowMods []ofctrl.OpenFlowModMessage + flowMods []string } -func newFlowModIgnoreTxIDMatcher(flowModMessages []ofctrl.OpenFlowModMessage) gomock.Matcher { +func newFlowModIgnoreTxIDMatcher(flowModMessages []string) gomock.Matcher { return flowModIgnoreTxIDMatcher{flowMods: flowModMessages} } func (m flowModIgnoreTxIDMatcher) Matches(x interface{}) bool { - messages, ok := x.([]ofctrl.OpenFlowModMessage) + messages, ok := x.([]*openflow15.FlowMod) if !ok { return false } @@ -640,42 +522,40 @@ func (m flowModIgnoreTxIDMatcher) Matches(x interface{}) bool { if len(wanted) != len(m.flowMods) { return false } - given := getFlowStrings(m.flowMods) - usedFromGiven := make([]bool, len(given)) - foundFromWanted := make([]bool, len(wanted)) - for i, wantedItem := range wanted { - for j, givenItem := range given { - if usedFromGiven[j] { - continue - } - if wantedItem == givenItem { - foundFromWanted[i] = true - usedFromGiven[j] = true - break - } + sortFlows := func(flows []string) sets.String { + sort.Strings(flows) + flowSet := sets.NewString() + getConjunctionID := func(conj string) int { + newStr := strings.ReplaceAll(conj, "conjunction(", "") + id, _ := strconv.Atoi(strings.Split(newStr, ",")[0]) + return id } - } - missingFromWanted := 0 - for _, found := range foundFromWanted { - if !found { - missingFromWanted++ - } - } - extraInGiven := 0 - for _, used := range usedFromGiven { - if !used { - extraInGiven++ + for _, f := range flows { + if !strings.Contains(f, "actions=conjunction") { + flowSet.Insert(f) + } + strParts := strings.Split(f, "actions=") + prefix := strParts[0] + conjunctions := strings.Split(strParts[1], ",") + sort.Slice(conjunctions, func(i, j int) bool { + c1 := getConjunctionID(conjunctions[i]) + c2 := getConjunctionID(conjunctions[j]) + return c1 < c2 + }) + flowSet.Insert(strings.Join([]string{prefix, strings.Join(conjunctions, ",")}, "actions=")) } + return flowSet } - - return extraInGiven == 0 && missingFromWanted == 0 + wantedSets := sortFlows(wanted) + givenSets := sortFlows(m.flowMods) + return wantedSets.Equal(givenSets) } func (m flowModIgnoreTxIDMatcher) String() string { - return fmt.Sprintf("has the same elements as %v", dumpFlows(m.flowMods)) + return fmt.Sprintf("has the same elements as %v", strings.Join(m.flowMods, "; ")) } -func dumpFlows(flows []ofctrl.OpenFlowModMessage) string { +func dumpFlows(flows []*openflow15.FlowMod) string { flowStrings := getFlowStrings(flows) return strings.Join(flowStrings, "; ") } @@ -1012,8 +892,6 @@ func newMockDropFlowBuilder(ctrl *gomock.Controller, flowTable *mocks.MockTable) &ofctrl.FlowBundleMessage{}, } dropFlow.EXPECT().GetBundleMessages(binding.AddMessage).Return(flowModMessages, nil).AnyTimes() - dropFlow.EXPECT().GetTable().Return(flowTable).AnyTimes() - dropFlow.EXPECT().IsDropFlow().Return(true).AnyTimes() dropFlow.EXPECT().MatchString().Return("").AnyTimes() return dropFlowBuilder } @@ -1044,8 +922,6 @@ func newMockRuleFlowBuilder(ctrl *gomock.Controller, flowTable *mocks.MockTable) &ofctrl.FlowBundleMessage{}, } ruleFlow.EXPECT().GetBundleMessages(binding.AddMessage).Return(flowModMessages, nil).AnyTimes() - ruleFlow.EXPECT().GetTable().Return(flowTable).AnyTimes() - ruleFlow.EXPECT().IsDropFlow().Return(false).AnyTimes() ruleFlow.EXPECT().MatchString().Return("").AnyTimes() return ruleFlowBuilder } @@ -1068,8 +944,6 @@ func newMockMetricFlowBuilder(ctrl *gomock.Controller, flowTable *mocks.MockTabl &ofctrl.FlowBundleMessage{}, } metricFlow.EXPECT().GetBundleMessages(binding.AddMessage).Return(flowModMessages, nil).AnyTimes() - metricFlow.EXPECT().GetTable().Return(flowTable).AnyTimes() - metricFlow.EXPECT().IsDropFlow().Return(true).AnyTimes() metricFlow.EXPECT().MatchString().Return("").AnyTimes() return metricFlowBuilder } @@ -1421,10 +1295,10 @@ func TestClient_GetPolicyInfoFromConjunction(t *testing.T) { ruleName: fmt.Sprint(ruleID1), } flow := EgressRuleTable.ofTable.BuildFlow(priority100).MatchCTSrcIP(net.ParseIP("1.1.1.10")).Action().Drop().Done() - cf := getCachedFlow(flow, binding.AddMessage) + cf := getFlowModMessage(flow, binding.AddMessage) conj2 := &policyRuleConjunction{ id: ruleID2, - actionFlows: []*cachedFlow{cf}, + actionFlows: []*openflow15.FlowMod{cf}, npRef: npRef, ruleName: fmt.Sprint(ruleID2), } diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index b124f410435..7105fa1bb91 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -387,37 +387,34 @@ var ( ) type OFEntryOperations interface { - AddAll(flowMessages []ofctrl.OpenFlowModMessage) error - ModifyAll(flowMessages []ofctrl.OpenFlowModMessage) error - BundleOps(adds, mods, dels []ofctrl.OpenFlowModMessage) error - DeleteAll(flowMessages []ofctrl.OpenFlowModMessage) error + AddAll(flows []*openflow15.FlowMod) error + ModifyAll(flows []*openflow15.FlowMod) error + BundleOps(adds, mods, dels []*openflow15.FlowMod) error + DeleteAll(flows []*openflow15.FlowMod) error AddOFEntries(ofEntries []binding.OFEntry) error ModifyOFEntries(ofEntries []binding.OFEntry) error DeleteOFEntries(ofEntries []binding.OFEntry) error } -type cachedFlow struct { - key string - message ofctrl.OpenFlowModMessage - table binding.Table - isDropFlow bool +func copyFlowByNewPriority(flowMod *openflow15.FlowMod, priority uint16) *openflow15.FlowMod { + newFlow := *flowMod + newFlow.Priority = priority + return &newFlow } -func (f *cachedFlow) copyFlowByNewPriority(priority uint16) *cachedFlow { - flowMod := *f.message.(*ofctrl.FlowBundleMessage) - flowMod.GetMessage().(*openflow15.FlowMod).Priority = priority - return &cachedFlow{key: f.key, table: f.table, isDropFlow: f.isDropFlow, message: &flowMod} +func flowMessageMatched(oldFlow, newFlow *openflow15.FlowMod) bool { + return oldFlow.Priority == newFlow.Priority && getFlowKey(oldFlow) == getFlowKey(newFlow) } -func (f *cachedFlow) flowPriority() uint16 { - return f.message.GetMessage().(*openflow15.FlowMod).Priority +func isDropFlow(f *openflow15.FlowMod) bool { + return len(f.Instructions) == 0 } -func (f *cachedFlow) matched(newFlow *cachedFlow) bool { - return f.flowPriority() == newFlow.flowPriority() && f.key == newFlow.key +func getFlowKey(fm *openflow15.FlowMod) string { + return binding.FlowModMatchString(fm) } -type flowMessageCache map[string]*cachedFlow +type flowMessageCache map[string]*openflow15.FlowMod type flowCategoryCache struct { sync.Map @@ -478,7 +475,7 @@ func (c *client) GetTunnelVirtualMAC() net.HardwareAddr { return GlobalVirtualMAC } -func (c *client) changeAll(flowsMap map[ofAction][]ofctrl.OpenFlowModMessage) error { +func (c *client) changeAll(flowsMap map[ofAction][]*openflow15.FlowMod) error { if len(flowsMap) == 0 { return nil } @@ -509,20 +506,20 @@ func (c *client) changeAll(flowsMap map[ofAction][]ofctrl.OpenFlowModMessage) er return nil } -func (c *client) AddAll(flowMessages []ofctrl.OpenFlowModMessage) error { - return c.changeAll(map[ofAction][]ofctrl.OpenFlowModMessage{add: flowMessages}) +func (c *client) AddAll(flowMessages []*openflow15.FlowMod) error { + return c.changeAll(map[ofAction][]*openflow15.FlowMod{add: flowMessages}) } -func (c *client) ModifyAll(flowMessages []ofctrl.OpenFlowModMessage) error { - return c.changeAll(map[ofAction][]ofctrl.OpenFlowModMessage{mod: flowMessages}) +func (c *client) ModifyAll(flowMessages []*openflow15.FlowMod) error { + return c.changeAll(map[ofAction][]*openflow15.FlowMod{mod: flowMessages}) } -func (c *client) DeleteAll(flowMessages []ofctrl.OpenFlowModMessage) error { - return c.changeAll(map[ofAction][]ofctrl.OpenFlowModMessage{del: flowMessages}) +func (c *client) DeleteAll(flowMessages []*openflow15.FlowMod) error { + return c.changeAll(map[ofAction][]*openflow15.FlowMod{del: flowMessages}) } -func (c *client) BundleOps(adds, mods, dels []ofctrl.OpenFlowModMessage) error { - return c.changeAll(map[ofAction][]ofctrl.OpenFlowModMessage{add: adds, mod: mods, del: dels}) +func (c *client) BundleOps(adds, mods, dels []*openflow15.FlowMod) error { + return c.changeAll(map[ofAction][]*openflow15.FlowMod{add: adds, mod: mods, del: dels}) } func (c *client) changeOFEntries(ofEntries []binding.OFEntry, action ofAction) error { @@ -564,7 +561,7 @@ func (c *client) DeleteOFEntries(ofEntries []binding.OFEntry) error { return c.changeOFEntries(ofEntries, del) } -func (c *client) defaultFlows() []ofctrl.OpenFlowModMessage { +func (c *client) defaultFlows() []*openflow15.FlowMod { cookieID := c.cookieAllocator.Request(cookie.Default).Raw() var flows []binding.Flow for id, pipeline := range c.pipelines { @@ -1188,7 +1185,11 @@ func (f *featureNetworkPolicy) flowsToTrace(dataplaneTag uint8, defer f.conjMatchFlowLock.Unlock() for _, ctx := range f.globalConjMatchFlowCache { if ctx.dropFlow != nil { - dropFlow := f.defaultDropFlow(ctx.dropFlow.table, ctx.matchPairs, ctx.dropFlowEnableLogging) + table, err := f.bridge.GetTableByID(ctx.dropFlow.TableId) + if err != nil { + continue + } + dropFlow := f.defaultDropFlow(table, ctx.matchPairs, ctx.dropFlowEnableLogging) copyFlowBuilder := dropFlow.CopyToBuilder(priorityNormal+2, false) if dropFlow.FlowProtocol() == "" { copyFlowBuilderIPv6 := dropFlow.CopyToBuilder(priorityNormal+2, false) @@ -1216,13 +1217,17 @@ func (f *featureNetworkPolicy) flowsToTrace(dataplaneTag uint8, // Copy Antrea NetworkPolicy drop rules. for _, obj := range f.policyCache.List() { conj := obj.(*policyRuleConjunction) - for _, cf := range conj.metricFlows { - if cf.isDropFlow { + for _, flow := range conj.metricFlows { + table, err := f.bridge.GetTableByID(flow.TableId) + if err != nil { + continue + } + if isDropFlow(flow) { conjID := conj.id // Generate both IPv4 and IPv6 flows if the original drop flow doesn't match IP/IPv6. // DSCP field is in IP/IPv6 headers so IP/IPv6 match is required in a flow. - copyFlowBuilderIPv6 := cf.table.BuildFlow(priorityNormal+2). + copyFlowBuilderIPv6 := table.BuildFlow(priorityNormal+2). MatchRegMark(CnpDenyRegMark). MatchRegFieldWithValue(CNPConjIDField, conjID). MatchProtocol(binding.ProtocolIPv6) @@ -1234,7 +1239,7 @@ func (f *featureNetworkPolicy) flowsToTrace(dataplaneTag uint8, Cookie(cookieID). Action().SendToController(uint8(PacketInReasonTF)). Done()) - copyFlowBuilder := cf.table.BuildFlow(priorityNormal+2). + copyFlowBuilder := table.BuildFlow(priorityNormal+2). MatchRegMark(CnpDenyRegMark). MatchRegFieldWithValue(CNPConjIDField, conjID).MatchProtocol(binding.ProtocolIP) if f.ovsMetersAreSupported { @@ -3041,12 +3046,12 @@ func (f *featureService) gatewaySNATFlows() []binding.Flow { return flows } -func getCachedFlowMessages(cache *flowCategoryCache) []ofctrl.OpenFlowModMessage { - var flows []ofctrl.OpenFlowModMessage +func getCachedFlowMessages(cache *flowCategoryCache) []*openflow15.FlowMod { + var flows []*openflow15.FlowMod cache.Range(func(key, value interface{}) bool { fCache := value.(flowMessageCache) for _, flow := range fCache { - flows = append(flows, flow.message) + flows = append(flows, flow) } return true }) diff --git a/pkg/agent/openflow/pod_connectivity.go b/pkg/agent/openflow/pod_connectivity.go index f234674c0f7..0bd07649e4b 100644 --- a/pkg/agent/openflow/pod_connectivity.go +++ b/pkg/agent/openflow/pod_connectivity.go @@ -17,7 +17,7 @@ package openflow import ( "net" - "antrea.io/ofnet/ofctrl" + "antrea.io/libOpenflow/openflow15" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/openflow/cookie" @@ -128,7 +128,7 @@ func newFeaturePodConnectivity( } } -func (f *featurePodConnectivity) initFlows() []ofctrl.OpenFlowModMessage { +func (f *featurePodConnectivity) initFlows() []*openflow15.FlowMod { var flows []binding.Flow gatewayMAC := f.nodeConfig.GatewayConfig.MAC @@ -186,8 +186,8 @@ func (f *featurePodConnectivity) initFlows() []ofctrl.OpenFlowModMessage { return getFlowMessages(flows) } -func (f *featurePodConnectivity) replayFlows() []ofctrl.OpenFlowModMessage { - var flows []ofctrl.OpenFlowModMessage +func (f *featurePodConnectivity) replayFlows() []*openflow15.FlowMod { + var flows []*openflow15.FlowMod // Get cached flows. for _, cachedFlows := range []*flowCategoryCache{f.nodeCachedFlows, f.podCachedFlows, f.tcCachedFlows} { diff --git a/pkg/agent/openflow/service.go b/pkg/agent/openflow/service.go index 06a16ff5399..e59bef18b63 100644 --- a/pkg/agent/openflow/service.go +++ b/pkg/agent/openflow/service.go @@ -18,7 +18,7 @@ import ( "net" "sync" - "antrea.io/ofnet/ofctrl" + "antrea.io/libOpenflow/openflow15" "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/config" @@ -134,7 +134,7 @@ func (f *featureService) serviceNoEndpointFlow() binding.Flow { Done() } -func (f *featureService) initFlows() []ofctrl.OpenFlowModMessage { +func (f *featureService) initFlows() []*openflow15.FlowMod { var flows []binding.Flow if f.enableProxy { flows = append(flows, f.conntrackFlows()...) @@ -161,7 +161,7 @@ func (f *featureService) initFlows() []ofctrl.OpenFlowModMessage { return getFlowMessages(flows) } -func (f *featureService) replayFlows() []ofctrl.OpenFlowModMessage { +func (f *featureService) replayFlows() []*openflow15.FlowMod { return getCachedFlowMessages(f.cachedFlows) } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 4229da74390..05e24521786 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -26,9 +26,9 @@ import ( openflow "antrea.io/antrea/pkg/ovs/openflow" ip "antrea.io/antrea/pkg/util/ip" proxy "antrea.io/antrea/third_party/proxy" + openflow15 "antrea.io/libOpenflow/openflow15" protocol "antrea.io/libOpenflow/protocol" util "antrea.io/libOpenflow/util" - ofctrl "antrea.io/ofnet/ofctrl" gomock "github.com/golang/mock/gomock" v1 "k8s.io/api/core/v1" net "net" @@ -1060,7 +1060,7 @@ func (m *MockOFEntryOperations) EXPECT() *MockOFEntryOperationsMockRecorder { } // AddAll mocks base method -func (m *MockOFEntryOperations) AddAll(arg0 []ofctrl.OpenFlowModMessage) error { +func (m *MockOFEntryOperations) AddAll(arg0 []*openflow15.FlowMod) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddAll", arg0) ret0, _ := ret[0].(error) @@ -1088,7 +1088,7 @@ func (mr *MockOFEntryOperationsMockRecorder) AddOFEntries(arg0 interface{}) *gom } // BundleOps mocks base method -func (m *MockOFEntryOperations) BundleOps(arg0, arg1, arg2 []ofctrl.OpenFlowModMessage) error { +func (m *MockOFEntryOperations) BundleOps(arg0, arg1, arg2 []*openflow15.FlowMod) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "BundleOps", arg0, arg1, arg2) ret0, _ := ret[0].(error) @@ -1102,7 +1102,7 @@ func (mr *MockOFEntryOperationsMockRecorder) BundleOps(arg0, arg1, arg2 interfac } // DeleteAll mocks base method -func (m *MockOFEntryOperations) DeleteAll(arg0 []ofctrl.OpenFlowModMessage) error { +func (m *MockOFEntryOperations) DeleteAll(arg0 []*openflow15.FlowMod) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DeleteAll", arg0) ret0, _ := ret[0].(error) @@ -1130,7 +1130,7 @@ func (mr *MockOFEntryOperationsMockRecorder) DeleteOFEntries(arg0 interface{}) * } // ModifyAll mocks base method -func (m *MockOFEntryOperations) ModifyAll(arg0 []ofctrl.OpenFlowModMessage) error { +func (m *MockOFEntryOperations) ModifyAll(arg0 []*openflow15.FlowMod) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ModifyAll", arg0) ret0, _ := ret[0].(error) diff --git a/pkg/agent/openflow/traceflow.go b/pkg/agent/openflow/traceflow.go index 604a64b80c5..4283aa5a98e 100644 --- a/pkg/agent/openflow/traceflow.go +++ b/pkg/agent/openflow/traceflow.go @@ -15,7 +15,7 @@ package openflow import ( - "antrea.io/ofnet/ofctrl" + "antrea.io/libOpenflow/openflow15" ) type featureTraceflow struct { @@ -32,10 +32,10 @@ func newFeatureTraceflow() *featureTraceflow { } } -func (f *featureTraceflow) initFlows() []ofctrl.OpenFlowModMessage { - return []ofctrl.OpenFlowModMessage{} +func (f *featureTraceflow) initFlows() []*openflow15.FlowMod { + return []*openflow15.FlowMod{} } -func (f *featureTraceflow) replayFlows() []ofctrl.OpenFlowModMessage { - return []ofctrl.OpenFlowModMessage{} +func (f *featureTraceflow) replayFlows() []*openflow15.FlowMod { + return []*openflow15.FlowMod{} } diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 8bd3840f6fa..75607f7c95f 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -18,6 +18,7 @@ import ( "net" "time" + "antrea.io/libOpenflow/openflow15" "antrea.io/libOpenflow/protocol" "antrea.io/libOpenflow/util" "antrea.io/ofnet/ofctrl" @@ -100,6 +101,7 @@ type Bridge interface { CreateTable(table Table, next uint8, missAction MissActionType) Table // AddTable adds table on the Bridge. Return true if the operation succeeds, otherwise return false. DeleteTable(id uint8) bool + GetTableByID(id uint8) (Table, error) CreateGroupTypeAll(id GroupIDType) Group CreateGroup(id GroupIDType) Group DeleteGroup(id GroupIDType) error @@ -114,7 +116,7 @@ type Bridge interface { DeleteFlowsByCookie(cookieID, cookieMask uint64) error // AddFlowsInBundle syncs multiple Openflow entries in a single transaction. This operation could add new flows in // "addFlows", modify flows in "modFlows", and remove flows in "delFlows" in the same bundle. - AddFlowsInBundle(addflows, modFlows, delFlows []ofctrl.OpenFlowModMessage) error + AddFlowsInBundle(addflows, modFlows, delFlows []*openflow15.FlowMod) error // AddOFEntriesInBundle syncs multiple Openflow entries(including Flow and Group) in a single transaction. This // operation could add new entries in "addEntries", modify entries in "modEntries", and remove entries in // "delEntries" in the same bundle. @@ -207,8 +209,6 @@ type Flow interface { // It copies the original actions of the Flow only if copyActions is set to true, and // resets the priority in the new FlowBuilder if the provided priority is not 0. CopyToBuilder(priority uint16, copyActions bool) FlowBuilder - IsDropFlow() bool - GetTable() Table } type Action interface { diff --git a/pkg/ovs/openflow/ofctrl_action.go b/pkg/ovs/openflow/ofctrl_action.go index 7250e26876f..c63a90c25cf 100644 --- a/pkg/ovs/openflow/ofctrl_action.go +++ b/pkg/ovs/openflow/ofctrl_action.go @@ -33,7 +33,6 @@ type ofFlowAction struct { // Drop is an action to drop packets. func (a *ofFlowAction) Drop() FlowBuilder { a.builder.Drop() - a.builder.isDropFlow = true return a.builder } diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go index 19d4d85ba2a..a234d704114 100644 --- a/pkg/ovs/openflow/ofctrl_bridge.go +++ b/pkg/ovs/openflow/ofctrl_bridge.go @@ -298,6 +298,17 @@ func (b *OFBridge) DeleteTable(id uint8) bool { return true } +// GetTableByID returns the existing Table by the given id. If no Table exists, an error is returned. +func (b *OFBridge) GetTableByID(id uint8) (Table, error) { + b.Lock() + defer b.Unlock() + t, ok := b.tableCache[id] + if !ok { + return nil, fmt.Errorf("no table exists with ID %d", id) + } + return t, nil +} + // DumpTableStatus dumps table status from local cache. func (b *OFBridge) DumpTableStatus() []TableStatus { var r []TableStatus @@ -448,7 +459,7 @@ func (b *OFBridge) IsConnected() bool { return sw.IsReady() } -func (b *OFBridge) AddFlowsInBundle(addflows, modFlows, delFlows []ofctrl.OpenFlowModMessage) error { +func (b *OFBridge) AddFlowsInBundle(addflows, modFlows, delFlows []*openflow15.FlowMod) error { // If no Openflow entries are requested to be added or modified or deleted on the OVS bridge, return immediately. if len(addflows) == 0 && len(modFlows) == 0 && len(delFlows) == 0 { klog.V(2).Info("No Openflow entries need to be synced to the OVS bridge, returning") @@ -461,14 +472,14 @@ func (b *OFBridge) AddFlowsInBundle(addflows, modFlows, delFlows []ofctrl.OpenFl return err } - syncFlows := func(flows []ofctrl.OpenFlowModMessage, operation int) error { + syncFlows := func(flows []*openflow15.FlowMod, operation int) error { for _, flowMod := range flows { - flowMod.GetMessage().(*openflow15.FlowMod).Command = uint8(operation) + flowMod.Command = uint8(operation) // "AddFlow" operation is async, the function only returns error which occur when constructing and sending // the BundleAdd message. An absence of error does not mean that all Openflow entries are added into the // bundle by the switch. The number of entries successfully added to the bundle by the switch will be // returned by function "Complete". - if err := tx.AddMessage(flowMod); err != nil { + if err := tx.AddFlow(flowMod); err != nil { // Close the bundle and cancel it if there is error when adding the FlowMod message. _, err := tx.Complete() if err == nil { @@ -510,13 +521,11 @@ func (b *OFBridge) AddFlowsInBundle(addflows, modFlows, delFlows []ofctrl.OpenFl } // Update TableStatus after the transaction is committed successfully. - for _, flow := range addflows { - message := flow.GetMessage().(*openflow15.FlowMod) + for _, message := range addflows { table := b.tableCache[message.TableId] table.UpdateStatus(1) } - for _, flow := range delFlows { - message := flow.GetMessage().(*openflow15.FlowMod) + for _, message := range delFlows { table := b.tableCache[message.TableId] table.UpdateStatus(-1) } diff --git a/pkg/ovs/openflow/ofctrl_builder.go b/pkg/ovs/openflow/ofctrl_builder.go index 8d9bd573d5e..b59aad51ebb 100644 --- a/pkg/ovs/openflow/ofctrl_builder.go +++ b/pkg/ovs/openflow/ofctrl_builder.go @@ -15,9 +15,7 @@ package openflow import ( - "fmt" "net" - "strings" "antrea.io/libOpenflow/openflow15" "antrea.io/libOpenflow/protocol" @@ -29,8 +27,6 @@ type ofFlowBuilder struct { } func (b *ofFlowBuilder) MatchTunMetadata(index int, data uint32) FlowBuilder { - s := fmt.Sprintf("tun_metadata%d=0x%x", index, data) - b.matchers = append(b.matchers, s) rng := openflow15.NewNXRange(0, 31) tm := &ofctrl.NXTunMetadata{ ID: index, @@ -66,15 +62,7 @@ func (b *ofFlowBuilder) MatchVLAN(nonVLAN bool, vlanID uint16, vlanMask *uint16) if !nonVLAN { value |= openflow15.OFPVID_PRESENT } - mask := *vlanMask - var matchStr string - if mask == uint16(openflow15.OFPVID_PRESENT|protocol.VID_MASK) { - matchStr = fmt.Sprintf("dl_vlan=%d", value&protocol.VID_MASK) - } else { - matchStr = fmt.Sprintf("vlan_tci=0x%04x/0x%04x", value&openflow15.OFPVID_PRESENT, openflow15.OFPVID_PRESENT) - } - b.matchers = append(b.matchers, matchStr) b.Match.NonVlan = nonVLAN b.Match.VlanId = &vlanID b.Match.VlanMask = vlanMask @@ -96,16 +84,11 @@ func (b *ofFlowBuilder) Done() Flow { b.Flow.Match.CtStates = b.ctStates b.ctStates = nil } - if b.ctStateString != "" { - b.matchers = append(b.matchers, b.ctStateString) - b.ctStateString = "" - } return &b.ofFlow } // matchReg adds match condition for matching data in the target register. func (b *ofFlowBuilder) matchReg(regID int, data uint32) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("reg%d=0x%x", regID, data)) reg := &ofctrl.NXRegister{ ID: regID, Data: data, @@ -116,8 +99,6 @@ func (b *ofFlowBuilder) matchReg(regID int, data uint32) FlowBuilder { // MatchXXReg adds match condition for matching data in the target xx-register. func (b *ofFlowBuilder) MatchXXReg(regID int, data []byte) FlowBuilder { - s := fmt.Sprintf("xxreg%d=0x%x", regID, data) - b.matchers = append(b.matchers, s) reg := &ofctrl.XXRegister{ ID: regID, Data: data, @@ -128,9 +109,6 @@ func (b *ofFlowBuilder) MatchXXReg(regID int, data []byte) FlowBuilder { // matchRegRange adds match condition for matching data in the target register at specified range. func (b *ofFlowBuilder) matchRegRange(regID int, data uint32, rng *Range) FlowBuilder { - mask := rng.ToNXRange().ToUint32Mask() - s := fmt.Sprintf("reg%d=0x%x/0x%x", regID, (data< 0 { b.ofFlow.Match.CtMarkMask = &mask - for i, data := range b.matchers { - if strings.HasPrefix(data, "ct_mark=") { - b.matchers[i] = fmt.Sprintf("%s/0x%x", data, mask) - break - } - } } return b } @@ -309,7 +255,6 @@ func (b *ofFlowBuilder) MatchCTMarkMask(mask uint32) FlowBuilder { // MatchPktMark adds match condition for matching pkt_mark. If mask is nil, the mask should be not set in the OpenFlow // message which is sent to OVS, and OVS should match the value exactly. func (b *ofFlowBuilder) MatchPktMark(value uint32, mask *uint32) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("pkt_mark=%d", value)) b.ofFlow.Match.PktMark = value b.ofFlow.Match.PktMarkMask = mask return b @@ -317,18 +262,12 @@ func (b *ofFlowBuilder) MatchPktMark(value uint32, mask *uint32) FlowBuilder { // MatchTunnelDst adds match condition for matching tun_dst or tun_ipv6_dst. func (b *ofFlowBuilder) MatchTunnelDst(dstIP net.IP) FlowBuilder { - if dstIP.To4() != nil { - b.matchers = append(b.matchers, fmt.Sprintf("tun_dst=%s", dstIP.String())) - } else { - b.matchers = append(b.matchers, fmt.Sprintf("tun_ipv6_dst=%s", dstIP.String())) - } b.ofFlow.Match.TunnelDst = &dstIP return b } // MatchTunnelID adds match condition for matching tun_id. func (b *ofFlowBuilder) MatchTunnelID(tunnelID uint64) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("tun_id=%d", tunnelID)) b.ofFlow.Match.TunnelId = tunnelID return b } @@ -358,66 +297,44 @@ func ctLabelRange(high, low uint64, rng *Range, match *ofctrl.FlowMatch) { func (b *ofFlowBuilder) MatchCTLabelField(high, low uint64, field *CtLabel) FlowBuilder { ctLabelRange(high, low, field.GetRange(), &b.ofFlow.Match) - var matchStr string - if field.rng[1] < 64 { - matchStr = fmt.Sprintf("ct_label=0x%x/0x%x", b.ofFlow.Match.CtLabelLo, b.ofFlow.Match.CtLabelLoMask) - } else { - matchStr = fmt.Sprintf("ct_label=0x%x%016x/0x%x%016x", b.ofFlow.Match.CtLabelHi, b.ofFlow.Match.CtLabelLo, b.ofFlow.Match.CtLabelHiMask, b.ofFlow.Match.CtLabelLoMask) - } - b.matchers = append(b.matchers, matchStr) return b } // MatchInPort adds match condition for matching in_port. func (b *ofFlowBuilder) MatchInPort(inPort uint32) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("in_port=%d", inPort)) b.Match.InputPort = inPort return b } // MatchDstIP adds match condition for matching destination IP address. func (b *ofFlowBuilder) MatchDstIP(ip net.IP) FlowBuilder { - if ip.To4() != nil { - b.matchers = append(b.matchers, fmt.Sprintf("nw_dst=%s", ip.String())) - } else { - b.matchers = append(b.matchers, fmt.Sprintf("ipv6_dst=%s", ip.String())) - } b.Match.IpDa = &ip return b } // MatchDstIPNet adds match condition for matching destination IP CIDR. func (b *ofFlowBuilder) MatchDstIPNet(ipnet net.IPNet) FlowBuilder { - if ipnet.IP.To4() != nil { - b.matchers = append(b.matchers, fmt.Sprintf("nw_dst=%s", ipnet.String())) - } else { - b.matchers = append(b.matchers, fmt.Sprintf("ipv6_dst=%s", ipnet.String())) - } b.Match.IpDa = &ipnet.IP b.Match.IpDaMask = maskToIP(ipnet.Mask) return b } func (b *ofFlowBuilder) MatchICMPType(icmpType byte) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("icmp_type=%d", icmpType)) b.Match.Icmp4Type = &icmpType return b } func (b *ofFlowBuilder) MatchICMPCode(icmpCode byte) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("icmp_code=%d", icmpCode)) b.Match.Icmp4Code = &icmpCode return b } func (b *ofFlowBuilder) MatchICMPv6Type(icmp6Type byte) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("icmp_type=%d", icmp6Type)) b.Match.Icmp6Type = &icmp6Type return b } func (b *ofFlowBuilder) MatchICMPv6Code(icmp6Code byte) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("icmp_code=%d", icmp6Code)) b.Match.Icmp6Code = &icmp6Code return b } @@ -429,22 +346,12 @@ func maskToIP(mask net.IPMask) *net.IP { // MatchSrcIP adds match condition for matching source IP address. func (b *ofFlowBuilder) MatchSrcIP(ip net.IP) FlowBuilder { - if ip.To4() != nil { - b.matchers = append(b.matchers, fmt.Sprintf("nw_src=%s", ip.String())) - } else { - b.matchers = append(b.matchers, fmt.Sprintf("ipv6_src=%s", ip.String())) - } b.Match.IpSa = &ip return b } // MatchSrcIPNet adds match condition for matching source IP CIDR. func (b *ofFlowBuilder) MatchSrcIPNet(ipnet net.IPNet) FlowBuilder { - if ipnet.IP.To4() != nil { - b.matchers = append(b.matchers, fmt.Sprintf("nw_src=%s", ipnet.String())) - } else { - b.matchers = append(b.matchers, fmt.Sprintf("ipv6_src=%s", ipnet.String())) - } b.Match.IpSa = &ipnet.IP b.Match.IpSaMask = maskToIP(ipnet.Mask) return b @@ -452,49 +359,42 @@ func (b *ofFlowBuilder) MatchSrcIPNet(ipnet net.IPNet) FlowBuilder { // MatchDstMAC adds match condition for matching destination MAC address. func (b *ofFlowBuilder) MatchDstMAC(mac net.HardwareAddr) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("dl_dst=%s", mac.String())) b.Match.MacDa = &mac return b } // MatchSrcMAC adds match condition for matching source MAC address. func (b *ofFlowBuilder) MatchSrcMAC(mac net.HardwareAddr) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("dl_src=%s", mac.String())) b.Match.MacSa = &mac return b } // MatchARPSha adds match condition for matching ARP source host address. func (b *ofFlowBuilder) MatchARPSha(mac net.HardwareAddr) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("arp_sha=%s", mac.String())) b.Match.ArpSha = &mac return b } // MatchARPTha adds match condition for matching ARP target host address. func (b *ofFlowBuilder) MatchARPTha(mac net.HardwareAddr) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("arp_tha=%s", mac.String())) b.Match.ArpTha = &mac return b } // MatchARPSpa adds match condition for matching ARP source protocol address. func (b *ofFlowBuilder) MatchARPSpa(ip net.IP) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("arp_spa=%s", ip.String())) b.Match.ArpSpa = &ip return b } // MatchARPTpa adds match condition for matching ARP target protocol address. func (b *ofFlowBuilder) MatchARPTpa(ip net.IP) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("arp_tpa=%s", ip.String())) b.Match.ArpTpa = &ip return b } // MatchARPOp adds match condition for matching ARP operator. func (b *ofFlowBuilder) MatchARPOp(op uint16) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("arp_op=%d", op)) b.Match.ArpOper = op return b } @@ -503,14 +403,12 @@ func (b *ofFlowBuilder) MatchARPOp(op uint16) FlowBuilder { // the field name is shown as "nw_tos" with OVS command line, and the value is calculated by shifting the given value // left 2 bits. func (b *ofFlowBuilder) MatchIPDSCP(dscp uint8) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("nw_tos=%d", dscp<<2)) b.Match.IpDscp = dscp return b } // MatchConjID adds match condition for matching conj_id. func (b *ofFlowBuilder) MatchConjID(value uint32) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("conj_id=%d", value)) b.Match.ConjunctionID = &value return b } @@ -577,11 +475,6 @@ func (b *ofFlowBuilder) MatchIPProtocolValue(isIPv6 bool, protoValue uint8) Flow func (b *ofFlowBuilder) MatchDstPort(port uint16, portMask *uint16) FlowBuilder { b.Match.DstPort = port b.Match.DstPortMask = portMask - matchStr := fmt.Sprintf("tp_dst=0x%x", port) - if portMask != nil { - matchStr = fmt.Sprintf("%s/0x%x", matchStr, *portMask) - } - b.matchers = append(b.matchers, matchStr) return b } @@ -590,16 +483,10 @@ func (b *ofFlowBuilder) MatchDstPort(port uint16, portMask *uint16) FlowBuilder func (b *ofFlowBuilder) MatchSrcPort(port uint16, portMask *uint16) FlowBuilder { b.Match.SrcPort = port b.Match.SrcPortMask = portMask - matchStr := fmt.Sprintf("tp_src=0x%x", port) - if portMask != nil { - matchStr = fmt.Sprintf("%s/0x%x", matchStr, *portMask) - } - b.matchers = append(b.matchers, matchStr) return b } func (b *ofFlowBuilder) MatchTCPFlags(flag, mask uint16) FlowBuilder { - b.matchers = append(b.matchers, fmt.Sprintf("tcp_flags=%b/%b", uint8(flag), uint8(mask))) b.Match.TcpFlags = &flag b.Match.TcpFlagsMask = &mask return b @@ -611,10 +498,8 @@ func (b *ofFlowBuilder) MatchTCPFlags(flag, mask uint16) FlowBuilder { func (b *ofFlowBuilder) MatchCTSrcIP(ip net.IP) FlowBuilder { if ip.To4() != nil { b.Match.CtIpSa = &ip - b.matchers = append(b.matchers, fmt.Sprintf("ct_nw_src=%s", ip.String())) } else { b.Match.CtIpv6Sa = &ip - b.matchers = append(b.matchers, fmt.Sprintf("ct_ipv6_src=%s", ip.String())) } return b } @@ -624,11 +509,9 @@ func (b *ofFlowBuilder) MatchCTSrcIPNet(ipNet net.IPNet) FlowBuilder { if ipNet.IP.To4() != nil { b.Match.CtIpSa = &ipNet.IP b.Match.CtIpSaMask = maskToIP(ipNet.Mask) - b.matchers = append(b.matchers, fmt.Sprintf("ct_nw_src=%s", ipNet.String())) } else { b.Match.CtIpv6Sa = &ipNet.IP b.Match.CtIpv6SaMask = maskToIP(ipNet.Mask) - b.matchers = append(b.matchers, fmt.Sprintf("ct_ipv6_src=%s", ipNet.String())) } return b } @@ -639,10 +522,8 @@ func (b *ofFlowBuilder) MatchCTSrcIPNet(ipNet net.IPNet) FlowBuilder { func (b *ofFlowBuilder) MatchCTDstIP(ip net.IP) FlowBuilder { if ip.To4() != nil { b.Match.CtIpDa = &ip - b.matchers = append(b.matchers, fmt.Sprintf("ct_nw_dst=%s", ip.String())) } else { b.Match.CtIpv6Da = &ip - b.matchers = append(b.matchers, fmt.Sprintf("ct_ipv6_dst=%s", ip.String())) } return b } @@ -652,11 +533,9 @@ func (b *ofFlowBuilder) MatchCTDstIPNet(ipNet net.IPNet) FlowBuilder { if ipNet.IP.To4() != nil { b.Match.CtIpDa = &ipNet.IP b.Match.CtIpDaMask = maskToIP(ipNet.Mask) - b.matchers = append(b.matchers, fmt.Sprintf("ct_nw_dst=%s", ipNet.String())) } else { b.Match.CtIpv6Da = &ipNet.IP b.Match.CtIpv6DaMask = maskToIP(ipNet.Mask) - b.matchers = append(b.matchers, fmt.Sprintf("ct_ipv6_dst=%s", ipNet.String())) } return b } @@ -666,7 +545,6 @@ func (b *ofFlowBuilder) MatchCTDstIPNet(ipNet net.IPNet) FlowBuilder { // "+new", "+est", "+rel" and "+trk-inv". func (b *ofFlowBuilder) MatchCTSrcPort(port uint16) FlowBuilder { b.Match.CtTpSrcPort = port - b.matchers = append(b.matchers, fmt.Sprintf("ct_tp_src=%d", port)) return b } @@ -675,7 +553,6 @@ func (b *ofFlowBuilder) MatchCTSrcPort(port uint16) FlowBuilder { // include "+new", "+est", "+rel" and "+trk-inv". func (b *ofFlowBuilder) MatchCTDstPort(port uint16) FlowBuilder { b.Match.CtTpDstPort = port - b.matchers = append(b.matchers, fmt.Sprintf("ct_tp_dst=%d", port)) return b } @@ -695,7 +572,6 @@ func (b *ofFlowBuilder) MatchCTProtocol(proto Protocol) FlowBuilder { case ProtocolICMPv6: b.Match.CtIpProto = 58 } - b.matchers = append(b.matchers, fmt.Sprintf("ct_nw_proto=%d", b.Match.CtIpProto)) return b } diff --git a/pkg/ovs/openflow/ofctrl_flow.go b/pkg/ovs/openflow/ofctrl_flow.go index 1d5e4c2f7cb..4245884896e 100644 --- a/pkg/ovs/openflow/ofctrl_flow.go +++ b/pkg/ovs/openflow/ofctrl_flow.go @@ -16,7 +16,6 @@ package openflow import ( "fmt" - "strings" "antrea.io/libOpenflow/openflow15" "antrea.io/ofnet/ofctrl" @@ -35,29 +34,30 @@ type ofFlow struct { // to Flow.Table should hold the replayMutex read lock. *ofctrl.Flow - // matchers is string slice, it is used to generate a readable match string of the Flow. - matchers []string // protocol adds a readable protocol type in the match string of ofFlow. protocol Protocol - // ctStateString is a temporary variable for the readable ct_state configuration. Its value is changed when the client - // updates the matching condition of "ct_states". When FlowBuilder.Done is called, its value is added into the matchers. - ctStateString string // ctStates is a temporary variable to maintain openflow15.CTStates. When FlowBuilder.Done is called, it is used to // set the CtStates field in ofctrl.Flow.Match. ctStates *openflow15.CTStates - // isDropFlow is true if this flow actions contain "drop" - isDropFlow bool } func (f *ofFlow) String() string { - s := strings.Join([]string{`&ofFlow{`, - `table:` + fmt.Sprintf("%v", f.table.id) + `,`, - `matchers:` + fmt.Sprintf("%v", f.matchers) + `,`, - `protocol:` + fmt.Sprintf("%v", f.protocol) + `,`, - `flow:` + fmt.Sprintf("%v", f.Flow) + `,`, - `}`, - }, "") - return s + flowMod, err := f.getFlowMod() + if err != nil { + return "" + } + return FlowModToString(flowMod) +} + +func (f *ofFlow) getFlowMod() (*openflow15.FlowMod, error) { + flowMods, err := f.GetBundleMessages(AddMessage) + if err != nil { + return nil, err + } + if len(flowMods) == 0 { + return nil, fmt.Errorf("no flowMod message is generated") + } + return flowMods[0].GetMessage().(*openflow15.FlowMod), nil } // Reset updates the ofFlow.Flow.Table field with ofFlow.table.Table. @@ -105,15 +105,11 @@ func (f *ofFlow) KeyString() string { } func (f *ofFlow) MatchString() string { - repr := fmt.Sprintf("table=%d", f.table.id) - if f.protocol != "" { - repr = fmt.Sprintf("%s,%s", repr, f.protocol) - } - - if len(f.matchers) > 0 { - repr = fmt.Sprintf("%s,%s", repr, strings.Join(f.matchers, ",")) + flowMod, err := f.getFlowMod() + if err != nil { + return "" } - return repr + return FlowModMatchString(flowMod) } func (f *ofFlow) FlowPriority() uint16 { @@ -162,23 +158,11 @@ func (f *ofFlow) CopyToBuilder(priority uint16, copyActions bool) FlowBuilder { newFlow := ofFlow{ table: f.table, Flow: flow, - matchers: f.matchers, protocol: f.protocol, } - if copyActions { - newFlow.isDropFlow = f.isDropFlow - } return &ofFlowBuilder{newFlow} } -func (f *ofFlow) IsDropFlow() bool { - return f.isDropFlow -} - -func (f *ofFlow) GetTable() Table { - return f.table -} - func (r *Range) ToNXRange() *openflow15.NXRange { return openflow15.NewNXRange(int(r[0]), int(r[1])) } diff --git a/pkg/ovs/openflow/ofctrl_flow_test.go b/pkg/ovs/openflow/ofctrl_flow_test.go index 8741ffdb4ac..e1162679169 100644 --- a/pkg/ovs/openflow/ofctrl_flow_test.go +++ b/pkg/ovs/openflow/ofctrl_flow_test.go @@ -17,6 +17,7 @@ package openflow import ( "testing" + "antrea.io/ofnet/ofctrl" "github.com/stretchr/testify/assert" ) @@ -41,6 +42,7 @@ func TestCopyToBuilder(t *testing.T) { LoadToCtMark(mark). MoveToLabel(NxmFieldSrcMAC, &Range{0, 47}, &Range{0, 47}).CTDone(). Done() + oriFlow.(*ofFlow).Flow.Table = &ofctrl.Table{TableId: t0} newFlow := oriFlow.CopyToBuilder(0, false) assert.Equal(t, oriFlow.MatchString(), newFlow.Done().MatchString()) assert.Equal(t, oriFlow.(*ofFlow).Match, newFlow.Done().(*ofFlow).Match) @@ -60,12 +62,11 @@ func TestCopyToBuilder_Drop(t *testing.T) { MatchCTStateNew(true).MatchCTStateTrk(true). Action().Drop(). Done() + oriFlow.(*ofFlow).Flow.Table = &ofctrl.Table{TableId: t0} newFlow := oriFlow.CopyToBuilder(0, false) assert.Equal(t, oriFlow.MatchString(), newFlow.Done().MatchString()) assert.Equal(t, oriFlow.(*ofFlow).Match, newFlow.Done().(*ofFlow).Match) - assert.Equal(t, false, newFlow.Done().IsDropFlow()) newPriority := uint16(200) newFlow2 := oriFlow.CopyToBuilder(newPriority, true) assert.Equal(t, newPriority, newFlow2.Done().(*ofFlow).Match.Priority) - assert.Equal(t, true, newFlow2.Done().IsDropFlow()) } diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index deb06684359..84e22f3bde7 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -21,6 +21,7 @@ package testing import ( openflow "antrea.io/antrea/pkg/ovs/openflow" + openflow15 "antrea.io/libOpenflow/openflow15" protocol "antrea.io/libOpenflow/protocol" util "antrea.io/libOpenflow/util" ofctrl "antrea.io/ofnet/ofctrl" @@ -53,7 +54,7 @@ func (m *MockBridge) EXPECT() *MockBridgeMockRecorder { } // AddFlowsInBundle mocks base method -func (m *MockBridge) AddFlowsInBundle(arg0, arg1, arg2 []ofctrl.OpenFlowModMessage) error { +func (m *MockBridge) AddFlowsInBundle(arg0, arg1, arg2 []*openflow15.FlowMod) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddFlowsInBundle", arg0, arg1, arg2) ret0, _ := ret[0].(error) @@ -291,6 +292,21 @@ func (mr *MockBridgeMockRecorder) DumpTableStatus() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DumpTableStatus", reflect.TypeOf((*MockBridge)(nil).DumpTableStatus)) } +// GetTableByID mocks base method +func (m *MockBridge) GetTableByID(arg0 byte) (openflow.Table, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTableByID", arg0) + ret0, _ := ret[0].(openflow.Table) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetTableByID indicates an expected call of GetTableByID +func (mr *MockBridgeMockRecorder) GetTableByID(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTableByID", reflect.TypeOf((*MockBridge)(nil).GetTableByID), arg0) +} + // IsConnected mocks base method func (m *MockBridge) IsConnected() bool { m.ctrl.T.Helper() @@ -598,34 +614,6 @@ func (mr *MockFlowMockRecorder) GetBundleMessages(arg0 interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBundleMessages", reflect.TypeOf((*MockFlow)(nil).GetBundleMessages), arg0) } -// GetTable mocks base method -func (m *MockFlow) GetTable() openflow.Table { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetTable") - ret0, _ := ret[0].(openflow.Table) - return ret0 -} - -// GetTable indicates an expected call of GetTable -func (mr *MockFlowMockRecorder) GetTable() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTable", reflect.TypeOf((*MockFlow)(nil).GetTable)) -} - -// IsDropFlow mocks base method -func (m *MockFlow) IsDropFlow() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsDropFlow") - ret0, _ := ret[0].(bool) - return ret0 -} - -// IsDropFlow indicates an expected call of IsDropFlow -func (mr *MockFlowMockRecorder) IsDropFlow() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsDropFlow", reflect.TypeOf((*MockFlow)(nil).IsDropFlow)) -} - // KeyString mocks base method func (m *MockFlow) KeyString() string { m.ctrl.T.Helper() diff --git a/pkg/ovs/openflow/testing/utils.go b/pkg/ovs/openflow/utils.go similarity index 98% rename from pkg/ovs/openflow/testing/utils.go rename to pkg/ovs/openflow/utils.go index 1abd8e0210b..e67ceb64f45 100644 --- a/pkg/ovs/openflow/testing/utils.go +++ b/pkg/ovs/openflow/utils.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package testing +package openflow import ( "fmt" @@ -391,6 +391,11 @@ func matchTunDstToString(field *openflow15.MatchField, isIPv6 bool) string { return fmt.Sprintf("%s=%s", matchKey, getFieldDataString(field)) } +func matchTunIDToString(field *openflow15.MatchField) string { + matchKey := "tun_id" + return fmt.Sprintf("%s=%s", matchKey, getFieldDataString(field)) +} + func matchNwTosToString(field *openflow15.MatchField) string { return fmt.Sprintf("nw_tos=%s", getFieldDataString(field)) } @@ -538,6 +543,8 @@ func getFieldDataString(field *openflow15.MatchField) string { } case *openflow15.TunnelIpv4DstField: fieldStr = value.(*openflow15.TunnelIpv4DstField).TunnelIpv4Dst.String() + case *openflow15.TunnelIdField: + fieldStr = fmt.Sprintf("%d", value.(*openflow15.TunnelIdField).TunnelId) case *openflow15.VlanIdField: fieldStr = fmt.Sprintf("%d", value.(*openflow15.VlanIdField).VlanId) case *openflow15.ArpOperField: @@ -957,6 +964,9 @@ func getFlowModMatch(flowMod *openflow15.FlowMod) string { if field, ok := matchMap["tun_ipv6_dst"]; ok { parts = append(parts, matchTunDstToString(field, true)) } + if field, ok := matchMap["tun_id"]; ok { + parts = append(parts, matchTunIDToString(field)) + } // TODO: metadata @@ -1135,7 +1145,11 @@ func getFlowModAction(flowMod *openflow15.FlowMod) string { } func FlowModToString(flowMod *openflow15.FlowMod) string { - return fmt.Sprintf("%s, %s %s", getFlowModBaseString(flowMod), getFlowModMatch(flowMod), getFlowModAction(flowMod)) + return strings.Join([]string{fmt.Sprintf("%s,", getFlowModBaseString(flowMod)), getFlowModMatch(flowMod), getFlowModAction(flowMod)}, " ") +} + +func FlowModMatchString(flowMod *openflow15.FlowMod) string { + return fmt.Sprintf("table=%d,%s", flowMod.TableId, getFlowModMatch(flowMod)) } func GroupModToString(groupMod *openflow15.GroupMod) string { diff --git a/test/integration/ovs/ofctrl_test.go b/test/integration/ovs/ofctrl_test.go index 0c1f8b1bc33..ed0c5718d94 100644 --- a/test/integration/ovs/ofctrl_test.go +++ b/test/integration/ovs/ofctrl_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "antrea.io/libOpenflow/openflow15" "antrea.io/ofnet/ofctrl" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -1325,11 +1326,11 @@ func getCookieIDMask() (uint64, uint64) { return cookieID, cookieMask } -func getFlowModificationMessages(flows []binding.Flow, operation binding.OFOperation) []ofctrl.OpenFlowModMessage { - messages := make([]ofctrl.OpenFlowModMessage, 0, len(flows)) +func getFlowModificationMessages(flows []binding.Flow, operation binding.OFOperation) []*openflow15.FlowMod { + messages := make([]*openflow15.FlowMod, 0, len(flows)) for _, f := range flows { msg, _ := f.GetBundleMessages(operation) - messages = append(messages, msg[0]) + messages = append(messages, msg[0].GetMessage().(*openflow15.FlowMod)) } return messages }