Skip to content

Commit

Permalink
Cache Flow modification message in Antrea Agent
Browse files Browse the repository at this point in the history
Signed-off-by: wenyingd <[email protected]>
  • Loading branch information
wenyingd committed Dec 19, 2022
1 parent 2f2d4d2 commit 0933bdb
Show file tree
Hide file tree
Showing 21 changed files with 461 additions and 248 deletions.
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module antrea.io/antrea
go 1.19

require (
antrea.io/libOpenflow v0.9.1
antrea.io/libOpenflow v0.9.2
antrea.io/ofnet v0.6.5
github.com/ClickHouse/clickhouse-go v1.5.4
github.com/DATA-DOG/go-sqlmock v1.5.0
Expand Down Expand Up @@ -210,5 +210,3 @@ require (
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
)

replace antrea.io/ofnet v0.6.0 => github.com/wenyingd/ofnet v0.0.0-20220817031400-cb451467adc1
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
antrea.io/libOpenflow v0.8.1/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o=
antrea.io/libOpenflow v0.9.1 h1:nrw7EpGuSgi932xriAHdMYGYdLnnjrj91qXGt/bzUUw=
antrea.io/libOpenflow v0.9.1/go.mod h1:IM9mUfHh5hUNciRRcWYIaWZTlv1TI6QBEHlml7ALdS4=
antrea.io/libOpenflow v0.9.2 h1:9W++nzaxxwY4NxyHHow/4bfum2UPIBJKmEOVTAG+x3o=
antrea.io/libOpenflow v0.9.2/go.mod h1:IM9mUfHh5hUNciRRcWYIaWZTlv1TI6QBEHlml7ALdS4=
antrea.io/ofnet v0.6.5 h1:jMnrU2Iva+jn/j2tyHJ1bSmC7HXtMDYVCJb7pq8L37I=
antrea.io/ofnet v0.6.5/go.mod h1:/gjpTqhUpyn8uZnef+ytdCCAeY5oGG1jCr/szPUqVXU=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
Expand Down
62 changes: 42 additions & 20 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"
ofutil "antrea.io/libOpenflow/util"
"antrea.io/ofnet/ofctrl"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -375,8 +376,8 @@ func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows [
// addFlowsWithMultipleKeys installs the flows with different flowCache keys and adds them into the cache on success.
// It will skip flows whose cache already exists. All flows will be installed via a bundle.
func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows map[string][]binding.Flow) error {
// allFlows keeps the flows we will install via a bundle.
var allFlows []binding.Flow
// allMessages keeps the OpenFlow modification messages we will install via a bundle.
var allMessages []ofctrl.OpenFlowModMessage
// flowCacheMap keeps the flowCache items we will add to the cache on bundle success.
flowCacheMap := map[string]flowCache{}
for flowCacheKey, flows := range keyToFlows {
Expand All @@ -388,15 +389,16 @@ func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows m
}
fCache := flowCache{}
for _, flow := range flows {
allFlows = append(allFlows, flow)
fCache[flow.MatchString()] = flow
cf := getCachedFlow(flow, binding.AddMessage)
allMessages = append(allMessages, cf.message)
fCache[flow.MatchString()] = cf
}
flowCacheMap[flowCacheKey] = fCache
}
if len(allFlows) == 0 {
if len(allMessages) == 0 {
return nil
}
err := c.ofEntryOperations.AddAll(allFlows)
err := c.ofEntryOperations.AddAll(allMessages)
if err != nil {
return err
}
Expand All @@ -413,26 +415,30 @@ func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flow
fCache := flowCache{}
var err error
if !ok {
messages := make([]ofctrl.OpenFlowModMessage, 0)
for _, flow := range flows {
fCache[flow.MatchString()] = flow
cf := getCachedFlow(flow, binding.AddMessage)
messages = append(messages, cf.message)
fCache[flow.MatchString()] = cf
}

err = c.ofEntryOperations.AddAll(flows)
err = c.ofEntryOperations.AddAll(messages)
} else {
var adds, mods, dels []binding.Flow
var adds, mods, dels []ofctrl.OpenFlowModMessage
oldFlowCache := oldFlowCacheI.(flowCache)
for _, flow := range flows {
matchString := flow.MatchString()
var cachedFlowInfo *cachedFlow
if _, ok := oldFlowCache[matchString]; ok {
mods = append(mods, flow)
cachedFlowInfo = getCachedFlow(flow, binding.ModifyMessage)
} else {
adds = append(adds, flow)
cachedFlowInfo = getCachedFlow(flow, binding.AddMessage)
}
fCache[matchString] = flow
adds = append(adds, cachedFlowInfo.message)
fCache[matchString] = cachedFlowInfo
}
for k, v := range oldFlowCache {
if _, ok := fCache[k]; !ok {
dels = append(dels, v)
dels = append(dels, v.message)
}
}
err = c.ofEntryOperations.BundleOps(adds, mods, dels)
Expand All @@ -455,11 +461,11 @@ func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) erro
}
fCache := fCacheI.(flowCache)
// Delete flows from OVS.
delFlows := make([]binding.Flow, 0, len(fCache))
for _, flow := range fCache {
delFlows = append(delFlows, flow)
delFlowMessages := make([]ofctrl.OpenFlowModMessage, 0, len(fCache))
for _, flowInfo := range fCache {
delFlowMessages = append(delFlowMessages, flowInfo.message)
}
if err := c.ofEntryOperations.DeleteAll(delFlows); err != nil {
if err := c.ofEntryOperations.DeleteAll(delFlowMessages); err != nil {
return err
}
cache.Delete(flowCacheKey)
Expand Down Expand Up @@ -606,7 +612,7 @@ func (c *client) getFlowKeysFromCache(cache *flowCategoryCache, cacheKey string)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
for _, flow := range fCache {
flowKeys = append(flowKeys, flow.MatchString())
flowKeys = append(flowKeys, flow.key)
}
return flowKeys
}
Expand Down Expand Up @@ -954,7 +960,7 @@ func (c *client) ReplayFlows() {
}

for _, activeFeature := range c.activatedFeatures {
if err := c.ofEntryOperations.AddAll(activeFeature.replayFlows()); err != nil {
if err := c.AddAll(activeFeature.replayFlows()); err != nil {
klog.ErrorS(err, "Error when replaying feature flows", "feature", activeFeature.getFeatureName())
}
}
Expand Down Expand Up @@ -1408,3 +1414,19 @@ func (c *client) UninstallMulticlusterFlows(clusterID string) error {
cacheKey := fmt.Sprintf("cluster_%s", clusterID)
return c.deleteFlows(c.featureMulticluster.cachedFlows, cacheKey)
}

func getFlowMessages(flows []binding.Flow) []ofctrl.OpenFlowModMessage {
messages := make([]ofctrl.OpenFlowModMessage, 0, len(flows))
for i := range flows {
cf := getCachedFlow(flows[i], binding.AddMessage)
messages = append(messages, cf.message)
}
return messages
}

func getCachedFlow(flow binding.Flow, entryOper binding.OFOperation) *cachedFlow {
bundleMessages, _ := flow.GetBundleMessages(entryOper)
flowKey := flow.MatchString()
flowMessage := bundleMessages[0]
return &cachedFlow{key: flowKey, table: flow.GetTable(), isDropFlow: flow.IsDropFlow(), message: flowMessage}
}
41 changes: 29 additions & 12 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,22 +435,27 @@ func getTableNameCache() map[uint8]string {
}

func getFlowStrings(flows interface{}) []string {
getStrings := func(flow binding.Flow) []string {
getStrings := func(flowMessage ofctrl.OpenFlowModMessage) []string {
var strs []string
messages, _ := flow.GetBundleMessages(binding.AddMessage)
for _, message := range messages {
f := ovsoftest.FlowModToString(message.GetMessage().(*openflow15.FlowMod))
strs = append(strs, f)
}
message, _ := flowMessage.GetMessage().(*openflow15.FlowMod)
f := ovsoftest.FlowModToString(message)
strs = append(strs, f)
return strs
}
var flowStrings []string
switch v := flows.(type) {
case flowCache:
for _, flow := range v {
flowStrings = append(flowStrings, getStrings(flow)...)
flowStrings = append(flowStrings, getStrings(flow.message)...)
}
case []binding.Flow:
for _, flow := range v {
messages, _ := flow.GetBundleMessages(binding.AddMessage)
for _, msg := range messages {
flowStrings = append(flowStrings, getStrings(msg)...)
}
}
case []ofctrl.OpenFlowModMessage:
for _, flow := range v {
flowStrings = append(flowStrings, getStrings(flow)...)
}
Expand Down Expand Up @@ -1397,12 +1402,24 @@ func prepareTraceflowFlow(ctrl *gomock.Controller) *client {
fc.bridge = ovsoftest.NewMockBridge(ctrl)

m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
mFlow := ovsoftest.NewMockFlow(ctrl)
ctx := &conjMatchFlowContext{dropFlow: mFlow, dropFlowEnableLogging: false}
mFlow.EXPECT().FlowProtocol().Return(binding.Protocol("ip"))
mFlow.EXPECT().CopyToBuilder(priorityNormal+2, false).Return(EgressDefaultTable.ofTable.BuildFlow(priorityNormal + 2)).Times(1)
_, ipCIDR, _ := net.ParseCIDR("192.168.2.30/32")
flowMsg := &ofctrl.FlowBundleMessage{}
mFlow := &cachedFlow{isDropFlow: true, message: flowMsg, table: EgressDefaultTable.ofTable}
ctx := &conjMatchFlowContext{
dropFlow: mFlow,
dropFlowEnableLogging: false,
conjunctiveMatch: &conjunctiveMatch{
tableID: 1,
matchPairs: []matchPair{
{
matchKey: MatchCTSrcIPNet,
matchValue: *ipCIDR,
},
},
}}
fc.featureNetworkPolicy.globalConjMatchFlowCache["mockContext"] = ctx
fc.featureNetworkPolicy.policyCache.Add(&policyRuleConjunction{metricFlows: []binding.Flow{fc.featureNetworkPolicy.denyRuleMetricFlow(123, false, 1)}})
EgressMetricTable.ofTable.SetTable()
fc.featureNetworkPolicy.policyCache.Add(&policyRuleConjunction{metricFlows: []*cachedFlow{{isDropFlow: true, message: flowMsg, table: EgressMetricTable.ofTable}}})
return fc
}

Expand Down
14 changes: 8 additions & 6 deletions pkg/agent/openflow/egress.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package openflow
import (
"net"

"antrea.io/ofnet/ofctrl"

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
binding "antrea.io/antrea/pkg/ovs/openflow"
Expand Down Expand Up @@ -71,16 +73,16 @@ func newFeatureEgress(cookieAllocator cookie.Allocator,
}
}

func (f *featureEgress) initFlows() []binding.Flow {
func (f *featureEgress) initFlows() []ofctrl.OpenFlowModMessage {
// This installs the flows to enable Pods to communicate to the external IP addresses. The flows identify the packets
// from local Pods to the external IP address, and mark the packets to be SNAT'd with the configured SNAT IPs.
return f.externalFlows()
return getFlowMessages(f.externalFlows())
}

func (f *featureEgress) replayFlows() []binding.Flow {
var flows []binding.Flow
func (f *featureEgress) replayFlows() []ofctrl.OpenFlowModMessage {
var flowMessages []ofctrl.OpenFlowModMessage
// Get cached flows.
flows = append(flows, getCachedFlows(f.cachedFlows)...)
flowMessages = append(flowMessages, getCachedFlowMessages(f.cachedFlows)...)

return flows
return flowMessages
}
15 changes: 8 additions & 7 deletions pkg/agent/openflow/externalnode_connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package openflow
import (
"net"

"antrea.io/ofnet/ofctrl"

"antrea.io/antrea/pkg/agent/openflow/cookie"
binding "antrea.io/antrea/pkg/ovs/openflow"
)
Expand Down Expand Up @@ -99,7 +101,7 @@ func (f *featureExternalNodeConnectivity) vmUplinkFlows(hostOFPort, uplinkOFPort
}
}

func (f *featureExternalNodeConnectivity) initFlows() []binding.Flow {
func (f *featureExternalNodeConnectivity) initFlows() []ofctrl.OpenFlowModMessage {
cookieID := f.cookieAllocator.Request(f.category).Raw()
flows := []binding.Flow{
L2ForwardingOutTable.ofTable.BuildFlow(priorityNormal).
Expand Down Expand Up @@ -135,16 +137,15 @@ func (f *featureExternalNodeConnectivity) initFlows() []binding.Flow {
)
}

return flows
return getFlowMessages(flows)
}

func (f *featureExternalNodeConnectivity) replayFlows() []binding.Flow {
var flows []binding.Flow
func (f *featureExternalNodeConnectivity) replayFlows() []ofctrl.OpenFlowModMessage {
var flows []ofctrl.OpenFlowModMessage
rangeFunc := func(key, value interface{}) bool {
cachedFlows := value.([]binding.Flow)
cachedFlows := value.([]cachedFlow)
for _, flow := range cachedFlows {
flow.Reset()
flows = append(flows, flow)
flows = append(flows, flow.message)
}
return true
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/agent/openflow/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package openflow
import (
"antrea.io/antrea/pkg/agent/config"
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/ofnet/ofctrl"
)

// OVS pipelines are generated by a framework called FlexiblePipeline. There are some abstractions introduced in this
Expand Down Expand Up @@ -56,9 +57,9 @@ type feature interface {
// getRequiredTables returns a slice of required tables of the feature.
getRequiredTables() []*Table
// initFlows returns the initial flows of the feature.
initFlows() []binding.Flow
initFlows() []ofctrl.OpenFlowModMessage
// replayFlows returns the fixed and cached flows that need to be replayed after OVS is reconnected.
replayFlows() []binding.Flow
replayFlows() []ofctrl.OpenFlowModMessage
}

const (
Expand Down
9 changes: 5 additions & 4 deletions pkg/agent/openflow/multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"

"antrea.io/libOpenflow/openflow15"
"antrea.io/ofnet/ofctrl"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/config"
Expand Down Expand Up @@ -71,7 +72,7 @@ func multicastPipelineClassifyFlow(cookieID uint64, pipeline binding.Pipeline) b
Done()
}

func (f *featureMulticast) initFlows() []binding.Flow {
func (f *featureMulticast) initFlows() []ofctrl.OpenFlowModMessage {
// Install flows to send the IGMP report messages to Antrea Agent.
flows := f.igmpPktInFlows(uint8(PacketInReasonMC))
// Install flow to forward the IGMP query messages to all local Pods.
Expand All @@ -84,12 +85,12 @@ func (f *featureMulticast) initFlows() []binding.Flow {
}
// Install flows to output multicast packets.
flows = append(flows, f.multicastOutputFlows()...)
return flows
return getFlowMessages(flows)
}

func (f *featureMulticast) replayFlows() []binding.Flow {
func (f *featureMulticast) replayFlows() []ofctrl.OpenFlowModMessage {
// Get cached flows.
return getCachedFlows(f.cachedFlows)
return getCachedFlowMessages(f.cachedFlows)
}

func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, tableID uint8, ports []uint32, remoteIPs []net.IP) binding.Group {
Expand Down
10 changes: 6 additions & 4 deletions pkg/agent/openflow/multicluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package openflow
import (
"net"

"antrea.io/ofnet/ofctrl"

"antrea.io/antrea/pkg/agent/openflow/cookie"
binding "antrea.io/antrea/pkg/ovs/openflow"
)
Expand Down Expand Up @@ -53,12 +55,12 @@ func newFeatureMulticluster(cookieAllocator cookie.Allocator, ipProtocols []bind
}
}

func (f *featureMulticluster) initFlows() []binding.Flow {
return []binding.Flow{}
func (f *featureMulticluster) initFlows() []ofctrl.OpenFlowModMessage {
return []ofctrl.OpenFlowModMessage{}
}

func (f *featureMulticluster) replayFlows() []binding.Flow {
return getCachedFlows(f.cachedFlows)
func (f *featureMulticluster) replayFlows() []ofctrl.OpenFlowModMessage {
return getCachedFlowMessages(f.cachedFlows)
}

func (f *featureMulticluster) l3FwdFlowToRemoteViaTun(
Expand Down
Loading

0 comments on commit 0933bdb

Please sign in to comment.