From bc55bde28f75e72c3c14be2bd00f0bfaffa070e1 Mon Sep 17 00:00:00 2001 From: Hongliang Liu <75655411+hongliangl@users.noreply.github.com> Date: Fri, 9 Dec 2022 21:03:03 +0800 Subject: [PATCH] Some prerequisite changes for pkg/agent/openflow unit tests (#4395) - Add `ModifyOFEntries` to interface `OFEntryOperations`. - To mock installing/modifying/deleting a group to OVS in unit tests, install/modify/delete a group by methods of `ModifyOFEntries`. - Add method `SetOFSwitch` of `OFBridge` to set a mock OVS bridge in unit tests. - Capitalize method `initialize` of `OFBridge` to initialize tables in unit tests. Signed-off-by: Hongliang Liu --- pkg/agent/openflow/client.go | 39 ++++++++++++++------- pkg/agent/openflow/client_test.go | 8 ++--- pkg/agent/openflow/multicast.go | 17 ++------- pkg/agent/openflow/pipeline.go | 5 +++ pkg/agent/openflow/testing/mock_openflow.go | 14 ++++++++ pkg/ovs/openflow/ofctrl_bridge.go | 21 +++++------ 6 files changed, 63 insertions(+), 41 deletions(-) diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 84aa5081cc1..37b277ec3e1 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -622,11 +622,11 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff group := c.featureService.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...) _, installed := c.featureService.groupCache.Load(groupID) if !installed { - if err := group.Add(); err != nil { + if err := c.ofEntryOperations.AddOFEntries([]binding.OFEntry{group}); err != nil { return fmt.Errorf("error when installing Service Endpoints Group %d: %w", groupID, err) } } else { - if err := group.Modify(); err != nil { + if err := c.ofEntryOperations.ModifyOFEntries([]binding.OFEntry{group}); err != nil { return fmt.Errorf("error when modifying Service Endpoints Group %d: %w", groupID, err) } } @@ -637,10 +637,13 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff func (c *client) UninstallServiceGroup(groupID binding.GroupIDType) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() - if err := c.bridge.DeleteGroup(groupID); err != nil { - return fmt.Errorf("error when deleting Service Endpoints Group %d: %w", groupID, err) + gCache, ok := c.featureService.groupCache.Load(groupID) + if ok { + if err := c.ofEntryOperations.DeleteOFEntries([]binding.OFEntry{gCache.(binding.Group)}); err != nil { + return fmt.Errorf("error when deleting Service Endpoints Group %d: %w", groupID, err) + } + c.featureService.groupCache.Delete(groupID) } - c.featureService.groupCache.Delete(groupID) return nil } @@ -1309,24 +1312,36 @@ func (c *client) SendIGMPRemoteReportPacketOut( func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() - table := MulticastOutputTable + nextTable := MulticastOutputTable.GetID() if c.enableAntreaPolicy { - table = MulticastIngressRuleTable + nextTable = MulticastIngressRuleTable.GetID() } - if err := c.featureMulticast.multicastReceiversGroup(groupID, table.GetID(), localReceivers, remoteNodeReceivers); err != nil { - return err + group := c.featureMulticast.multicastReceiversGroup(groupID, nextTable, localReceivers, remoteNodeReceivers) + _, installed := c.featureMulticast.groupCache.Load(groupID) + if !installed { + if err := c.ofEntryOperations.AddOFEntries([]binding.OFEntry{group}); err != nil { + return fmt.Errorf("error when installing Multicast receiver Group %d: %w", groupID, err) + } + } else { + if err := c.ofEntryOperations.ModifyOFEntries([]binding.OFEntry{group}); err != nil { + return fmt.Errorf("error when modifying Multicast receiver Group %d: %w", groupID, err) + } } + c.featureMulticast.groupCache.Store(groupID, group) return nil } func (c *client) UninstallMulticastGroup(groupID binding.GroupIDType) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() - if err := c.bridge.DeleteGroup(groupID); err != nil { - return fmt.Errorf("error when deleting Multicast receiver Group %d: %w", groupID, err) + gCache, ok := c.featureMulticast.groupCache.Load(groupID) + if ok { + if err := c.ofEntryOperations.DeleteOFEntries([]binding.OFEntry{gCache.(binding.Group)}); err != nil { + return fmt.Errorf("error when deleting Multicast receiver Group %d: %w", groupID, err) + } + c.featureMulticast.groupCache.Delete(groupID) } - c.featureMulticast.groupCache.Delete(groupID) return nil } diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index f55670ebebe..2d581ed5778 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -640,12 +640,12 @@ func TestServiceGroupInstallAndUninstall(t *testing.T) { mockBucketBuilder.EXPECT().ResubmitToTable(EndpointDNATTable.GetID()).Return(mockBucketBuilder).Times(len(endpoints)) } mockBucketBuilder.EXPECT().Done().Return(mockGroup).Times(len(endpoints)) - mockGroup.EXPECT().Add().Return(nil) + m.EXPECT().AddOFEntries(gomock.Any()) + m.EXPECT().DeleteOFEntries(gomock.Any()).Return(tc.deleteGroupError) err := client.InstallServiceGroup(tc.groupID, tc.sessionAffinity, endpoints) require.NoError(t, err) _, exists := client.featureService.groupCache.Load(tc.groupID) assert.True(t, exists) - mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteGroupError) err = client.UninstallServiceGroup(tc.groupID) _, exists = client.featureService.groupCache.Load(tc.groupID) if tc.deleteGroupError == nil { @@ -699,12 +699,12 @@ func TestMulticastGroupInstallAndUninstall(t *testing.T) { mockBucketBuilder.EXPECT().ResubmitToTable(MulticastOutputTable.GetID()).Return(mockBucketBuilder).Times(len(remoteIPs)) mockBucketBuilder.EXPECT().SetTunnelDst(gomock.Any()).Return(mockBucketBuilder).Times(len(remoteIPs)) mockBucketBuilder.EXPECT().Done().Return(mockGroup).Times(len(localReceivers) + len(remoteIPs)) - mockGroup.EXPECT().Add().Return(nil) + m.EXPECT().AddOFEntries(gomock.Any()) + m.EXPECT().DeleteOFEntries(gomock.Any()).Return(tc.deleteGroupError) err := client.InstallMulticastGroup(tc.groupID, localReceivers, remoteIPs) require.NoError(t, err) _, exists := client.featureMulticast.groupCache.Load(tc.groupID) assert.True(t, exists) - mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteGroupError) err = client.UninstallMulticastGroup(tc.groupID) if tc.deleteGroupError == nil { assert.NoError(t, err) diff --git a/pkg/agent/openflow/multicast.go b/pkg/agent/openflow/multicast.go index 1d2bfed5abf..453291b1899 100644 --- a/pkg/agent/openflow/multicast.go +++ b/pkg/agent/openflow/multicast.go @@ -15,7 +15,6 @@ package openflow import ( - "fmt" "net" "sync" @@ -93,7 +92,7 @@ func (f *featureMulticast) replayFlows() []binding.Flow { return getCachedFlows(f.cachedFlows) } -func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, tableID uint8, ports []uint32, remoteIPs []net.IP) error { +func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, tableID uint8, ports []uint32, remoteIPs []net.IP) binding.Group { group := f.bridge.CreateGroupTypeAll(groupID).ResetBuckets() for i := range ports { group = group.Bucket(). @@ -110,19 +109,7 @@ func (f *featureMulticast) multicastReceiversGroup(groupID binding.GroupIDType, ResubmitToTable(MulticastOutputTable.GetID()). Done() } - - _, installed := f.groupCache.Load(groupID) - if !installed { - if err := group.Add(); err != nil { - return fmt.Errorf("error when installing Multicast receiver Group %d: %w", groupID, err) - } - } else { - if err := group.Modify(); err != nil { - return fmt.Errorf("error when modifying Multicast receiver Group %d: %w", groupID, err) - } - } - f.groupCache.Store(groupID, group) - return nil + return group } func (f *featureMulticast) multicastOutputFlows() []binding.Flow { diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 86195ba2c85..f5c338638dd 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -391,6 +391,7 @@ type OFEntryOperations interface { BundleOps(adds []binding.Flow, mods []binding.Flow, dels []binding.Flow) error DeleteAll(flows []binding.Flow) error AddOFEntries(ofEntries []binding.OFEntry) error + ModifyOFEntries(ofEntries []binding.OFEntry) error DeleteOFEntries(ofEntries []binding.OFEntry) error } @@ -543,6 +544,10 @@ func (c *client) AddOFEntries(ofEntries []binding.OFEntry) error { return c.changeOFEntries(ofEntries, add) } +func (c *client) ModifyOFEntries(ofEntries []binding.OFEntry) error { + return c.changeOFEntries(ofEntries, mod) +} + func (c *client) DeleteOFEntries(ofEntries []binding.OFEntry) error { return c.changeOFEntries(ofEntries, del) } diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 31a2293ef42..5ea6d08d8d7 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -1140,3 +1140,17 @@ func (mr *MockOFEntryOperationsMockRecorder) ModifyAll(arg0 interface{}) *gomock mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ModifyAll", reflect.TypeOf((*MockOFEntryOperations)(nil).ModifyAll), arg0) } + +// ModifyOFEntries mocks base method +func (m *MockOFEntryOperations) ModifyOFEntries(arg0 []openflow.OFEntry) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ModifyOFEntries", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// ModifyOFEntries indicates an expected call of ModifyOFEntries +func (mr *MockOFEntryOperationsMockRecorder) ModifyOFEntries(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ModifyOFEntries", reflect.TypeOf((*MockOFEntryOperations)(nil).ModifyOFEntries), arg0) +} diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go index d5466c349b8..a8af88edb89 100644 --- a/pkg/ovs/openflow/ofctrl_bridge.go +++ b/pkg/ovs/openflow/ofctrl_bridge.go @@ -324,14 +324,11 @@ func (b *OFBridge) PacketRcvd(sw *ofctrl.OFSwitch, packet *ofctrl.PacketIn) { // SwitchConnected is a callback when the remote OFSwitch is connected. func (b *OFBridge) SwitchConnected(sw *ofctrl.OFSwitch) { klog.Infof("OFSwitch is connected: %v", sw.DPID()) - func() { - b.ofSwitchMutex.Lock() - defer b.ofSwitchMutex.Unlock() - b.ofSwitch = sw - }() + b.SetOFSwitch(sw) b.ofSwitch.EnableMonitor() // initialize tables. - b.initialize() + b.Initialize() + b.queryTableFeatures() go func() { // b.connected is nil if it is an automatic reconnection but not triggered by OFSwitch.Connect. if b.connected != nil { @@ -341,6 +338,12 @@ func (b *OFBridge) SwitchConnected(sw *ofctrl.OFSwitch) { }() } +func (b *OFBridge) SetOFSwitch(sw *ofctrl.OFSwitch) { + b.ofSwitchMutex.Lock() + defer b.ofSwitchMutex.Unlock() + b.ofSwitch = sw +} + // MultipartReply is a callback when multipartReply message is received on ofctrl.OFSwitch is connected. // Client uses this method to handle the reply message if it has customized MultipartRequest message. func (b *OFBridge) MultipartReply(sw *ofctrl.OFSwitch, rep *openflow15.MultipartReply) { @@ -353,8 +356,8 @@ func (b *OFBridge) SwitchDisconnected(sw *ofctrl.OFSwitch) { klog.Infof("OFSwitch is disconnected: %v", sw.DPID()) } -// initialize creates ofctrl.Table for each table in the tableCache. -func (b *OFBridge) initialize() { +// Initialize creates ofctrl.Table for each table in the tableCache. +func (b *OFBridge) Initialize() { b.Lock() defer b.Unlock() @@ -372,8 +375,6 @@ func (b *OFBridge) initialize() { table.ResetStatus() } - b.queryTableFeatures() - metrics.OVSTotalFlowCount.Set(0) }