Skip to content

Commit

Permalink
Cache Flow modification message in Antrea Agent
Browse files Browse the repository at this point in the history
Signed-off-by: wenyingd <[email protected]>
  • Loading branch information
wenyingd committed Feb 1, 2023
1 parent b74f9fe commit f69a41f
Show file tree
Hide file tree
Showing 28 changed files with 1,103 additions and 853 deletions.
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module antrea.io/antrea
go 1.19

require (
antrea.io/libOpenflow v0.9.1
antrea.io/libOpenflow v0.9.2
antrea.io/ofnet v0.6.5
github.com/ClickHouse/clickhouse-go v1.5.4
github.com/DATA-DOG/go-sqlmock v1.5.0
Expand Down Expand Up @@ -210,5 +210,3 @@ require (
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
)

replace antrea.io/ofnet v0.6.0 => github.com/wenyingd/ofnet v0.0.0-20220817031400-cb451467adc1
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
antrea.io/libOpenflow v0.8.1/go.mod h1:CzEJZxDNAupiGxeL5VOw92PsxfyvehEAvE3PiC6gr8o=
antrea.io/libOpenflow v0.9.1 h1:nrw7EpGuSgi932xriAHdMYGYdLnnjrj91qXGt/bzUUw=
antrea.io/libOpenflow v0.9.1/go.mod h1:IM9mUfHh5hUNciRRcWYIaWZTlv1TI6QBEHlml7ALdS4=
antrea.io/libOpenflow v0.9.2 h1:9W++nzaxxwY4NxyHHow/4bfum2UPIBJKmEOVTAG+x3o=
antrea.io/libOpenflow v0.9.2/go.mod h1:IM9mUfHh5hUNciRRcWYIaWZTlv1TI6QBEHlml7ALdS4=
antrea.io/ofnet v0.6.5 h1:jMnrU2Iva+jn/j2tyHJ1bSmC7HXtMDYVCJb7pq8L37I=
antrea.io/ofnet v0.6.5/go.mod h1:/gjpTqhUpyn8uZnef+ytdCCAeY5oGG1jCr/szPUqVXU=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
Expand Down
76 changes: 49 additions & 27 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"
ofutil "antrea.io/libOpenflow/util"
"antrea.io/ofnet/ofctrl"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -375,31 +376,32 @@ func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows [
return c.addFlowsWithMultipleKeys(cache, map[string][]binding.Flow{flowCacheKey: flows})
}

// addFlowsWithMultipleKeys installs the flows with different flowCache keys and adds them into the cache on success.
// addFlowsWithMultipleKeys installs the flows with different flowMessageCache keys and adds them into the cache on success.
// It will skip flows whose cache already exists. All flows will be installed via a bundle.
func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows map[string][]binding.Flow) error {
// allFlows keeps the flows we will install via a bundle.
var allFlows []binding.Flow
// flowCacheMap keeps the flowCache items we will add to the cache on bundle success.
flowCacheMap := map[string]flowCache{}
// allMessages keeps the OpenFlow modification messages we will install via a bundle.
var allMessages []ofctrl.OpenFlowModMessage
// flowCacheMap keeps the flowMessageCache items we will add to the cache on bundle success.
flowCacheMap := map[string]flowMessageCache{}
for flowCacheKey, flows := range keyToFlows {
_, ok := cache.Load(flowCacheKey)
// If a flow cache entry already exists for the key, skip it.
if ok {
klog.V(2).InfoS("Flows with this cache key are already installed", "key", flowCacheKey)
continue
}
fCache := flowCache{}
fCache := flowMessageCache{}
for _, flow := range flows {
allFlows = append(allFlows, flow)
fCache[flow.MatchString()] = flow
cf := getCachedFlow(flow, binding.AddMessage)
allMessages = append(allMessages, cf.message)
fCache[flow.MatchString()] = cf
}
flowCacheMap[flowCacheKey] = fCache
}
if len(allFlows) == 0 {
if len(allMessages) == 0 {
return nil
}
err := c.ofEntryOperations.AddAll(allFlows)
err := c.ofEntryOperations.AddAll(allMessages)
if err != nil {
return err
}
Expand All @@ -413,29 +415,33 @@ func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows m
// modifyFlows sets the flows of flowCategoryCache be exactly same as the provided slice for the given flowCacheKey.
func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow) error {
oldFlowCacheI, ok := cache.Load(flowCacheKey)
fCache := flowCache{}
fCache := flowMessageCache{}
var err error
if !ok {
messages := make([]ofctrl.OpenFlowModMessage, 0)
for _, flow := range flows {
fCache[flow.MatchString()] = flow
cf := getCachedFlow(flow, binding.AddMessage)
messages = append(messages, cf.message)
fCache[flow.MatchString()] = cf
}

err = c.ofEntryOperations.AddAll(flows)
err = c.ofEntryOperations.AddAll(messages)
} else {
var adds, mods, dels []binding.Flow
oldFlowCache := oldFlowCacheI.(flowCache)
var adds, mods, dels []ofctrl.OpenFlowModMessage
oldFlowCache := oldFlowCacheI.(flowMessageCache)
for _, flow := range flows {
matchString := flow.MatchString()
var cachedFlowInfo *cachedFlow
if _, ok := oldFlowCache[matchString]; ok {
mods = append(mods, flow)
cachedFlowInfo = getCachedFlow(flow, binding.ModifyMessage)
} else {
adds = append(adds, flow)
cachedFlowInfo = getCachedFlow(flow, binding.AddMessage)
}
fCache[matchString] = flow
adds = append(adds, cachedFlowInfo.message)
fCache[matchString] = cachedFlowInfo
}
for k, v := range oldFlowCache {
if _, ok := fCache[k]; !ok {
dels = append(dels, v)
dels = append(dels, v.message)
}
}
err = c.ofEntryOperations.BundleOps(adds, mods, dels)
Expand All @@ -456,13 +462,13 @@ func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) erro
// no matching flows found in the cache
return nil
}
fCache := fCacheI.(flowCache)
fCache := fCacheI.(flowMessageCache)
// Delete flows from OVS.
delFlows := make([]binding.Flow, 0, len(fCache))
for _, flow := range fCache {
delFlows = append(delFlows, flow)
delFlowMessages := make([]ofctrl.OpenFlowModMessage, 0, len(fCache))
for _, flowInfo := range fCache {
delFlowMessages = append(delFlowMessages, flowInfo.message)
}
if err := c.ofEntryOperations.DeleteAll(delFlows); err != nil {
if err := c.ofEntryOperations.DeleteAll(delFlowMessages); err != nil {
return err
}
cache.Delete(flowCacheKey)
Expand Down Expand Up @@ -600,7 +606,7 @@ func (c *client) getFlowKeysFromCache(cache *flowCategoryCache, cacheKey string)
if !ok {
return nil
}
fCache := fCacheI.(flowCache)
fCache := fCacheI.(flowMessageCache)
flowKeys := make([]string, 0, len(fCache))

// ReplayFlows() could change Flow internal state. Although its current
Expand All @@ -609,7 +615,7 @@ func (c *client) getFlowKeysFromCache(cache *flowCategoryCache, cacheKey string)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
for _, flow := range fCache {
flowKeys = append(flowKeys, flow.MatchString())
flowKeys = append(flowKeys, flow.key)
}
return flowKeys
}
Expand Down Expand Up @@ -1417,3 +1423,19 @@ func (c *client) UninstallMulticlusterFlows(clusterID string) error {
cacheKey := fmt.Sprintf("cluster_%s", clusterID)
return c.deleteFlows(c.featureMulticluster.cachedFlows, cacheKey)
}

func getFlowMessages(flows []binding.Flow) []ofctrl.OpenFlowModMessage {
messages := make([]ofctrl.OpenFlowModMessage, 0, len(flows))
for i := range flows {
cf := getCachedFlow(flows[i], binding.AddMessage)
messages = append(messages, cf.message)
}
return messages
}

func getCachedFlow(flow binding.Flow, entryOper binding.OFOperation) *cachedFlow {
bundleMessages, _ := flow.GetBundleMessages(entryOper)
flowKey := flow.MatchString()
flowMessage := bundleMessages[0]
return &cachedFlow{key: flowKey, table: flow.GetTable(), isDropFlow: flow.IsDropFlow(), message: flowMessage}
}
Loading

0 comments on commit f69a41f

Please sign in to comment.