Skip to content

Commit

Permalink
Cache Flow modification message in Antrea Agent
Browse files Browse the repository at this point in the history
Signed-off-by: wenyingd <[email protected]>
  • Loading branch information
wenyingd committed Mar 21, 2023
1 parent 699a5bf commit 350b7ea
Show file tree
Hide file tree
Showing 26 changed files with 1,129 additions and 862 deletions.
86 changes: 49 additions & 37 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"
ofutil "antrea.io/libOpenflow/util"
"antrea.io/ofnet/ofctrl"
v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -390,31 +391,32 @@ func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows [
return c.addFlowsWithMultipleKeys(cache, map[string][]binding.Flow{flowCacheKey: flows})
}

// addFlowsWithMultipleKeys installs the flows with different flowCache keys and adds them into the cache on success.
// addFlowsWithMultipleKeys installs the flows with different flowMessageCache keys and adds them into the cache on success.
// It will skip flows whose cache already exists. All flows will be installed via a bundle.
func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows map[string][]binding.Flow) error {
// allFlows keeps the flows we will install via a bundle.
var allFlows []binding.Flow
// flowCacheMap keeps the flowCache items we will add to the cache on bundle success.
flowCacheMap := map[string]flowCache{}
// allMessages keeps the OpenFlow modification messages we will install via a bundle.
var allMessages []ofctrl.OpenFlowModMessage
// flowCacheMap keeps the flowMessageCache items we will add to the cache on bundle success.
flowCacheMap := map[string]flowMessageCache{}
for flowCacheKey, flows := range keyToFlows {
_, ok := cache.Load(flowCacheKey)
// If a flow cache entry already exists for the key, skip it.
if ok {
klog.V(2).InfoS("Flows with this cache key are already installed", "key", flowCacheKey)
continue
}
fCache := flowCache{}
fCache := flowMessageCache{}
for _, flow := range flows {
allFlows = append(allFlows, flow)
fCache[flow.MatchString()] = flow
cf := getCachedFlow(flow, binding.AddMessage)
allMessages = append(allMessages, cf.message)
fCache[flow.MatchString()] = cf
}
flowCacheMap[flowCacheKey] = fCache
}
if len(allFlows) == 0 {
if len(allMessages) == 0 {
return nil
}
err := c.ofEntryOperations.AddAll(allFlows)
err := c.ofEntryOperations.AddAll(allMessages)
if err != nil {
return err
}
Expand All @@ -428,29 +430,33 @@ func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows m
// modifyFlows sets the flows of flowCategoryCache be exactly same as the provided slice for the given flowCacheKey.
func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow) error {
oldFlowCacheI, ok := cache.Load(flowCacheKey)
fCache := flowCache{}
fCache := flowMessageCache{}
var err error
if !ok {
messages := make([]ofctrl.OpenFlowModMessage, 0)
for _, flow := range flows {
fCache[flow.MatchString()] = flow
cf := getCachedFlow(flow, binding.AddMessage)
messages = append(messages, cf.message)
fCache[flow.MatchString()] = cf
}

err = c.ofEntryOperations.AddAll(flows)
err = c.ofEntryOperations.AddAll(messages)
} else {
var adds, mods, dels []binding.Flow
oldFlowCache := oldFlowCacheI.(flowCache)
var adds, mods, dels []ofctrl.OpenFlowModMessage
oldFlowCache := oldFlowCacheI.(flowMessageCache)
for _, flow := range flows {
matchString := flow.MatchString()
var cachedFlowInfo *cachedFlow
if _, ok := oldFlowCache[matchString]; ok {
mods = append(mods, flow)
cachedFlowInfo = getCachedFlow(flow, binding.ModifyMessage)
} else {
adds = append(adds, flow)
cachedFlowInfo = getCachedFlow(flow, binding.AddMessage)
}
fCache[matchString] = flow
adds = append(adds, cachedFlowInfo.message)
fCache[matchString] = cachedFlowInfo
}
for k, v := range oldFlowCache {
if _, ok := fCache[k]; !ok {
dels = append(dels, v)
dels = append(dels, v.message)
}
}
err = c.ofEntryOperations.BundleOps(adds, mods, dels)
Expand All @@ -473,23 +479,22 @@ func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) erro
// It will skip the keys which are not in the cache. All flows will be uninstalled via a bundle.
func (c *client) deleteFlowsWithMultipleKeys(cache *flowCategoryCache, keys []string) error {
// allFlows keeps the flows we will delete via a bundle.
var allFlows []binding.Flow
var allFlows []ofctrl.OpenFlowModMessage
for _, key := range keys {
flows, ok := cache.Load(key)
// If a flow cache entry of the key does not exist, skip it.
if !ok {
klog.V(2).InfoS("Cached flow with provided key was not found", "key", key)
continue
}
for _, flow := range flows.(flowCache) {
allFlows = append(allFlows, flow)
for _, flow := range flows.(flowMessageCache) {
allFlows = append(allFlows, flow.message)
}
}
if len(allFlows) == 0 {
return nil
}
err := c.ofEntryOperations.DeleteAll(allFlows)
if err != nil {
if err := c.ofEntryOperations.DeleteAll(allFlows); err != nil {
return err
}
// Delete the keys and corresponding flows from the flow cache.
Expand All @@ -500,16 +505,7 @@ func (c *client) deleteFlowsWithMultipleKeys(cache *flowCategoryCache, keys []st
}

func (c *client) deleteAllFlows(cache *flowCategoryCache) error {
var delAllFlows []binding.Flow
cache.Range(func(key, value any) bool {
fCache := value.(flowCache)
delFlows := make([]binding.Flow, 0, len(fCache))
for _, flow := range fCache {
delFlows = append(delFlows, flow)
}
delAllFlows = append(delAllFlows, delFlows...)
return true
})
delAllFlows := getCachedFlowMessages(cache)
if delAllFlows != nil {
if err := c.ofEntryOperations.DeleteAll(delAllFlows); err != nil {
return err
Expand Down Expand Up @@ -653,7 +649,7 @@ func (c *client) getFlowKeysFromCache(cache *flowCategoryCache, cacheKey string)
if !ok {
return nil
}
fCache := fCacheI.(flowCache)
fCache := fCacheI.(flowMessageCache)
flowKeys := make([]string, 0, len(fCache))

// ReplayFlows() could change Flow internal state. Although its current
Expand All @@ -662,7 +658,7 @@ func (c *client) getFlowKeysFromCache(cache *flowCategoryCache, cacheKey string)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
for _, flow := range fCache {
flowKeys = append(flowKeys, flow.MatchString())
flowKeys = append(flowKeys, flow.key)
}
return flowKeys
}
Expand Down Expand Up @@ -1535,3 +1531,19 @@ func (c *client) UninstallMulticlusterPodFlows(podIP string) error {
}
return c.deleteFlows(c.featureMulticluster.cachedPodFlows, podIP)
}

func getFlowMessages(flows []binding.Flow) []ofctrl.OpenFlowModMessage {
messages := make([]ofctrl.OpenFlowModMessage, 0, len(flows))
for i := range flows {
cf := getCachedFlow(flows[i], binding.AddMessage)
messages = append(messages, cf.message)
}
return messages
}

func getCachedFlow(flow binding.Flow, entryOper binding.OFOperation) *cachedFlow {
bundleMessages, _ := flow.GetBundleMessages(entryOper)
flowKey := flow.MatchString()
flowMessage := bundleMessages[0]
return &cachedFlow{key: flowKey, table: flow.GetTable(), isDropFlow: flow.IsDropFlow(), message: flowMessage}
}
Loading

0 comments on commit 350b7ea

Please sign in to comment.