Skip to content

Commit

Permalink
Cache OpenFlow modification message in Antrea Agent (#4495)
Browse files Browse the repository at this point in the history
The motivation for this patch is to reduce the memory used for storing
the installed OpenFlow flow entries to replay them when OVS is re-connected.
In the existing code, struct ofctrl.Flow is stored in the memory, which
has taken a lot of memory because all kinds of the candidate Match fields
are allocated in one such object although they may not use in the final Flow.

This patch is to store the FlowModification message as a substituation of the
ofctrl.Flow objects. As the FlowModification message is the obejct which
was sent to OVS via the OF connections in final, it contains enough information
to replay the OpenFlow flow entry. Besides, a FlowModification only contains
the necessary fields which are set with values.

A test with this patch may reduce about 140M memory on a setup with about
130K OpenFlow entries in antrea-agent (from 380M to 240M)

Signed-off-by: wenyingd <[email protected]>
  • Loading branch information
wenyingd authored May 19, 2023
1 parent 46d2a3c commit 11026df
Show file tree
Hide file tree
Showing 32 changed files with 1,685 additions and 1,336 deletions.
87 changes: 49 additions & 38 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,31 +393,32 @@ func (c *client) addFlows(cache *flowCategoryCache, flowCacheKey string, flows [
return c.addFlowsWithMultipleKeys(cache, map[string][]binding.Flow{flowCacheKey: flows})
}

// addFlowsWithMultipleKeys installs the flows with different flowCache keys and adds them into the cache on success.
// addFlowsWithMultipleKeys installs the flows with different flowMessageCache keys and adds them into the cache on success.
// It will skip flows whose cache already exists. All flows will be installed via a bundle.
func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows map[string][]binding.Flow) error {
// allFlows keeps the flows we will install via a bundle.
var allFlows []binding.Flow
// flowCacheMap keeps the flowCache items we will add to the cache on bundle success.
flowCacheMap := map[string]flowCache{}
// allMessages keeps the OpenFlow modification messages we will install via a bundle.
var allMessages []*openflow15.FlowMod
// flowCacheMap keeps the flowMessageCache items we will add to the cache on bundle success.
flowCacheMap := map[string]flowMessageCache{}
for flowCacheKey, flows := range keyToFlows {
_, ok := cache.Load(flowCacheKey)
// If a flow cache entry already exists for the key, skip it.
if ok {
klog.V(2).InfoS("Flows with this cache key are already installed", "key", flowCacheKey)
continue
}
fCache := flowCache{}
fCache := flowMessageCache{}
for _, flow := range flows {
allFlows = append(allFlows, flow)
fCache[flow.MatchString()] = flow
msg := getFlowModMessage(flow, binding.AddMessage)
allMessages = append(allMessages, msg)
fCache[getFlowKey(msg)] = msg
}
flowCacheMap[flowCacheKey] = fCache
}
if len(allFlows) == 0 {
if len(allMessages) == 0 {
return nil
}
err := c.ofEntryOperations.AddAll(allFlows)
err := c.ofEntryOperations.AddAll(allMessages)
if err != nil {
return err
}
Expand All @@ -431,25 +432,30 @@ func (c *client) addFlowsWithMultipleKeys(cache *flowCategoryCache, keyToFlows m
// modifyFlows sets the flows of flowCategoryCache be exactly same as the provided slice for the given flowCacheKey.
func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flows []binding.Flow) error {
oldFlowCacheI, ok := cache.Load(flowCacheKey)
fCache := flowCache{}
fCache := flowMessageCache{}
var err error
if !ok {
messages := make([]*openflow15.FlowMod, 0, len(flows))
for _, flow := range flows {
fCache[flow.MatchString()] = flow
msg := getFlowModMessage(flow, binding.AddMessage)
messages = append(messages, msg)
fCache[getFlowKey(msg)] = msg
}

err = c.ofEntryOperations.AddAll(flows)
err = c.ofEntryOperations.AddAll(messages)
} else {
var adds, mods, dels []binding.Flow
oldFlowCache := oldFlowCacheI.(flowCache)
var adds, mods, dels []*openflow15.FlowMod
oldFlowCache := oldFlowCacheI.(flowMessageCache)
for _, flow := range flows {
matchString := flow.MatchString()
var msg *openflow15.FlowMod
if _, ok := oldFlowCache[matchString]; ok {
mods = append(mods, flow)
msg = getFlowModMessage(flow, binding.ModifyMessage)
adds = append(adds, msg)
} else {
adds = append(adds, flow)
msg = getFlowModMessage(flow, binding.AddMessage)
mods = append(mods, msg)
}
fCache[matchString] = flow
fCache[matchString] = msg
}
for k, v := range oldFlowCache {
if _, ok := fCache[k]; !ok {
Expand All @@ -468,31 +474,30 @@ func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flow
}

// deleteFlows deletes all the flows in the flow cache indexed by the provided flowCacheKey.
func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) error {
return c.deleteFlowsWithMultipleKeys(cache, []string{flowCacheKey})
func (c *client) deleteFlows(cache *flowCategoryCache, flowMessageCacheKey string) error {
return c.deleteFlowsWithMultipleKeys(cache, []string{flowMessageCacheKey})
}

// deleteFlowsWithMultipleKeys uninstalls the flows with different flowCache keys and remove them from the cache on success.
// deleteFlowsWithMultipleKeys uninstalls the flows with different flowMessageCache keys and remove them from the cache on success.
// It will skip the keys which are not in the cache. All flows will be uninstalled via a bundle.
func (c *client) deleteFlowsWithMultipleKeys(cache *flowCategoryCache, keys []string) error {
// allFlows keeps the flows we will delete via a bundle.
var allFlows []binding.Flow
var allFlows []*openflow15.FlowMod
for _, key := range keys {
flows, ok := cache.Load(key)
// If a flow cache entry of the key does not exist, skip it.
if !ok {
klog.V(2).InfoS("Cached flow with provided key was not found", "key", key)
continue
}
for _, flow := range flows.(flowCache) {
for _, flow := range flows.(flowMessageCache) {
allFlows = append(allFlows, flow)
}
}
if len(allFlows) == 0 {
return nil
}
err := c.ofEntryOperations.DeleteAll(allFlows)
if err != nil {
if err := c.ofEntryOperations.DeleteAll(allFlows); err != nil {
return err
}
// Delete the keys and corresponding flows from the flow cache.
Expand All @@ -503,16 +508,7 @@ func (c *client) deleteFlowsWithMultipleKeys(cache *flowCategoryCache, keys []st
}

func (c *client) deleteAllFlows(cache *flowCategoryCache) error {
var delAllFlows []binding.Flow
cache.Range(func(key, value any) bool {
fCache := value.(flowCache)
delFlows := make([]binding.Flow, 0, len(fCache))
for _, flow := range fCache {
delFlows = append(delFlows, flow)
}
delAllFlows = append(delAllFlows, delFlows...)
return true
})
delAllFlows := getCachedFlowMessages(cache)
if delAllFlows != nil {
if err := c.ofEntryOperations.DeleteAll(delAllFlows); err != nil {
return err
Expand Down Expand Up @@ -656,7 +652,7 @@ func (c *client) getFlowKeysFromCache(cache *flowCategoryCache, cacheKey string)
if !ok {
return nil
}
fCache := fCacheI.(flowCache)
fCache := fCacheI.(flowMessageCache)
flowKeys := make([]string, 0, len(fCache))

// ReplayFlows() could change Flow internal state. Although its current
Expand All @@ -665,7 +661,7 @@ func (c *client) getFlowKeysFromCache(cache *flowCategoryCache, cacheKey string)
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
for _, flow := range fCache {
flowKeys = append(flowKeys, flow.MatchString())
flowKeys = append(flowKeys, getFlowKey(flow))
}
return flowKeys
}
Expand Down Expand Up @@ -1551,3 +1547,18 @@ func (c *client) UninstallMulticlusterPodFlows(podIP string) error {
}
return c.deleteFlows(c.featureMulticluster.cachedPodFlows, podIP)
}

func GetFlowModMessages(flows []binding.Flow, op binding.OFOperation) []*openflow15.FlowMod {
messages := make([]*openflow15.FlowMod, 0, len(flows))
for i := range flows {
bundleMessages, _ := flows[i].GetBundleMessages(op)
msg := bundleMessages[0].GetMessage().(*openflow15.FlowMod)
messages = append(messages, msg)
}
return messages
}

func getFlowModMessage(flow binding.Flow, op binding.OFOperation) *openflow15.FlowMod {
messages := GetFlowModMessages([]binding.Flow{flow}, op)
return messages[0]
}
Loading

0 comments on commit 11026df

Please sign in to comment.