Skip to content

Commit

Permalink
Bugfix: Remove multicast group from cache when group is unintalled
Browse files Browse the repository at this point in the history
An issue exists in the code when calling UninstallGroups in multicast
feature, that the group entry in the cache is not removed. This is
because the existing code only remove group in featureService cache.

To resolve the issue, rename the existing API UninstallGroup as
UninstallServiceGroup, and add a new API UninstallMulticastGroup to
delete multicast related groups. The new API also removes the group
entry in the cache.

Signed-off-by: wenyingd <[email protected]>
  • Loading branch information
wenyingd committed Aug 31, 2022
1 parent 3714619 commit b04253d
Show file tree
Hide file tree
Showing 10 changed files with 434 additions and 32 deletions.
2 changes: 1 addition & 1 deletion hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ function generate_mocks {
"pkg/controller/querier ControllerQuerier testing"
"pkg/flowaggregator/exporter Interface testing"
"pkg/ipfix IPFIXExportingProcess,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess testing"
"pkg/ovs/openflow Bridge,Table,Flow,Action,CTAction,FlowBuilder testing"
"pkg/ovs/openflow Bridge,Table,Flow,Action,CTAction,FlowBuilder,Group,BucketBuilder testing"
"pkg/ovs/ovsconfig OVSBridgeClient testing"
"pkg/ovs/ovsctl OVSCtlClient testing"
"pkg/querier AgentNetworkPolicyInfoQuerier,AgentMulticastInfoQuerier testing"
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func (c *Controller) syncGroup(groupKey string) error {
return err
}
// Remove the multicast flow entry if no local Pod is in the group.
if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil {
if err := c.ofClient.UninstallMulticastGroup(status.ofGroupID); err != nil {
klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey)
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/multicast/mcast_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ func TestCheckLastMember(t *testing.T) {
if ev != nil {
status.group = ev.group
if ev.eType == groupLeave {
mockOFClient.EXPECT().UninstallGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastFlows(status.group)
}
} else {
status.group = mgroup
mockOFClient.EXPECT().UninstallGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastFlows(status.group)
}
_ = mctrl.groupCache.Add(status)
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestClearStaleGroups(t *testing.T) {
fakePort++
}
}
mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(len(staleGroups))
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any()).Times(len(staleGroups))
mockOFClient.EXPECT().UninstallMulticastFlows(gomock.Any()).Times(len(staleGroups))
mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(len(staleGroups))
mctrl.clearStaleGroups()
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestEncapLocalReportAndNotifyRemote(t *testing.T) {
}
}
if tc.groupChanged {
mockOFClient.EXPECT().UninstallGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastFlows(tc.e.group)
mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1)
mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any())
Expand Down
21 changes: 17 additions & 4 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ type Client interface {
// InstallServiceGroup installs a group for Service LB. Each endpoint
// is a bucket of the group. For now, each bucket has the same weight.
InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error
// UninstallGroup removes the group and its buckets that are
// installed by InstallServiceGroup or InstallMulticastGroup.
UninstallGroup(groupID binding.GroupIDType) error
// UninstallServiceGroup removes the group and its buckets that are
// installed by InstallServiceGroup.
UninstallServiceGroup(groupID binding.GroupIDType) error

// InstallEndpointFlows installs flows for accessing Endpoints.
// If an Endpoint is on the current Node, then flows for hairpin and endpoint
Expand Down Expand Up @@ -314,6 +314,9 @@ type Client interface {
UninstallTrafficControlReturnPortFlow(returnOFPort uint32) error

InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error
// UninstallMulticastGroup removes the group and its buckets that are
// installed by InstallMulticastGroup.
UninstallMulticastGroup(groupID binding.GroupIDType) error

// SendIGMPRemoteReportPacketOut sends the IGMP report packet as a packet-out to remote Nodes via the tunnel port.
SendIGMPRemoteReportPacketOut(
Expand Down Expand Up @@ -627,7 +630,7 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff
return nil
}

func (c *client) UninstallGroup(groupID binding.GroupIDType) error {
func (c *client) UninstallServiceGroup(groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
if !c.bridge.DeleteGroup(groupID) {
Expand Down Expand Up @@ -1322,6 +1325,16 @@ func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceive
return nil
}

func (c *client) UninstallMulticastGroup(groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
if !c.bridge.DeleteGroup(groupID) {
return fmt.Errorf("group %d delete failed", groupID)
}
c.featureMulticast.groupCache.Delete(groupID)
return nil
}

// InstallMulticlusterNodeFlows installs flows to handle cross-cluster packets between a regular
// Node and a local Gateway.
func (c *client) InstallMulticlusterNodeFlows(clusterID string,
Expand Down
92 changes: 92 additions & 0 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
ovsoftest "antrea.io/antrea/pkg/ovs/openflow/testing"
"antrea.io/antrea/pkg/ovs/ovsconfig"
utilip "antrea.io/antrea/pkg/util/ip"
k8sproxy "antrea.io/antrea/third_party/proxy"
)

const bridgeName = "dummy-br"
Expand Down Expand Up @@ -583,3 +584,94 @@ func TestMulticlusterFlowsInstallation(t *testing.T) {
_, ok = client.featureMulticluster.cachedFlows.Load(cacheKey)
require.False(t, ok)
}

func TestServiceGroupInstallAndUnInstall(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockOFBridge := ovsoftest.NewMockBridge(ctrl)
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, true, false, false, false, false)
client := ofClient.(*client)
client.bridge = mockOFBridge
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
client.nodeConfig = nodeConfig
client.networkConfig = networkConfig
client.serviceConfig = serviceConfig
client.ipProtocols = []binding.Protocol{binding.ProtocolIP}
client.generatePipelines()
endpoints := []k8sproxy.Endpoint{
&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort("192.168.1.2", "8081"),
IsLocal: true,
},
&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort("10.20.1.11", "8081"),
IsLocal: false,
},
}
groupID1 := binding.GroupIDType(1)
mockGroup := ovsoftest.NewMockGroup(ctrl)
mockBucketBuilder := ovsoftest.NewMockBucketBuilder(ctrl)
mockOFBridge.EXPECT().CreateGroup(groupID1).Return(mockGroup).Times(1)
mockGroup.EXPECT().ResetBuckets().Return(mockGroup).Times(1)
mockGroup.EXPECT().Bucket().Return(mockBucketBuilder).Times(len(endpoints))
mockBucketBuilder.EXPECT().Weight(gomock.Any()).Return(mockBucketBuilder).Times(len(endpoints))
mockBucketBuilder.EXPECT().LoadToRegField(gomock.Any(), gomock.Any()).Return(mockBucketBuilder).AnyTimes()
mockBucketBuilder.EXPECT().ResubmitToTable(EndpointDNATTable.GetID()).Return(mockBucketBuilder).Times(len(endpoints))
mockBucketBuilder.EXPECT().Done().Return(mockGroup).Times(len(endpoints))
mockGroup.EXPECT().Add().Return(nil)
err := client.InstallServiceGroup(groupID1, false, endpoints)
require.NoError(t, err)
_, exists := client.featureService.groupCache.Load(groupID1)
assert.True(t, exists)
mockOFBridge.EXPECT().DeleteGroup(groupID1).Return(true)
err = client.UninstallServiceGroup(groupID1)
require.NoError(t, err)
_, exists = client.featureService.groupCache.Load(groupID1)
assert.False(t, exists)
}

func TestMulticastGroupInstallAndUnInstall(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockOFBridge := ovsoftest.NewMockBridge(ctrl)
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, false, true, false, false, false, false, true, false, false)
client := ofClient.(*client)
client.bridge = mockOFBridge
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
client.nodeConfig = nodeConfig
client.networkConfig = networkConfig
client.serviceConfig = serviceConfig
client.ipProtocols = []binding.Protocol{binding.ProtocolIP}
client.generatePipelines()
localReceivers := []uint32{101, 102}
remoteIPs := []net.IP{
net.ParseIP("1.1.1.2"),
net.ParseIP("1.1.1.3"),
net.ParseIP("1.1.1.4"),
}
groupID1 := binding.GroupIDType(1)
mockGroup := ovsoftest.NewMockGroup(ctrl)
mockBucketBuilder := ovsoftest.NewMockBucketBuilder(ctrl)
mockOFBridge.EXPECT().CreateGroupTypeAll(groupID1).Return(mockGroup).Times(1)
mockGroup.EXPECT().ResetBuckets().Return(mockGroup).Times(1)
mockGroup.EXPECT().Bucket().Return(mockBucketBuilder).Times(len(localReceivers) + len(remoteIPs))
mockBucketBuilder.EXPECT().LoadToRegField(gomock.Any(), gomock.Any()).Return(mockBucketBuilder).AnyTimes()
mockBucketBuilder.EXPECT().ResubmitToTable(MulticastIngressRuleTable.GetID()).Return(mockBucketBuilder).Times(len(localReceivers))
mockBucketBuilder.EXPECT().ResubmitToTable(MulticastOutputTable.GetID()).Return(mockBucketBuilder).Times(len(remoteIPs))
mockBucketBuilder.EXPECT().SetTunnelDst(gomock.Any()).Return(mockBucketBuilder).Times(len(remoteIPs))
mockBucketBuilder.EXPECT().Done().Return(mockGroup).Times(len(localReceivers) + len(remoteIPs))
mockGroup.EXPECT().Add().Return(nil)
err := client.InstallMulticastGroup(groupID1, localReceivers, remoteIPs)
require.NoError(t, err)
_, exists := client.featureMulticast.groupCache.Load(groupID1)
assert.True(t, exists)
mockOFBridge.EXPECT().DeleteGroup(groupID1).Return(true)
err = client.UninstallMulticastGroup(groupID1)
require.NoError(t, err)
_, exists = client.featureMulticast.groupCache.Load(groupID1)
assert.False(t, exists)
}
38 changes: 26 additions & 12 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (p *proxier) removeStaleServices() {
// Remove Service group whose Endpoints are local.
if svcInfo.NodeLocalExternal() {
if groupIDLocal, exist := p.groupCounter.Get(svcPortName, true); exist {
if err := p.ofClient.UninstallGroup(groupIDLocal); err != nil {
if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil {
klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName)
continue
}
Expand All @@ -187,7 +187,7 @@ func (p *proxier) removeStaleServices() {
}
// Remove Service group which has all Endpoints.
if groupID, exist := p.groupCounter.Get(svcPortName, false); exist {
if err := p.ofClient.UninstallGroup(groupID); err != nil {
if err := p.ofClient.UninstallServiceGroup(groupID); err != nil {
klog.ErrorS(err, "Failed to remove Group of all Endpoints for Service", "Service", svcPortName)
continue
}
Expand Down Expand Up @@ -563,7 +563,7 @@ func (p *proxier) installServices() {
continue
}
if groupID, exist := p.groupCounter.Get(svcPortName, !nodeLocalVal); exist {
if err := p.ofClient.UninstallGroup(groupID); err != nil {
if err := p.ofClient.UninstallServiceGroup(groupID); err != nil {
klog.ErrorS(err, "Failed to uninstall Group of all Endpoints for Service", "Service", svcPortName)
continue
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/agent/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep
mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeLoadBalancer).Times(1)
}
groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal)
mockOFClient.EXPECT().UninstallGroup(groupID).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1)
}
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
if proxyLoadBalancerIPs {
Expand Down Expand Up @@ -466,7 +466,7 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP
mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeNodePort).Times(1)

groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal)
mockOFClient.EXPECT().UninstallGroup(groupID).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1)
}
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
mockRouteClient.EXPECT().AddNodePort(gomock.Any(), uint16(svcNodePort), bindingProtocol).Times(1)
Expand Down Expand Up @@ -748,7 +748,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool)
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1)
mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1)
mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1)
fp.syncProxyRules()

fp.serviceChanges.OnServiceUpdate(service, nil)
Expand Down Expand Up @@ -1231,8 +1231,8 @@ func TestServicesWithSameEndpoints(t *testing.T) {
mockOFClient.EXPECT().InstallServiceFlows(groupID2, svcIP2, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP).Times(1)
mockOFClient.EXPECT().UninstallServiceFlows(svcIP1, uint16(svcPort), bindingProtocol).Times(1)
mockOFClient.EXPECT().UninstallServiceFlows(svcIP2, uint16(svcPort), bindingProtocol).Times(1)
mockOFClient.EXPECT().UninstallGroup(groupID1).Times(1)
mockOFClient.EXPECT().UninstallGroup(groupID2).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID1).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID2).Times(1)
// Since these two Services reference to the same Endpoint, there should only be one operation.
mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1)

Expand Down
Loading

0 comments on commit b04253d

Please sign in to comment.