Skip to content

Commit

Permalink
Some prerequisite changes for pkg/agent/openflow unit tests (antrea-i…
Browse files Browse the repository at this point in the history
…o#4395)

- Add `ModifyOFEntries` to interface `OFEntryOperations`.
- To mock installing/modifying/deleting a group to OVS in unit tests, install/modify/delete
  a group by methods of `ModifyOFEntries`.
- Add method `SetOFSwitch` of `OFBridge` to set a mock OVS bridge in unit tests.
- Capitalize method `initialize` of `OFBridge` to initialize tables in unit tests.

Signed-off-by: Hongliang Liu <[email protected]>
  • Loading branch information
hongliangl authored Dec 9, 2022
1 parent d32f6ca commit bc55bde
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 41 deletions.
39 changes: 27 additions & 12 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,11 +622,11 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff
group := c.featureService.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)
_, installed := c.featureService.groupCache.Load(groupID)
if !installed {
if err := group.Add(); err != nil {
if err := c.ofEntryOperations.AddOFEntries([]binding.OFEntry{group}); err != nil {
return fmt.Errorf("error when installing Service Endpoints Group %d: %w", groupID, err)
}
} else {
if err := group.Modify(); err != nil {
if err := c.ofEntryOperations.ModifyOFEntries([]binding.OFEntry{group}); err != nil {
return fmt.Errorf("error when modifying Service Endpoints Group %d: %w", groupID, err)
}
}
Expand All @@ -637,10 +637,13 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff
func (c *client) UninstallServiceGroup(groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
if err := c.bridge.DeleteGroup(groupID); err != nil {
return fmt.Errorf("error when deleting Service Endpoints Group %d: %w", groupID, err)
gCache, ok := c.featureService.groupCache.Load(groupID)
if ok {
if err := c.ofEntryOperations.DeleteOFEntries([]binding.OFEntry{gCache.(binding.Group)}); err != nil {
return fmt.Errorf("error when deleting Service Endpoints Group %d: %w", groupID, err)
}
c.featureService.groupCache.Delete(groupID)
}
c.featureService.groupCache.Delete(groupID)
return nil
}

Expand Down Expand Up @@ -1309,24 +1312,36 @@ func (c *client) SendIGMPRemoteReportPacketOut(
func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
table := MulticastOutputTable
nextTable := MulticastOutputTable.GetID()
if c.enableAntreaPolicy {
table = MulticastIngressRuleTable
nextTable = MulticastIngressRuleTable.GetID()
}

if err := c.featureMulticast.multicastReceiversGroup(groupID, table.GetID(), localReceivers, remoteNodeReceivers); err != nil {
return err
group := c.featureMulticast.multicastReceiversGroup(groupID, nextTable, localReceivers, remoteNodeReceivers)
_, installed := c.featureMulticast.groupCache.Load(groupID)
if !installed {
if err := c.ofEntryOperations.AddOFEntries([]binding.OFEntry{group}); err != nil {
return fmt.Errorf("error when installing Multicast receiver Group %d: %w", groupID, err)
}
} else {
if err := c.ofEntryOperations.ModifyOFEntries([]binding.OFEntry{group}); err != nil {
return fmt.Errorf("error when modifying Multicast receiver Group %d: %w", groupID, err)
}
}
c.featureMulticast.groupCache.Store(groupID, group)
return nil
}

func (c *client) UninstallMulticastGroup(groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
if err := c.bridge.DeleteGroup(groupID); err != nil {
return fmt.Errorf("error when deleting Multicast receiver Group %d: %w", groupID, err)
gCache, ok := c.featureMulticast.groupCache.Load(groupID)
if ok {
if err := c.ofEntryOperations.DeleteOFEntries([]binding.OFEntry{gCache.(binding.Group)}); err != nil {
return fmt.Errorf("error when deleting Multicast receiver Group %d: %w", groupID, err)
}
c.featureMulticast.groupCache.Delete(groupID)
}
c.featureMulticast.groupCache.Delete(groupID)
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,12 +640,12 @@ func TestServiceGroupInstallAndUninstall(t *testing.T) {
mockBucketBuilder.EXPECT().ResubmitToTable(EndpointDNATTable.GetID()).Return(mockBucketBuilder).Times(len(endpoints))
}
mockBucketBuilder.EXPECT().Done().Return(mockGroup).Times(len(endpoints))
mockGroup.EXPECT().Add().Return(nil)
m.EXPECT().AddOFEntries(gomock.Any())
m.EXPECT().DeleteOFEntries(gomock.Any()).Return(tc.deleteGroupError)
err := client.InstallServiceGroup(tc.groupID, tc.sessionAffinity, endpoints)
require.NoError(t, err)
_, exists := client.featureService.groupCache.Load(tc.groupID)
assert.True(t, exists)
mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteGroupError)
err = client.UninstallServiceGroup(tc.groupID)
_, exists = client.featureService.groupCache.Load(tc.groupID)
if tc.deleteGroupError == nil {
Expand Down Expand Up @@ -699,12 +699,12 @@ func TestMulticastGroupInstallAndUninstall(t *testing.T) {
mockBucketBuilder.EXPECT().ResubmitToTable(MulticastOutputTable.GetID()).Return(mockBucketBuilder).Times(len(remoteIPs))
mockBucketBuilder.EXPECT().SetTunnelDst(gomock.Any()).Return(mockBucketBuilder).Times(len(remoteIPs))
mockBucketBuilder.EXPECT().Done().Return(mockGroup).Times(len(localReceivers) + len(remoteIPs))
mockGroup.EXPECT().Add().Return(nil)
m.EXPECT().AddOFEntries(gomock.Any())
m.EXPECT().DeleteOFEntries(gomock.Any()).Return(tc.deleteGroupError)
err := client.InstallMulticastGroup(tc.groupID, localReceivers, remoteIPs)
require.NoError(t, err)
_, exists := client.featureMulticast.groupCache.Load(tc.groupID)
assert.True(t, exists)
mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteGroupError)
err = client.UninstallMulticastGroup(tc.groupID)
if tc.deleteGroupError == nil {
assert.NoError(t, err)
Expand Down
17 changes: 2 additions & 15 deletions pkg/agent/openflow/multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package openflow

import (
"fmt"
"net"
"sync"

Expand Down Expand Up @@ -93,7 +92,7 @@ func (f *featureMulticast) replayFlows() []binding.Flow {
return getCachedFlows(f.cachedFlows)
}

func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, tableID uint8, ports []uint32, remoteIPs []net.IP) error {
func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, tableID uint8, ports []uint32, remoteIPs []net.IP) binding.Group {
group := f.bridge.CreateGroupTypeAll(groupID).ResetBuckets()
for i := range ports {
group = group.Bucket().
Expand All @@ -110,19 +109,7 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType,
ResubmitToTable(MulticastOutputTable.GetID()).
Done()
}

_, installed := f.groupCache.Load(groupID)
if !installed {
if err := group.Add(); err != nil {
return fmt.Errorf("error when installing Multicast receiver Group %d: %w", groupID, err)
}
} else {
if err := group.Modify(); err != nil {
return fmt.Errorf("error when modifying Multicast receiver Group %d: %w", groupID, err)
}
}
f.groupCache.Store(groupID, group)
return nil
return group
}

func (f *featureMulticast) multicastOutputFlows() []binding.Flow {
Expand Down
5 changes: 5 additions & 0 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ type OFEntryOperations interface {
BundleOps(adds []binding.Flow, mods []binding.Flow, dels []binding.Flow) error
DeleteAll(flows []binding.Flow) error
AddOFEntries(ofEntries []binding.OFEntry) error
ModifyOFEntries(ofEntries []binding.OFEntry) error
DeleteOFEntries(ofEntries []binding.OFEntry) error
}

Expand Down Expand Up @@ -543,6 +544,10 @@ func (c *client) AddOFEntries(ofEntries []binding.OFEntry) error {
return c.changeOFEntries(ofEntries, add)
}

func (c *client) ModifyOFEntries(ofEntries []binding.OFEntry) error {
return c.changeOFEntries(ofEntries, mod)
}

func (c *client) DeleteOFEntries(ofEntries []binding.OFEntry) error {
return c.changeOFEntries(ofEntries, del)
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 11 additions & 10 deletions pkg/ovs/openflow/ofctrl_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,11 @@ func (b *OFBridge) PacketRcvd(sw *ofctrl.OFSwitch, packet *ofctrl.PacketIn) {
// SwitchConnected is a callback when the remote OFSwitch is connected.
func (b *OFBridge) SwitchConnected(sw *ofctrl.OFSwitch) {
klog.Infof("OFSwitch is connected: %v", sw.DPID())
func() {
b.ofSwitchMutex.Lock()
defer b.ofSwitchMutex.Unlock()
b.ofSwitch = sw
}()
b.SetOFSwitch(sw)
b.ofSwitch.EnableMonitor()
// initialize tables.
b.initialize()
b.Initialize()
b.queryTableFeatures()
go func() {
// b.connected is nil if it is an automatic reconnection but not triggered by OFSwitch.Connect.
if b.connected != nil {
Expand All @@ -341,6 +338,12 @@ func (b *OFBridge) SwitchConnected(sw *ofctrl.OFSwitch) {
}()
}

func (b *OFBridge) SetOFSwitch(sw *ofctrl.OFSwitch) {
b.ofSwitchMutex.Lock()
defer b.ofSwitchMutex.Unlock()
b.ofSwitch = sw
}

// MultipartReply is a callback when multipartReply message is received on ofctrl.OFSwitch is connected.
// Client uses this method to handle the reply message if it has customized MultipartRequest message.
func (b *OFBridge) MultipartReply(sw *ofctrl.OFSwitch, rep *openflow15.MultipartReply) {
Expand All @@ -353,8 +356,8 @@ func (b *OFBridge) SwitchDisconnected(sw *ofctrl.OFSwitch) {
klog.Infof("OFSwitch is disconnected: %v", sw.DPID())
}

// initialize creates ofctrl.Table for each table in the tableCache.
func (b *OFBridge) initialize() {
// Initialize creates ofctrl.Table for each table in the tableCache.
func (b *OFBridge) Initialize() {
b.Lock()
defer b.Unlock()

Expand All @@ -372,8 +375,6 @@ func (b *OFBridge) initialize() {
table.ResetStatus()
}

b.queryTableFeatures()

metrics.OVSTotalFlowCount.Set(0)
}

Expand Down

0 comments on commit bc55bde

Please sign in to comment.