Skip to content

Commit

Permalink
Merge d216562 into 53bb0c3
Browse files Browse the repository at this point in the history
  • Loading branch information
wenyingd authored Sep 5, 2022
2 parents 53bb0c3 + d216562 commit 396ab64
Show file tree
Hide file tree
Showing 10 changed files with 468 additions and 32 deletions.
2 changes: 1 addition & 1 deletion hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ function generate_mocks {
"pkg/controller/querier ControllerQuerier testing"
"pkg/flowaggregator/exporter Interface testing"
"pkg/ipfix IPFIXExportingProcess,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess testing"
"pkg/ovs/openflow Bridge,Table,Flow,Action,CTAction,FlowBuilder testing"
"pkg/ovs/openflow Bridge,Table,Flow,Action,CTAction,FlowBuilder,Group,BucketBuilder testing"
"pkg/ovs/ovsconfig OVSBridgeClient testing"
"pkg/ovs/ovsctl OVSCtlClient testing"
"pkg/querier AgentNetworkPolicyInfoQuerier,AgentMulticastInfoQuerier testing"
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func (c *Controller) syncGroup(groupKey string) error {
return err
}
// Remove the multicast flow entry if no local Pod is in the group.
if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil {
if err := c.ofClient.UninstallMulticastGroup(status.ofGroupID); err != nil {
klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey)
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/multicast/mcast_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ func TestCheckLastMember(t *testing.T) {
if ev != nil {
status.group = ev.group
if ev.eType == groupLeave {
mockOFClient.EXPECT().UninstallGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastFlows(status.group)
}
} else {
status.group = mgroup
mockOFClient.EXPECT().UninstallGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastFlows(status.group)
}
_ = mctrl.groupCache.Add(status)
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestClearStaleGroups(t *testing.T) {
fakePort++
}
}
mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(len(staleGroups))
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any()).Times(len(staleGroups))
mockOFClient.EXPECT().UninstallMulticastFlows(gomock.Any()).Times(len(staleGroups))
mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(len(staleGroups))
mctrl.clearStaleGroups()
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestEncapLocalReportAndNotifyRemote(t *testing.T) {
}
}
if tc.groupChanged {
mockOFClient.EXPECT().UninstallGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastFlows(tc.e.group)
mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1)
mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any())
Expand Down
21 changes: 17 additions & 4 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ type Client interface {
// InstallServiceGroup installs a group for Service LB. Each endpoint
// is a bucket of the group. For now, each bucket has the same weight.
InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error
// UninstallGroup removes the group and its buckets that are
// installed by InstallServiceGroup or InstallMulticastGroup.
UninstallGroup(groupID binding.GroupIDType) error
// UninstallServiceGroup removes the group and its buckets that are
// installed by InstallServiceGroup.
UninstallServiceGroup(groupID binding.GroupIDType) error

// InstallEndpointFlows installs flows for accessing Endpoints.
// If an Endpoint is on the current Node, then flows for hairpin and endpoint
Expand Down Expand Up @@ -314,6 +314,9 @@ type Client interface {
UninstallTrafficControlReturnPortFlow(returnOFPort uint32) error

InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error
// UninstallMulticastGroup removes the group and its buckets that are
// installed by InstallMulticastGroup.
UninstallMulticastGroup(groupID binding.GroupIDType) error

// SendIGMPRemoteReportPacketOut sends the IGMP report packet as a packet-out to remote Nodes via the tunnel port.
SendIGMPRemoteReportPacketOut(
Expand Down Expand Up @@ -627,7 +630,7 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff
return nil
}

func (c *client) UninstallGroup(groupID binding.GroupIDType) error {
func (c *client) UninstallServiceGroup(groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
if !c.bridge.DeleteGroup(groupID) {
Expand Down Expand Up @@ -1322,6 +1325,16 @@ func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceive
return nil
}

func (c *client) UninstallMulticastGroup(groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
if !c.bridge.DeleteGroup(groupID) {
return fmt.Errorf("group %d delete failed", groupID)
}
c.featureMulticast.groupCache.Delete(groupID)
return nil
}

// InstallMulticlusterNodeFlows installs flows to handle cross-cluster packets between a regular
// Node and a local Gateway.
func (c *client) InstallMulticlusterNodeFlows(clusterID string,
Expand Down
126 changes: 126 additions & 0 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
ovsoftest "antrea.io/antrea/pkg/ovs/openflow/testing"
"antrea.io/antrea/pkg/ovs/ovsconfig"
utilip "antrea.io/antrea/pkg/util/ip"
k8sproxy "antrea.io/antrea/third_party/proxy"
)

const bridgeName = "dummy-br"
Expand Down Expand Up @@ -583,3 +584,128 @@ func TestMulticlusterFlowsInstallation(t *testing.T) {
_, ok = client.featureMulticluster.cachedFlows.Load(cacheKey)
require.False(t, ok)
}

func TestServiceGroupInstallAndUninstall(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockOFBridge := ovsoftest.NewMockBridge(ctrl)
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, true, false, false, false, false)
client := ofClient.(*client)
client.bridge = mockOFBridge
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
client.nodeConfig = nodeConfig
client.networkConfig = networkConfig
client.serviceConfig = serviceConfig
client.ipProtocols = []binding.Protocol{binding.ProtocolIP}
client.generatePipelines()
endpoints := []k8sproxy.Endpoint{
&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort("192.168.1.2", "8081"),
IsLocal: true,
},
&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort("10.20.1.11", "8081"),
IsLocal: false,
},
}
groupID1 := binding.GroupIDType(1)
groupID2 := binding.GroupIDType(2)
for _, tc := range []struct {
groupID binding.GroupIDType
sessionAffinity bool
deleteSucceeded bool
}{
{groupID: groupID1, deleteSucceeded: false, sessionAffinity: true},
{groupID: groupID2, deleteSucceeded: true, sessionAffinity: false},
} {
mockGroup := ovsoftest.NewMockGroup(ctrl)
mockBucketBuilder := ovsoftest.NewMockBucketBuilder(ctrl)
mockOFBridge.EXPECT().CreateGroup(tc.groupID).Return(mockGroup).Times(1)
mockGroup.EXPECT().ResetBuckets().Return(mockGroup).Times(1)
mockGroup.EXPECT().Bucket().Return(mockBucketBuilder).Times(len(endpoints))
mockBucketBuilder.EXPECT().Weight(gomock.Any()).Return(mockBucketBuilder).Times(len(endpoints))
mockBucketBuilder.EXPECT().LoadToRegField(gomock.Any(), gomock.Any()).Return(mockBucketBuilder).AnyTimes()
if tc.sessionAffinity {
mockBucketBuilder.EXPECT().ResubmitToTable(ServiceLBTable.GetID()).Return(mockBucketBuilder).Times(len(endpoints))
} else {
mockBucketBuilder.EXPECT().ResubmitToTable(EndpointDNATTable.GetID()).Return(mockBucketBuilder).Times(len(endpoints))
}
mockBucketBuilder.EXPECT().Done().Return(mockGroup).Times(len(endpoints))
mockGroup.EXPECT().Add().Return(nil)
err := client.InstallServiceGroup(tc.groupID, tc.sessionAffinity, endpoints)
require.NoError(t, err)
_, exists := client.featureService.groupCache.Load(tc.groupID)
assert.True(t, exists)
mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteSucceeded)
err = client.UninstallServiceGroup(tc.groupID)
_, exists = client.featureService.groupCache.Load(tc.groupID)
if tc.deleteSucceeded {
assert.NoError(t, err)
assert.False(t, exists)
} else {
assert.Error(t, err)
assert.True(t, exists)
}
}
}

