Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: wenyingd <[email protected]>
  • Loading branch information
wenyingd committed Apr 4, 2023
1 parent 7cf875b commit ec409ec
Show file tree
Hide file tree
Showing 23 changed files with 383 additions and 634 deletions.
48 changes: 23 additions & 25 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"
ofutil "antrea.io/libOpenflow/util"
"antrea.io/ofnet/ofctrl"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -395,7 +394,7 @@ func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows [
// It will skip flows whose cache already exists. All flows will be installed via a bundle.
func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows map[string][]binding.Flow) error {
// allMessages keeps the OpenFlow modification messages we will install via a bundle.
var allMessages []ofctrl.OpenFlowModMessage
var allMessages []*openflow15.FlowMod
// flowCacheMap keeps the flowMessageCache items we will add to the cache on bundle success.
flowCacheMap := map[string]flowMessageCache{}
for flowCacheKey, flows := range keyToFlows {
Expand All @@ -407,9 +406,9 @@ func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows m
}
fCache := flowMessageCache{}
for _, flow := range flows {
cf := getCachedFlow(flow, binding.AddMessage)
allMessages = append(allMessages, cf.message)
fCache[flow.MatchString()] = cf
cf := getFlowModMessage(flow, binding.AddMessage)
allMessages = append(allMessages, cf)
fCache[getFlowKey(cf)] = cf
}
flowCacheMap[flowCacheKey] = fCache
}
Expand All @@ -433,30 +432,30 @@ func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flow
fCache := flowMessageCache{}
var err error
if !ok {
messages := make([]ofctrl.OpenFlowModMessage, 0)
messages := make([]*openflow15.FlowMod, 0)
for _, flow := range flows {
cf := getCachedFlow(flow, binding.AddMessage)
messages = append(messages, cf.message)
fCache[flow.MatchString()] = cf
cf := getFlowModMessage(flow, binding.AddMessage)
messages = append(messages, cf)
fCache[getFlowKey(cf)] = cf
}
err = c.ofEntryOperations.AddAll(messages)
} else {
var adds, mods, dels []ofctrl.OpenFlowModMessage
var adds, mods, dels []*openflow15.FlowMod
oldFlowCache := oldFlowCacheI.(flowMessageCache)
for _, flow := range flows {
matchString := flow.MatchString()
var cachedFlowInfo *cachedFlow
var cachedFlowInfo *openflow15.FlowMod
if _, ok := oldFlowCache[matchString]; ok {
cachedFlowInfo = getCachedFlow(flow, binding.ModifyMessage)
cachedFlowInfo = getFlowModMessage(flow, binding.ModifyMessage)
} else {
cachedFlowInfo = getCachedFlow(flow, binding.AddMessage)
cachedFlowInfo = getFlowModMessage(flow, binding.AddMessage)
}
adds = append(adds, cachedFlowInfo.message)
adds = append(adds, cachedFlowInfo)
fCache[matchString] = cachedFlowInfo
}
for k, v := range oldFlowCache {
if _, ok := fCache[k]; !ok {
dels = append(dels, v.message)
dels = append(dels, v)
}
}
err = c.ofEntryOperations.BundleOps(adds, mods, dels)
Expand All @@ -479,7 +478,7 @@ func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) erro
// It will skip the keys which are not in the cache. All flows will be uninstalled via a bundle.
func (c *client) deleteFlowsWithMultipleKeys(cache *flowCategoryCache, keys []string) error {
// allFlows keeps the flows we will delete via a bundle.
var allFlows []ofctrl.OpenFlowModMessage
var allFlows []*openflow15.FlowMod
for _, key := range keys {
flows, ok := cache.Load(key)
// If a flow cache entry of the key does not exist, skip it.
Expand All @@ -488,7 +487,7 @@ func (c *client) deleteFlowsWithMultipleKeys(cache *flowCategoryCache, keys []st
continue
}
for _, flow := range flows.(flowMessageCache) {
allFlows = append(allFlows, flow.message)
allFlows = append(allFlows, flow)
}
}
if len(allFlows) == 0 {
Expand Down Expand Up @@ -658,7 +657,7 @@ func (c *client) getFlowKeysFromCache(cache *flowCategoryCache, cacheKey string)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
for _, flow := range fCache {
flowKeys = append(flowKeys, flow.key)
flowKeys = append(flowKeys, getFlowKey(flow))
}
return flowKeys
}
Expand Down Expand Up @@ -1532,18 +1531,17 @@ func (c *client) UninstallMulticlusterPodFlows(podIP string) error {
return c.deleteFlows(c.featureMulticluster.cachedPodFlows, podIP)
}

func getFlowMessages(flows []binding.Flow) []ofctrl.OpenFlowModMessage {
messages := make([]ofctrl.OpenFlowModMessage, 0, len(flows))
func getFlowMessages(flows []binding.Flow) []*openflow15.FlowMod {
messages := make([]*openflow15.FlowMod, 0, len(flows))
for i := range flows {
cf := getCachedFlow(flows[i], binding.AddMessage)
messages = append(messages, cf.message)
msg := getFlowModMessage(flows[i], binding.AddMessage)
messages = append(messages, msg)
}
return messages
}

func getCachedFlow(flow binding.Flow, entryOper binding.OFOperation) *cachedFlow {
func getFlowModMessage(flow binding.Flow, entryOper binding.OFOperation) *openflow15.FlowMod {
bundleMessages, _ := flow.GetBundleMessages(entryOper)
flowKey := flow.MatchString()
flowMessage := bundleMessages[0]
return &cachedFlow{key: flowKey, table: flow.GetTable(), isDropFlow: flow.IsDropFlow(), message: flowMessage}
return flowMessage.GetMessage().(*openflow15.FlowMod)
}
90 changes: 60 additions & 30 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func newFakeClient(mockOFEntryOperations *oftest.MockOFEntryOperations,
client.ipProtocols = ipProtocols
client.generatePipelines()
client.realizePipelines()
ovsoftest.TableNameCache = getTableNameCache()
binding.TableNameCache = getTableNameCache()
client.bridge.(*binding.OFBridge).SetOFSwitch(ofctrl.NewSwitch(&util.MessageStream{}, GlobalVirtualMAC, nil, make(chan int), 32776))
client.bridge.(*binding.OFBridge).Initialize()
return client
Expand All @@ -476,27 +476,26 @@ func getTableNameCache() map[uint8]string {
}

func getFlowStrings(flows interface{}) []string {
getStrings := func(flowMessage ofctrl.OpenFlowModMessage) []string {
getStrings := func(message *openflow15.FlowMod) []string {
var strs []string
message, _ := flowMessage.GetMessage().(*openflow15.FlowMod)
f := ovsoftest.FlowModToString(message)
f := binding.FlowModToString(message)
strs = append(strs, f)
return strs
}
var flowStrings []string
switch v := flows.(type) {
case flowMessageCache:
for _, flow := range v {
flowStrings = append(flowStrings, getStrings(flow.message)...)
flowStrings = append(flowStrings, getStrings(flow)...)
}
case []binding.Flow:
for _, flow := range v {
messages, _ := flow.GetBundleMessages(binding.AddMessage)
for _, msg := range messages {
flowStrings = append(flowStrings, getStrings(msg)...)
flowStrings = append(flowStrings, getStrings(msg.GetMessage().(*openflow15.FlowMod))...)
}
}
case []ofctrl.OpenFlowModMessage:
case []*openflow15.FlowMod:
for _, flow := range v {
flowStrings = append(flowStrings, getStrings(flow)...)
}
Expand All @@ -505,10 +504,10 @@ func getFlowStrings(flows interface{}) []string {
}

func getGroupFromCache(groupCache binding.Group) string {
ovsoftest.TableNameCache = getTableNameCache()
binding.TableNameCache = getTableNameCache()

messages, _ := groupCache.GetBundleMessages(binding.AddMessage)
groupString := ovsoftest.GroupModToString(messages[0].GetMessage().(*openflow15.GroupMod))
groupString := binding.GroupModToString(messages[0].GetMessage().(*openflow15.GroupMod))
return groupString
}

Expand Down Expand Up @@ -838,11 +837,11 @@ func Test_client_GetPodFlowKeys(t *testing.T) {
assert.NoError(t, fc.InstallPodFlows(interfaceName, podInterfaceIPs, podMAC, uint32(11), 0, nil))
flowKeys := fc.GetPodFlowKeys(interfaceName)
expectedFlowKeys := []string{
"table=1,arp,in_port=11,arp_sha=00:00:10:10:00:11,arp_spa=10.10.0.11",
"table=3,in_port=11",
"table=4,ip,in_port=11,dl_src=00:00:10:10:00:11,nw_src=10.10.0.11",
"table=17,ip,reg0=0x200/0x200,nw_dst=10.10.0.11",
"table=22,dl_dst=00:00:10:10:00:11",
"table=1,priority=200,arp,in_port=11,arp_spa=10.10.0.11,arp_sha=00:00:10:10:00:11",
"table=3,priority=190,in_port=11",
"table=4,priority=200,ip,in_port=11,dl_src=00:00:10:10:00:11,nw_src=10.10.0.11",
"table=17,priority=200,ip,reg0=0x200/0x200,nw_dst=10.10.0.11",
"table=22,priority=200,dl_dst=00:00:10:10:00:11",
}
assert.ElementsMatch(t, expectedFlowKeys, flowKeys)
}
Expand Down Expand Up @@ -1225,11 +1224,11 @@ func Test_client_GetServiceFlowKeys(t *testing.T) {
assert.NoError(t, fc.InstallEndpointFlows(bindingProtocol, endpoints))
flowKeys := fc.GetServiceFlowKeys(svcIP, svcPort, bindingProtocol, endpoints)
expectedFlowKeys := []string{
"table=11,tcp,tp_dst=0x50,nw_dst=10.96.0.224,reg4=0x10000/0x70000",
"table=11,tcp,reg4=0x30000/0x70000,nw_dst=10.96.0.224,tp_dst=0x50",
"table=12,tcp,reg4=0x20050/0x7ffff,reg3=0xa0a000b",
"table=12,tcp,reg4=0x20050/0x7ffff,reg3=0xa0a000c",
"table=20,ip,nw_src=10.10.0.12,nw_dst=10.10.0.12,ct_state=+new+trk",
"table=11,priority=200,tcp,reg4=0x10000/0x70000,nw_dst=10.96.0.224,tp_dst=80",
"table=11,priority=190,tcp,reg4=0x30000/0x70000,nw_dst=10.96.0.224,tp_dst=80",
"table=12,priority=200,tcp,reg3=0xa0a000b,reg4=0x20050/0x7ffff",
"table=12,priority=200,tcp,reg3=0xa0a000c,reg4=0x20050/0x7ffff",
"table=20,priority=190,ct_state=+new+trk,ip,nw_src=10.10.0.12,nw_dst=10.10.0.12",
}
assert.ElementsMatch(t, expectedFlowKeys, flowKeys)
}
Expand Down Expand Up @@ -1481,10 +1480,10 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client {

m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
_, ipCIDR, _ := net.ParseCIDR("192.168.2.30/32")
flowMsg := &ofctrl.FlowBundleMessage{}
mFlow := &cachedFlow{isDropFlow: true, message: flowMsg, table: EgressDefaultTable.ofTable}
flows, _ := EgressDefaultTable.ofTable.BuildFlow(priority100).Action().Drop().Done().GetBundleMessages(binding.AddMessage)
flowMsg := flows[0].GetMessage().(*openflow15.FlowMod)
ctx := &conjMatchFlowContext{
dropFlow: mFlow,
dropFlow: flowMsg,
dropFlowEnableLogging: false,
conjunctiveMatch: &conjunctiveMatch{
tableID: 1,
Expand All @@ -1496,8 +1495,7 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client {
},
}}
fc.featureNetworkPolicy.globalConjMatchFlowCache["mockContext"] = ctx
EgressMetricTable.ofTable.SetTable()
fc.featureNetworkPolicy.policyCache.Add(&policyRuleConjunction{metricFlows: []*cachedFlow{{isDropFlow: true, message: flowMsg, table: EgressMetricTable.ofTable}}})
fc.featureNetworkPolicy.policyCache.Add(&policyRuleConjunction{metricFlows: []*openflow15.FlowMod{flowMsg}})
return fc
}

Expand Down Expand Up @@ -2266,7 +2264,7 @@ func Test_client_ReplayFlows(t *testing.T) {
addFlowInCache := func(cache *flowCategoryCache, cacheKey string, flows []binding.Flow) {
fCache := flowMessageCache{}
for _, flow := range flows {
cf := getCachedFlow(flow, binding.AddMessage)
cf := getFlowModMessage(flow, binding.AddMessage)
fCache[flow.MatchString()] = cf
}
cache.Store(cacheKey, fCache)
Expand Down Expand Up @@ -2302,14 +2300,14 @@ func Test_client_ReplayFlows(t *testing.T) {
priority200 = uint16(200)
conj := &policyRuleConjunction{
id: ruleID,
actionFlows: []*cachedFlow{getCachedFlow(fc.featureNetworkPolicy.conjunctionActionDenyFlow(ruleID, IngressRuleTable.ofTable, &priority200, DispositionDrop, false), binding.AddMessage)},
metricFlows: []*cachedFlow{getCachedFlow(fc.featureNetworkPolicy.denyRuleMetricFlow(ruleID, true, IngressMetricTable.GetID()), binding.AddMessage)},
actionFlows: []*openflow15.FlowMod{getFlowModMessage(fc.featureNetworkPolicy.conjunctionActionDenyFlow(ruleID, IngressRuleTable.ofTable, &priority200, DispositionDrop, false), binding.AddMessage)},
metricFlows: []*openflow15.FlowMod{getFlowModMessage(fc.featureNetworkPolicy.denyRuleMetricFlow(ruleID, true, IngressMetricTable.GetID()), binding.AddMessage)},
}
assert.NoError(t, fc.featureNetworkPolicy.policyCache.Add(conj))
mp := matchPair{matchKey: MatchDstOFPort, matchValue: int32(podOfPort)}
context := &conjMatchFlowContext{
flow: getCachedFlow(fc.featureNetworkPolicy.conjunctiveMatchFlow(IngressRuleTable.GetID(), []matchPair{mp}, &priority200, []*conjunctiveAction{{conjID: ruleID, clauseID: 2, nClause: 2}}), binding.AddMessage),
dropFlow: getCachedFlow(fc.featureNetworkPolicy.defaultDropFlow(IngressDefaultTable.ofTable, []matchPair{mp}, true), binding.AddMessage),
flow: getFlowModMessage(fc.featureNetworkPolicy.conjunctiveMatchFlow(IngressRuleTable.GetID(), []matchPair{mp}, &priority200, []*conjunctiveAction{{conjID: ruleID, clauseID: 2, nClause: 2}}), binding.AddMessage),
dropFlow: getFlowModMessage(fc.featureNetworkPolicy.defaultDropFlow(IngressDefaultTable.ofTable, []matchPair{mp}, true), binding.AddMessage),
}
fc.featureNetworkPolicy.globalConjMatchFlowCache["npMatch"] = context
replayedFlows = append(replayedFlows,
Expand Down Expand Up @@ -2346,11 +2344,43 @@ func Test_client_ReplayFlows(t *testing.T) {
expectedFlows = append(expectedFlows, replayedFlows...)

actualFlows := make([]string, 0)
m.EXPECT().AddAll(gomock.Any()).Do(func(flowMessages []ofctrl.OpenFlowModMessage) {
m.EXPECT().AddAll(gomock.Any()).Do(func(flowMessages []*openflow15.FlowMod) {
flowStrings := getFlowStrings(flowMessages)
actualFlows = append(actualFlows, flowStrings...)
}).Return(nil).AnyTimes()

fc.ReplayFlows()
assert.ElementsMatch(t, expectedFlows, actualFlows)
}

func TestCachedFlowIsDrop(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
m := oftest.NewMockOFEntryOperations(ctrl)
fc := newFakeClient(m, true, false, config.K8sNode, config.TrafficEncapModeEncap)
defer resetPipelines()
fc.bridge = ovsoftest.NewMockBridge(ctrl)

_, ipCIDR, _ := net.ParseCIDR("192.168.2.30/32")
flows, err := EgressDefaultTable.ofTable.
BuildFlow(priority100).
MatchDstIPNet(*ipCIDR).
Action().Drop().
Done().
GetBundleMessages(binding.AddMessage)
assert.NoError(t, err)
require.Equal(t, 1, len(flows))
msg := flows[0].GetMessage().(*openflow15.FlowMod)
assert.True(t, isDropFlow(msg))

flows, err = EgressDefaultTable.ofTable.
BuildFlow(priority100).
MatchDstIPNet(*ipCIDR).
Action().GotoTable(1).
Done().
GetBundleMessages(binding.AddMessage)
assert.NoError(t, err)
require.Equal(t, 1, len(flows))
msg = flows[0].GetMessage().(*openflow15.FlowMod)
assert.False(t, isDropFlow(msg))
}
6 changes: 3 additions & 3 deletions pkg/agent/openflow/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package openflow
import (
"net"

"antrea.io/ofnet/ofctrl"
"antrea.io/libOpenflow/openflow15"

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
Expand Down Expand Up @@ -73,12 +73,12 @@ func newFeatureEgress(cookieAllocator cookie.Allocator,
}
}

func (f *featureEgress) initFlows() []ofctrl.OpenFlowModMessage {
func (f *featureEgress) initFlows() []*openflow15.FlowMod {
// This installs the flows to enable Pods to communicate to the external IP addresses. The flows identify the packets
// from local Pods to the external IP address, and mark the packets to be SNAT'd with the configured SNAT IPs.
return getFlowMessages(f.externalFlows())
}

func (f *featureEgress) replayFlows() []ofctrl.OpenFlowModMessage {
func (f *featureEgress) replayFlows() []*openflow15.FlowMod {
return getCachedFlowMessages(f.cachedFlows)
}
13 changes: 6 additions & 7 deletions pkg/agent/openflow/externalnode_connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
package openflow

import (
"antrea.io/libOpenflow/openflow15"
"net"

"antrea.io/ofnet/ofctrl"

"antrea.io/antrea/pkg/agent/openflow/cookie"
binding "antrea.io/antrea/pkg/ovs/openflow"
)
Expand Down Expand Up @@ -101,7 +100,7 @@ func (f *featureExternalNodeConnectivity) vmUplinkFlows(hostOFPort, uplinkOFPort
}
}

func (f *featureExternalNodeConnectivity) initFlows() []ofctrl.OpenFlowModMessage {
func (f *featureExternalNodeConnectivity) initFlows() []*openflow15.FlowMod {
cookieID := f.cookieAllocator.Request(f.category).Raw()
flows := []binding.Flow{
L2ForwardingOutTable.ofTable.BuildFlow(priorityNormal).
Expand Down Expand Up @@ -140,12 +139,12 @@ func (f *featureExternalNodeConnectivity) initFlows() []ofctrl.OpenFlowModMessag
return getFlowMessages(flows)
}

func (f *featureExternalNodeConnectivity) replayFlows() []ofctrl.OpenFlowModMessage {
var flows []ofctrl.OpenFlowModMessage
func (f *featureExternalNodeConnectivity) replayFlows() []*openflow15.FlowMod {
var flows []*openflow15.FlowMod
rangeFunc := func(key, value interface{}) bool {
cachedFlows := value.([]cachedFlow)
cachedFlows := value.([]*openflow15.FlowMod)
for _, flow := range cachedFlows {
flows = append(flows, flow.message)
flows = append(flows, flow)
}
return true
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/agent/openflow/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
package openflow

import (
"antrea.io/ofnet/ofctrl"

"antrea.io/antrea/pkg/agent/config"
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/libOpenflow/openflow15"
)

// OVS pipelines are generated by a framework called FlexiblePipeline. There are some abstractions introduced in this
Expand Down Expand Up @@ -58,9 +57,9 @@ type feature interface {
// getRequiredTables returns a slice of required tables of the feature.
getRequiredTables() []*Table
// initFlows returns the Openflow messages of initial flows of the feature.
initFlows() []ofctrl.OpenFlowModMessage
initFlows() []*openflow15.FlowMod
// replayFlows returns the Openflow messages of fixed and cached flows that need to be replayed after OVS is reconnected.
replayFlows() []ofctrl.OpenFlowModMessage
replayFlows() []*openflow15.FlowMod
}

const (
Expand Down
Loading

0 comments on commit ec409ec

Please sign in to comment.