Skip to content

Commit

Permalink
Cache Flow modification message in Antrea Agent
Browse files Browse the repository at this point in the history
Signed-off-by: wenyingd <[email protected]>
  • Loading branch information
wenyingd committed May 16, 2023
1 parent 8dbb48c commit 85a6be6
Show file tree
Hide file tree
Showing 32 changed files with 1,684 additions and 1,321 deletions.
87 changes: 49 additions & 38 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,31 +388,32 @@ func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows [
return c.addFlowsWithMultipleKeys(cache, map[string][]binding.Flow{flowCacheKey: flows})
}

// addFlowsWithMultipleKeys installs the flows with different flowCache keys and adds them into the cache on success.
// addFlowsWithMultipleKeys installs the flows with different flowMessageCache keys and adds them into the cache on success.
// It will skip flows whose cache already exists. All flows will be installed via a bundle.
func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows map[string][]binding.Flow) error {
// allFlows keeps the flows we will install via a bundle.
var allFlows []binding.Flow
// flowCacheMap keeps the flowCache items we will add to the cache on bundle success.
flowCacheMap := map[string]flowCache{}
// allMessages keeps the OpenFlow modification messages we will install via a bundle.
var allMessages []*openflow15.FlowMod
// flowCacheMap keeps the flowMessageCache items we will add to the cache on bundle success.
flowCacheMap := map[string]flowMessageCache{}
for flowCacheKey, flows := range keyToFlows {
_, ok := cache.Load(flowCacheKey)
// If a flow cache entry already exists for the key, skip it.
if ok {
klog.V(2).InfoS("Flows with this cache key are already installed", "key", flowCacheKey)
continue
}
fCache := flowCache{}
fCache := flowMessageCache{}
for _, flow := range flows {
allFlows = append(allFlows, flow)
fCache[flow.MatchString()] = flow
msg := getFlowModMessage(flow, binding.AddMessage)
allMessages = append(allMessages, msg)
fCache[getFlowKey(msg)] = msg
}
flowCacheMap[flowCacheKey] = fCache
}
if len(allFlows) == 0 {
if len(allMessages) == 0 {
return nil
}
err := c.ofEntryOperations.AddAll(allFlows)
err := c.ofEntryOperations.AddAll(allMessages)
if err != nil {
return err
}
Expand All @@ -426,25 +427,30 @@ func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows m
// modifyFlows sets the flows of flowCategoryCache be exactly same as the provided slice for the given flowCacheKey.
func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow) error {
oldFlowCacheI, ok := cache.Load(flowCacheKey)
fCache := flowCache{}
fCache := flowMessageCache{}
var err error
if !ok {
messages := make([]*openflow15.FlowMod, 0, len(flows))
for _, flow := range flows {
fCache[flow.MatchString()] = flow
msg := getFlowModMessage(flow, binding.AddMessage)
messages = append(messages, msg)
fCache[getFlowKey(msg)] = msg
}

err = c.ofEntryOperations.AddAll(flows)
err = c.ofEntryOperations.AddAll(messages)
} else {
var adds, mods, dels []binding.Flow
oldFlowCache := oldFlowCacheI.(flowCache)
var adds, mods, dels []*openflow15.FlowMod
oldFlowCache := oldFlowCacheI.(flowMessageCache)
for _, flow := range flows {
matchString := flow.MatchString()
var cachedFlowInfo *openflow15.FlowMod
if _, ok := oldFlowCache[matchString]; ok {
mods = append(mods, flow)
cachedFlowInfo = getFlowModMessage(flow, binding.ModifyMessage)
adds = append(adds, cachedFlowInfo)
} else {
adds = append(adds, flow)
cachedFlowInfo = getFlowModMessage(flow, binding.AddMessage)
mods = append(mods, cachedFlowInfo)
}
fCache[matchString] = flow
fCache[matchString] = cachedFlowInfo
}
for k, v := range oldFlowCache {
if _, ok := fCache[k]; !ok {
Expand All @@ -463,31 +469,30 @@ func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flow
}

// deleteFlows deletes all the flows in the flow cache indexed by the provided flowCacheKey.
func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) error {
return c.deleteFlowsWithMultipleKeys(cache, []string{flowCacheKey})
func (c *client) deleteFlows(cache *flowCategoryCache, flowMessageCacheKey string) error {
return c.deleteFlowsWithMultipleKeys(cache, []string{flowMessageCacheKey})
}

// deleteFlowsWithMultipleKeys uninstalls the flows with different flowCache keys and remove them from the cache on success.
// deleteFlowsWithMultipleKeys uninstalls the flows with different flowMessageCache keys and remove them from the cache on success.
// It will skip the keys which are not in the cache. All flows will be uninstalled via a bundle.
func (c *client) deleteFlowsWithMultipleKeys(cache *flowCategoryCache, keys []string) error {
// allFlows keeps the flows we will delete via a bundle.
var allFlows []binding.Flow
var allFlows []*openflow15.FlowMod
for _, key := range keys {
flows, ok := cache.Load(key)
// If a flow cache entry of the key does not exist, skip it.
if !ok {
klog.V(2).InfoS("Cached flow with provided key was not found", "key", key)
continue
}
for _, flow := range flows.(flowCache) {
for _, flow := range flows.(flowMessageCache) {
allFlows = append(allFlows, flow)
}
}
if len(allFlows) == 0 {
return nil
}
err := c.ofEntryOperations.DeleteAll(allFlows)
if err != nil {
if err := c.ofEntryOperations.DeleteAll(allFlows); err != nil {
return err
}
// Delete the keys and corresponding flows from the flow cache.
Expand All @@ -498,16 +503,7 @@ func (c *client) deleteFlowsWithMultipleKeys(cache *flowCategoryCache, keys []st
}

func (c *client) deleteAllFlows(cache *flowCategoryCache) error {
var delAllFlows []binding.Flow
cache.Range(func(key, value any) bool {
fCache := value.(flowCache)
delFlows := make([]binding.Flow, 0, len(fCache))
for _, flow := range fCache {
delFlows = append(delFlows, flow)
}
delAllFlows = append(delAllFlows, delFlows...)
return true
})
delAllFlows := getCachedFlowMessages(cache)
if delAllFlows != nil {
if err := c.ofEntryOperations.DeleteAll(delAllFlows); err != nil {
return err
Expand Down Expand Up @@ -651,7 +647,7 @@ func (c *client) getFlowKeysFromCache(cache *flowCategoryCache, cacheKey string)
if !ok {
return nil
}
fCache := fCacheI.(flowCache)
fCache := fCacheI.(flowMessageCache)
flowKeys := make([]string, 0, len(fCache))

// ReplayFlows() could change Flow internal state. Although its current
Expand All @@ -660,7 +656,7 @@ func (c *client) getFlowKeysFromCache(cache *flowCategoryCache, cacheKey string)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
for _, flow := range fCache {
flowKeys = append(flowKeys, flow.MatchString())
flowKeys = append(flowKeys, getFlowKey(flow))
}
return flowKeys
}
Expand Down Expand Up @@ -1539,3 +1535,18 @@ func (c *client) UninstallMulticlusterPodFlows(podIP string) error {
}
return c.deleteFlows(c.featureMulticluster.cachedPodFlows, podIP)
}

func GetFlowModMessages(flows []binding.Flow, op binding.OFOperation) []*openflow15.FlowMod {
messages := make([]*openflow15.FlowMod, 0, len(flows))
for i := range flows {
bundleMessages, _ := flows[i].GetBundleMessages(op)
msg := bundleMessages[0].GetMessage().(*openflow15.FlowMod)
messages = append(messages, msg)
}
return messages
}

func getFlowModMessage(flow binding.Flow, entryOper binding.OFOperation) *openflow15.FlowMod {
messages := GetFlowModMessages([]binding.Flow{flow}, entryOper)
return messages[0]
}
Loading

0 comments on commit 85a6be6

Please sign in to comment.