func TestMulticastGroupInstallAndUninstall(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockOFBridge := ovsoftest.NewMockBridge(ctrl)
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, false, true, false, false, false, false, true, false, false)
client := ofClient.(*client)
client.bridge = mockOFBridge
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
client.nodeConfig = nodeConfig
client.networkConfig = networkConfig
client.serviceConfig = serviceConfig
client.ipProtocols = []binding.Protocol{binding.ProtocolIP}
client.generatePipelines()
localReceivers := []uint32{101, 102}
remoteIPs := []net.IP{
net.ParseIP("1.1.1.2"),
net.ParseIP("1.1.1.3"),
net.ParseIP("1.1.1.4"),
}
groupID1 := binding.GroupIDType(1)
groupID2 := binding.GroupIDType(2)
for _, tc := range []struct {
groupID binding.GroupIDType
deleteSucceeded bool
}{
{groupID: groupID1, deleteSucceeded: false},
{groupID: groupID2, deleteSucceeded: true},
} {
mockGroup := ovsoftest.NewMockGroup(ctrl)
mockBucketBuilder := ovsoftest.NewMockBucketBuilder(ctrl)
mockOFBridge.EXPECT().CreateGroupTypeAll(tc.groupID).Return(mockGroup).Times(1)
mockGroup.EXPECT().ResetBuckets().Return(mockGroup).Times(1)
mockGroup.EXPECT().Bucket().Return(mockBucketBuilder).Times(len(localReceivers) + len(remoteIPs))
mockBucketBuilder.EXPECT().LoadToRegField(gomock.Any(), gomock.Any()).Return(mockBucketBuilder).AnyTimes()
mockBucketBuilder.EXPECT().ResubmitToTable(MulticastIngressRuleTable.GetID()).Return(mockBucketBuilder).Times(len(localReceivers))
mockBucketBuilder.EXPECT().ResubmitToTable(MulticastOutputTable.GetID()).Return(mockBucketBuilder).Times(len(remoteIPs))
mockBucketBuilder.EXPECT().SetTunnelDst(gomock.Any()).Return(mockBucketBuilder).Times(len(remoteIPs))
mockBucketBuilder.EXPECT().Done().Return(mockGroup).Times(len(localReceivers) + len(remoteIPs))
mockGroup.EXPECT().Add().Return(nil)
err := client.InstallMulticastGroup(tc.groupID, localReceivers, remoteIPs)
require.NoError(t, err)
_, exists := client.featureMulticast.groupCache.Load(tc.groupID)
assert.True(t, exists)
mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteSucceeded)
err = client.UninstallMulticastGroup(tc.groupID)
if tc.deleteSucceeded {
assert.NoError(t, err)
_, exists = client.featureMulticast.groupCache.Load(tc.groupID)
assert.False(t, exists)
} else {
assert.Error(t, err)
_, exists = client.featureMulticast.groupCache.Load(tc.groupID)
assert.True(t, exists)
}
}
}
38 changes: 26 additions & 12 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (p *proxier) removeStaleServices() {
// Remove Service group whose Endpoints are local.
if svcInfo.NodeLocalExternal() {
if groupIDLocal, exist := p.groupCounter.Get(svcPortName, true); exist {
if err := p.ofClient.UninstallGroup(groupIDLocal); err != nil {
if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil {
klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName)
continue
}
Expand All @@ -192,7 +192,7 @@ func (p *proxier) removeStaleServices() {
}
// Remove Service group which has all Endpoints.
if groupID, exist := p.groupCounter.Get(svcPortName, false); exist {
if err := p.ofClient.UninstallGroup(groupID); err != nil {
if err := p.ofClient.UninstallServiceGroup(groupID); err != nil {
klog.ErrorS(err, "Failed to remove Group of all Endpoints for Service", "Service", svcPortName)
continue
}
Expand Down Expand Up @@ -568,7 +568,7 @@ func (p *proxier) installServices() {
continue
}
if groupID, exist := p.groupCounter.Get(svcPortName, !nodeLocalVal); exist {
if err := p.ofClient.UninstallGroup(groupID); err != nil {
if err := p.ofClient.UninstallServiceGroup(groupID); err != nil {
klog.ErrorS(err, "Failed to uninstall Group of all Endpoints for Service", "Service", svcPortName)
continue
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/agent/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep
mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeLoadBalancer).Times(1)
}
groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal)
mockOFClient.EXPECT().UninstallGroup(groupID).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1)
}
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
if proxyLoadBalancerIPs {
Expand Down Expand Up @@ -467,7 +467,7 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP
mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeNodePort).Times(1)

groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal)
mockOFClient.EXPECT().UninstallGroup(groupID).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1)
}
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
mockRouteClient.EXPECT().AddNodePort(gomock.Any(), uint16(svcNodePort), bindingProtocol).Times(1)
Expand Down Expand Up @@ -749,7 +749,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool)
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1)
mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1)
mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1)
fp.syncProxyRules()

fp.serviceChanges.OnServiceUpdate(service, nil)
Expand Down Expand Up @@ -1232,8 +1232,8 @@ func TestServicesWithSameEndpoints(t *testing.T) {
mockOFClient.EXPECT().InstallServiceFlows(groupID2, svcIP2, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP).Times(1)
mockOFClient.EXPECT().UninstallServiceFlows(svcIP1, uint16(svcPort), bindingProtocol).Times(1)
mockOFClient.EXPECT().UninstallServiceFlows(svcIP2, uint16(svcPort), bindingProtocol).Times(1)
mockOFClient.EXPECT().UninstallGroup(groupID1).Times(1)
mockOFClient.EXPECT().UninstallGroup(groupID2).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID1).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID2).Times(1)
// Since these two Services reference to the same Endpoint, there should only be one operation.
mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1)

Expand Down
Loading

0 comments on commit 396ab64

Please sign in to comment.