Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: Remove multicast group from cache when group is uninstalled #4176

Merged
merged 1 commit into from
Sep 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ function generate_mocks {
"pkg/controller/querier ControllerQuerier testing"
"pkg/flowaggregator/exporter Interface testing"
"pkg/ipfix IPFIXExportingProcess,IPFIXRegistry,IPFIXCollectingProcess,IPFIXAggregationProcess testing"
"pkg/ovs/openflow Bridge,Table,Flow,Action,CTAction,FlowBuilder testing"
"pkg/ovs/openflow Bridge,Table,Flow,Action,CTAction,FlowBuilder,Group,BucketBuilder testing"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any new auto-generated code by this udpate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, with this change we could use gomock on Group and BucketBuilder in unit test.

"pkg/ovs/ovsconfig OVSBridgeClient testing"
"pkg/ovs/ovsctl OVSCtlClient testing"
"pkg/querier AgentNetworkPolicyInfoQuerier,AgentMulticastInfoQuerier testing"
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func (c *Controller) syncGroup(groupKey string) error {
return err
}
// Remove the multicast flow entry if no local Pod is in the group.
if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil {
if err := c.ofClient.UninstallMulticastGroup(status.ofGroupID); err != nil {
klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey)
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/multicast/mcast_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ func TestCheckLastMember(t *testing.T) {
if ev != nil {
status.group = ev.group
if ev.eType == groupLeave {
mockOFClient.EXPECT().UninstallGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastFlows(status.group)
}
} else {
status.group = mgroup
mockOFClient.EXPECT().UninstallGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastFlows(status.group)
}
_ = mctrl.groupCache.Add(status)
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestClearStaleGroups(t *testing.T) {
fakePort++
}
}
mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(len(staleGroups))
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any()).Times(len(staleGroups))
mockOFClient.EXPECT().UninstallMulticastFlows(gomock.Any()).Times(len(staleGroups))
mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(len(staleGroups))
mctrl.clearStaleGroups()
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestEncapLocalReportAndNotifyRemote(t *testing.T) {
}
}
if tc.groupChanged {
mockOFClient.EXPECT().UninstallGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastGroup(gomock.Any())
mockOFClient.EXPECT().UninstallMulticastFlows(tc.e.group)
mockMulticastSocket.EXPECT().MulticastInterfaceLeaveMgroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(1)
mockOFClient.EXPECT().SendIGMPRemoteReportPacketOut(igmpReportDstMac, types.IGMPv3Router, gomock.Any())
Expand Down
21 changes: 17 additions & 4 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ type Client interface {
// InstallServiceGroup installs a group for Service LB. Each endpoint
// is a bucket of the group. For now, each bucket has the same weight.
InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error
// UninstallGroup removes the group and its buckets that are
// installed by InstallServiceGroup or InstallMulticastGroup.
UninstallGroup(groupID binding.GroupIDType) error
// UninstallServiceGroup removes the group and its buckets that are
// installed by InstallServiceGroup.
UninstallServiceGroup(groupID binding.GroupIDType) error

// InstallEndpointFlows installs flows for accessing Endpoints.
// If an Endpoint is on the current Node, then flows for hairpin and endpoint
Expand Down Expand Up @@ -314,6 +314,9 @@ type Client interface {
UninstallTrafficControlReturnPortFlow(returnOFPort uint32) error

InstallMulticastGroup(ofGroupID binding.GroupIDType, localReceivers []uint32, remoteNodeReceivers []net.IP) error
// UninstallMulticastGroup removes the group and its buckets that are
// installed by InstallMulticastGroup.
UninstallMulticastGroup(groupID binding.GroupIDType) error

// SendIGMPRemoteReportPacketOut sends the IGMP report packet as a packet-out to remote Nodes via the tunnel port.
SendIGMPRemoteReportPacketOut(
Expand Down Expand Up @@ -627,7 +630,7 @@ func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAff
return nil
}

func (c *client) UninstallGroup(groupID binding.GroupIDType) error {
func (c *client) UninstallServiceGroup(groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
if !c.bridge.DeleteGroup(groupID) {
Expand Down Expand Up @@ -1322,6 +1325,16 @@ func (c *client) InstallMulticastGroup(groupID binding.GroupIDType, localReceive
return nil
}

func (c *client) UninstallMulticastGroup(groupID binding.GroupIDType) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
if !c.bridge.DeleteGroup(groupID) {
return fmt.Errorf("group %d delete failed", groupID)
}
c.featureMulticast.groupCache.Delete(groupID)
return nil
}

// InstallMulticlusterNodeFlows installs flows to handle cross-cluster packets between a regular
// Node and a local Gateway.
func (c *client) InstallMulticlusterNodeFlows(clusterID string,
Expand Down
126 changes: 126 additions & 0 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
ovsoftest "antrea.io/antrea/pkg/ovs/openflow/testing"
"antrea.io/antrea/pkg/ovs/ovsconfig"
utilip "antrea.io/antrea/pkg/util/ip"
k8sproxy "antrea.io/antrea/third_party/proxy"
)

const bridgeName = "dummy-br"
Expand Down Expand Up @@ -583,3 +584,128 @@ func TestMulticlusterFlowsInstallation(t *testing.T) {
_, ok = client.featureMulticluster.cachedFlows.Load(cacheKey)
require.False(t, ok)
}

func TestServiceGroupInstallAndUninstall(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockOFBridge := ovsoftest.NewMockBridge(ctrl)
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, true, false, false, false, true, false, false, false, false)
client := ofClient.(*client)
client.bridge = mockOFBridge
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
client.nodeConfig = nodeConfig
client.networkConfig = networkConfig
client.serviceConfig = serviceConfig
client.ipProtocols = []binding.Protocol{binding.ProtocolIP}
client.generatePipelines()
endpoints := []k8sproxy.Endpoint{
&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort("192.168.1.2", "8081"),
IsLocal: true,
},
&k8sproxy.BaseEndpointInfo{
Endpoint: net.JoinHostPort("10.20.1.11", "8081"),
IsLocal: false,
},
}
groupID1 := binding.GroupIDType(1)
groupID2 := binding.GroupIDType(2)
for _, tc := range []struct {
groupID binding.GroupIDType
sessionAffinity bool
deleteSucceeded bool
}{
{groupID: groupID1, deleteSucceeded: false, sessionAffinity: true},
{groupID: groupID2, deleteSucceeded: true, sessionAffinity: false},
} {
mockGroup := ovsoftest.NewMockGroup(ctrl)
mockBucketBuilder := ovsoftest.NewMockBucketBuilder(ctrl)
mockOFBridge.EXPECT().CreateGroup(tc.groupID).Return(mockGroup).Times(1)
mockGroup.EXPECT().ResetBuckets().Return(mockGroup).Times(1)
mockGroup.EXPECT().Bucket().Return(mockBucketBuilder).Times(len(endpoints))
mockBucketBuilder.EXPECT().Weight(gomock.Any()).Return(mockBucketBuilder).Times(len(endpoints))
mockBucketBuilder.EXPECT().LoadToRegField(gomock.Any(), gomock.Any()).Return(mockBucketBuilder).AnyTimes()
if tc.sessionAffinity {
mockBucketBuilder.EXPECT().ResubmitToTable(ServiceLBTable.GetID()).Return(mockBucketBuilder).Times(len(endpoints))
} else {
mockBucketBuilder.EXPECT().ResubmitToTable(EndpointDNATTable.GetID()).Return(mockBucketBuilder).Times(len(endpoints))
}
mockBucketBuilder.EXPECT().Done().Return(mockGroup).Times(len(endpoints))
mockGroup.EXPECT().Add().Return(nil)
err := client.InstallServiceGroup(tc.groupID, tc.sessionAffinity, endpoints)
require.NoError(t, err)
_, exists := client.featureService.groupCache.Load(tc.groupID)
assert.True(t, exists)
mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteSucceeded)
err = client.UninstallServiceGroup(tc.groupID)
_, exists = client.featureService.groupCache.Load(tc.groupID)
if tc.deleteSucceeded {
assert.NoError(t, err)
assert.False(t, exists)
} else {
assert.Error(t, err)
assert.True(t, exists)
}
}
}

func TestMulticastGroupInstallAndUninstall(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockOFBridge := ovsoftest.NewMockBridge(ctrl)
m := oftest.NewMockOFEntryOperations(ctrl)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, false, true, false, false, false, false, true, false, false)
client := ofClient.(*client)
client.bridge = mockOFBridge
client.cookieAllocator = cookie.NewAllocator(0)
client.ofEntryOperations = m
client.nodeConfig = nodeConfig
client.networkConfig = networkConfig
client.serviceConfig = serviceConfig
client.ipProtocols = []binding.Protocol{binding.ProtocolIP}
client.generatePipelines()
localReceivers := []uint32{101, 102}
remoteIPs := []net.IP{
net.ParseIP("1.1.1.2"),
net.ParseIP("1.1.1.3"),
net.ParseIP("1.1.1.4"),
}
groupID1 := binding.GroupIDType(1)
groupID2 := binding.GroupIDType(2)
for _, tc := range []struct {
groupID binding.GroupIDType
deleteSucceeded bool
}{
{groupID: groupID1, deleteSucceeded: false},
{groupID: groupID2, deleteSucceeded: true},
} {
mockGroup := ovsoftest.NewMockGroup(ctrl)
mockBucketBuilder := ovsoftest.NewMockBucketBuilder(ctrl)
mockOFBridge.EXPECT().CreateGroupTypeAll(tc.groupID).Return(mockGroup).Times(1)
mockGroup.EXPECT().ResetBuckets().Return(mockGroup).Times(1)
mockGroup.EXPECT().Bucket().Return(mockBucketBuilder).Times(len(localReceivers) + len(remoteIPs))
mockBucketBuilder.EXPECT().LoadToRegField(gomock.Any(), gomock.Any()).Return(mockBucketBuilder).AnyTimes()
mockBucketBuilder.EXPECT().ResubmitToTable(MulticastIngressRuleTable.GetID()).Return(mockBucketBuilder).Times(len(localReceivers))
mockBucketBuilder.EXPECT().ResubmitToTable(MulticastOutputTable.GetID()).Return(mockBucketBuilder).Times(len(remoteIPs))
mockBucketBuilder.EXPECT().SetTunnelDst(gomock.Any()).Return(mockBucketBuilder).Times(len(remoteIPs))
mockBucketBuilder.EXPECT().Done().Return(mockGroup).Times(len(localReceivers) + len(remoteIPs))
mockGroup.EXPECT().Add().Return(nil)
err := client.InstallMulticastGroup(tc.groupID, localReceivers, remoteIPs)
require.NoError(t, err)
_, exists := client.featureMulticast.groupCache.Load(tc.groupID)
assert.True(t, exists)
mockOFBridge.EXPECT().DeleteGroup(tc.groupID).Return(tc.deleteSucceeded)
err = client.UninstallMulticastGroup(tc.groupID)
if tc.deleteSucceeded {
assert.NoError(t, err)
_, exists = client.featureMulticast.groupCache.Load(tc.groupID)
assert.False(t, exists)
} else {
assert.Error(t, err)
_, exists = client.featureMulticast.groupCache.Load(tc.groupID)
assert.True(t, exists)
}
}
}
38 changes: 26 additions & 12 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (p *proxier) removeStaleServices() {
// Remove Service group whose Endpoints are local.
if svcInfo.NodeLocalExternal() {
if groupIDLocal, exist := p.groupCounter.Get(svcPortName, true); exist {
if err := p.ofClient.UninstallGroup(groupIDLocal); err != nil {
if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil {
klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName)
continue
}
Expand All @@ -192,7 +192,7 @@ func (p *proxier) removeStaleServices() {
}
// Remove Service group which has all Endpoints.
if groupID, exist := p.groupCounter.Get(svcPortName, false); exist {
if err := p.ofClient.UninstallGroup(groupID); err != nil {
if err := p.ofClient.UninstallServiceGroup(groupID); err != nil {
klog.ErrorS(err, "Failed to remove Group of all Endpoints for Service", "Service", svcPortName)
continue
}
Expand Down Expand Up @@ -568,7 +568,7 @@ func (p *proxier) installServices() {
continue
}
if groupID, exist := p.groupCounter.Get(svcPortName, !nodeLocalVal); exist {
if err := p.ofClient.UninstallGroup(groupID); err != nil {
if err := p.ofClient.UninstallServiceGroup(groupID); err != nil {
klog.ErrorS(err, "Failed to uninstall Group of all Endpoints for Service", "Service", svcPortName)
continue
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/agent/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep
mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeLoadBalancer).Times(1)
}
groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal)
mockOFClient.EXPECT().UninstallGroup(groupID).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1)
}
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
if proxyLoadBalancerIPs {
Expand Down Expand Up @@ -467,7 +467,7 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP
mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeNodePort).Times(1)

groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal)
mockOFClient.EXPECT().UninstallGroup(groupID).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1)
}
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
mockRouteClient.EXPECT().AddNodePort(gomock.Any(), uint16(svcNodePort), bindingProtocol).Times(1)
Expand Down Expand Up @@ -749,7 +749,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool)
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1)
mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1)
mockOFClient.EXPECT().UninstallGroup(gomock.Any()).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1)
fp.syncProxyRules()

fp.serviceChanges.OnServiceUpdate(service, nil)
Expand Down Expand Up @@ -1232,8 +1232,8 @@ func TestServicesWithSameEndpoints(t *testing.T) {
mockOFClient.EXPECT().InstallServiceFlows(groupID2, svcIP2, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP).Times(1)
mockOFClient.EXPECT().UninstallServiceFlows(svcIP1, uint16(svcPort), bindingProtocol).Times(1)
mockOFClient.EXPECT().UninstallServiceFlows(svcIP2, uint16(svcPort), bindingProtocol).Times(1)
mockOFClient.EXPECT().UninstallGroup(groupID1).Times(1)
mockOFClient.EXPECT().UninstallGroup(groupID2).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID1).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID2).Times(1)
// Since these two Services reference to the same Endpoint, there should only be one operation.
mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1)

Expand Down
Loading