Skip to content

Commit

Permalink
[Multicast] Use group as flow actions for multicast traffic
Browse files Browse the repository at this point in the history
1. Add local Pod receivers into an OpenFlow type "all" group for each
multicast group, and use such groups in the flow actions. Remove a Pod
from group buckets if the Pod has left the multicast group or is
deleted before leaving the multicast group.
2. Improve multicast e2e tests.

Signed-off-by: wenyingd <[email protected]>
Co-authored-by: Ruochen Shen <[email protected]>
  • Loading branch information
wenyingd and ceclinux committed Apr 26, 2022
1 parent be1116e commit 87629c0
Show file tree
Hide file tree
Showing 26 changed files with 452 additions and 160 deletions.
4 changes: 3 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,11 +577,13 @@ func run(o *Options) error {
}
mcastController := multicast.NewMulticastController(
ofClient,
v4GroupIDAllocator,
nodeConfig,
ifaceStore,
multicastSocket,
sets.NewString(append(o.config.MulticastInterfaces, nodeConfig.NodeTransportInterfaceName)...),
ovsBridgeClient)
ovsBridgeClient,
podUpdateChannel)
if err := mcastController.Initialize(); err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ func (pc *podConfigurator) removeInterfaces(containerID string) error {
if err := pc.routeClient.DeleteLocalAntreaFlexibleIPAMPodRule(containerConfig.IPs); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -495,7 +496,7 @@ func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, conta
// Add containerConfig into local cache
pc.ifaceStore.AddInterface(containerConfig)
// Notify the Pod update event to required components.
pc.podUpdateNotifier.Notify(k8s.NamespacedName(containerConfig.PodNamespace, containerConfig.PodName))
pc.podUpdateNotifier.Notify(k8s.NamespacedName(containerConfig.PodNamespace, containerConfig.PodName), channel.EventAdd, containerID)
return nil
}

Expand All @@ -518,6 +519,7 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface
}
// Remove container configuration from cache.
pc.ifaceStore.DeleteInterface(containerConfig)
pc.podUpdateNotifier.Notify(k8s.NamespacedName(containerConfig.PodNamespace, containerConfig.PodName), channel.EventDel, containerID)
klog.Infof("Removed interfaces for container %s", containerID)
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/cniserver/pod_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
)

Expand All @@ -49,7 +50,7 @@ func (pc *podConfigurator) connectInterfaceToOVSAsync(ifConfig *interfacestore.I
// Update interface config with the ofPort.
ifConfig.OVSPortConfig.OFPort = ofPort
// Notify the Pod update event to required components.
pc.podUpdateNotifier.Notify(k8s.NamespacedName(ifConfig.PodNamespace, ifConfig.PodName))
pc.podUpdateNotifier.Notify(k8s.NamespacedName(ifConfig.PodNamespace, ifConfig.PodName), channel.EventAdd, containerID)
return nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func NewEgressController(

// processPodUpdate will be called when CNIServer publishes a Pod update event.
// It triggers reconciling the effective Egress of the Pod.
func (c *EgressController) processPodUpdate(pod string) {
func (c *EgressController) processPodUpdate(pod string, evType channel.EventType, options ...string) {
c.egressBindingsMutex.Lock()
defer c.egressBindingsMutex.Unlock()
binding, exists := c.egressBindings[pod]
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/egress/egress_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func TestPodUpdateShouldSyncEgress(t *testing.T) {
c.mockIPAssigner.EXPECT().UnassignIP(fakeLocalEgressIP1)
// Mock CNIServer
addPodInterface(c.ifaceStore, "ns1", "pendingPod", 10)
c.podUpdateChannel.Notify("ns1/pendingPod")
c.podUpdateChannel.Notify("ns1/pendingPod", channel.EventAdd)
require.NoError(t, wait.PollImmediate(10*time.Millisecond, time.Second, func() (done bool, err error) {
return c.queue.Len() == 1, nil
}))
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func newRuleCache(dirtyRuleHandler func(string), podUpdateSubscriber channel.Sub
// done if antrea-controller has computed the Pods' policies and propagated
// them to this Node by their labels and NodeName, instead of waiting for their
// IPs are reported to kube-apiserver and processed by antrea-controller.
func (c *ruleCache) processPodUpdate(pod string) {
func (c *ruleCache) processPodUpdate(pod string, evType channel.EventType, options ...string) {
namespace, name := k8s.SplitNamespacedName(pod)
member := &v1beta.GroupMember{
Pod: &v1beta.PodReference{
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/controller/networkpolicy/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ func TestRuleCacheProcessPodUpdates(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go podUpdateNotifier.Run(stopCh)
podUpdateNotifier.Notify(tt.podUpdate)
podUpdateNotifier.Notify(tt.podUpdate, channel.EventAdd)
func() {
// Drain the channel with 10 ms timeout so we can know it's done.
for {
Expand Down
168 changes: 134 additions & 34 deletions pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/util"
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
)

type eventType uint8
Expand Down Expand Up @@ -59,10 +63,12 @@ type mcastGroupEvent struct {

type GroupMemberStatus struct {
group net.IP
// localMembers is a set for the local Pod's interface name which has joined in the multicast group.
localMembers sets.String
// localMembers is a map for the local Pod member and its last update time, key is the Pod's interface name,
// and value is its last update time.
localMembers map[string]time.Time
lastIGMPReport time.Time
mutex sync.RWMutex
ofGroupID binding.GroupIDType
}

// eventHandler process the multicast Group membership report or leave messages.
Expand All @@ -82,7 +88,8 @@ func (c *Controller) addGroupMemberStatus(e *mcastGroupEvent) {
status := &GroupMemberStatus{
group: e.group,
lastIGMPReport: e.time,
localMembers: sets.NewString(e.iface.InterfaceName),
localMembers: map[string]time.Time{e.iface.InterfaceName: e.time},
ofGroupID: c.v4GroupAllocator.Allocate(),
}
c.groupCache.Add(status)
c.queue.Add(e.group.String())
Expand All @@ -101,25 +108,35 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent
defer status.mutex.Unlock()
newStatus := &GroupMemberStatus{
group: status.group,
localMembers: status.localMembers.Union(nil),
localMembers: make(map[string]time.Time),
lastIGMPReport: status.lastIGMPReport,
ofGroupID: status.ofGroupID,
}
exist := status.localMembers.Has(e.iface.InterfaceName)
for m, t := range status.localMembers {
newStatus.localMembers[m] = t
}
_, exist := status.localMembers[e.iface.InterfaceName]
switch e.eType {
case groupJoin:
newStatus.lastIGMPReport = e.time
newStatus.localMembers[e.iface.InterfaceName] = e.time
c.groupCache.Update(newStatus)
if !exist {
newStatus.localMembers.Insert(e.iface.InterfaceName)
klog.InfoS("Added member to multicast group", "group", e.group.String(), "member", e.iface.InterfaceName)
c.queue.Add(newStatus.group.String())
}
c.groupCache.Update(newStatus)
c.queue.Add(newStatus.group.String())
case groupLeave:
if exist {
newStatus.localMembers.Delete(e.iface.InterfaceName)
delete(newStatus.localMembers, e.iface.InterfaceName)
c.groupCache.Update(newStatus)
klog.InfoS("Deleted member from multicast group", "group", e.group.String(), "member", e.iface.InterfaceName)
if len(newStatus.localMembers) == 0 {
_, found := c.ifaceStore.GetInterfaceByName(e.iface.InterfaceName)
// Notify worker immediately about the member leave event if the member doesn't exist on the Node, or there are
// other local members in the multicast group.
if !found || len(newStatus.localMembers) > 0 {
c.queue.Add(newStatus.group.String())
} else {
// Check if all local members have left the multicast group.
klog.InfoS("Check last member in multicast group", "group", e.group.String(), "member", e.iface.InterfaceName)
c.checkLastMember(e.group)
}
Expand All @@ -139,53 +156,109 @@ func (c *Controller) checkLastMember(group net.IP) {
c.queue.AddAfter(group.String(), igmpMaxResponseTime)
}

// clearStaleGroups checks the stale groups which have not been updated for mcastGroupTimeout, and then notifies worker
// clearStaleGroups checks the stale group members which have not been updated for mcastGroupTimeout, and then notifies worker
// to remove them from groupCache.
func (c *Controller) clearStaleGroups() {
now := time.Now()
for _, obj := range c.groupCache.List() {
status := obj.(*GroupMemberStatus)
status.mutex.RLock()
diff := now.Sub(status.lastIGMPReport)
status.mutex.RUnlock()
if diff > mcastGroupTimeout {
// Notify worker to remove the group from groupCache if all its members are not updated before mcastGroupTimeout.
c.queue.Add(status.group.String())
} else {
// Create a "leave" event for a local member if it is not updated before mcastGroupTimeout.
for member, lastUpdate := range status.localMembers {
if now.Sub(lastUpdate) > mcastGroupTimeout {
ifConfig := &interfacestore.InterfaceConfig{
InterfaceName: member,
}
event := &mcastGroupEvent{
group: status.group,
eType: groupLeave,
time: now,
iface: ifConfig,
}
c.groupEventCh <- event
}
}
}
status.mutex.RUnlock()
}
}

// removeLocalInterface searches the GroupMemberStatus which the deleted interface has joined, and then triggers a member
// leave event so that Antrea can remove the corresponding interface from local multicast receivers on OVS. This function
// should be called if the removed Pod receiver fails to send IGMP leave message before deletion.
func (c *Controller) removeLocalInterface(pod string, evType channel.EventType, options ...string) {
if evType != channel.EventDel {
return
}
namespace, name := k8s.SplitNamespacedName(pod)
containerID := options[0]
interfaceName := util.GenerateContainerInterfaceName(name, namespace, containerID)
ifConfig := &interfacestore.InterfaceConfig{
InterfaceName: interfaceName,
}
groupStatuses := c.getGroupMemberStatusesByPod(interfaceName)
for _, g := range groupStatuses {
event := &mcastGroupEvent{
group: g.group,
eType: groupLeave,
time: time.Now(),
iface: ifConfig,
}
c.groupEventCh <- event
}
}

type Controller struct {
ofClient openflow.Client
nodeConfig *config.NodeConfig
igmpSnooper *IGMPSnooper
groupEventCh chan *mcastGroupEvent
groupCache cache.Indexer
queue workqueue.RateLimitingInterface
ofClient openflow.Client
v4GroupAllocator openflow.GroupAllocator
ifaceStore interfacestore.InterfaceStore
nodeConfig *config.NodeConfig
igmpSnooper *IGMPSnooper
groupEventCh chan *mcastGroupEvent
groupCache cache.Indexer
queue workqueue.RateLimitingInterface
// installedGroups saves the groups which are configured on both OVS and the host.
installedGroups sets.String
installedGroupsMutex sync.RWMutex
mRouteClient *MRouteClient
ovsBridgeClient ovsconfig.OVSBridgeClient
podDeletionCh chan *interfacestore.InterfaceConfig
}

func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore, multicastSocket RouteInterface, multicastInterfaces sets.String, ovsBridgeClient ovsconfig.OVSBridgeClient) *Controller {
func NewMulticastController(ofClient openflow.Client,
v4GroupAllocator openflow.GroupAllocator,
nodeConfig *config.NodeConfig,
ifaceStore interfacestore.InterfaceStore,
multicastSocket RouteInterface,
multicastInterfaces sets.String,
ovsBridgeClient ovsconfig.OVSBridgeClient,
podUpdateSubscriber channel.Subscriber) *Controller {
eventCh := make(chan *mcastGroupEvent, workerCount)
groupSnooper := newSnooper(ofClient, ifaceStore, eventCh)
groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{
podInterfaceIndex: podInterfaceIndexFunc,
})
multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces)
return &Controller{
ofClient: ofClient,
nodeConfig: nodeConfig,
igmpSnooper: groupSnooper,
groupEventCh: eventCh,
groupCache: groupCache,
installedGroups: sets.NewString(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"),
mRouteClient: multicastRouteClient,
ovsBridgeClient: ovsBridgeClient,
c := &Controller{
ofClient: ofClient,
ifaceStore: ifaceStore,
v4GroupAllocator: v4GroupAllocator,
nodeConfig: nodeConfig,
igmpSnooper: groupSnooper,
groupEventCh: eventCh,
groupCache: groupCache,
installedGroups: sets.NewString(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"),
mRouteClient: multicastRouteClient,
ovsBridgeClient: ovsBridgeClient,
}
podUpdateSubscriber.Subscribe(c.removeLocalInterface)
return c
}

func (c *Controller) Initialize() error {
Expand Down Expand Up @@ -221,7 +294,6 @@ func (c *Controller) Run(stopCh <-chan struct{}) {

// Periodically check the group member status, and remove the groups in which no members exist
go wait.NonSlidingUntil(c.clearStaleGroups, queryInterval, stopCh)

go c.eventHandler(stopCh)

for i := 0; i < int(workerCount); i++ {
Expand Down Expand Up @@ -298,15 +370,30 @@ func (c *Controller) syncGroup(groupKey string) error {
return nil
}
status := obj.(*GroupMemberStatus)
memberPorts := make([]uint32, 0)
status.mutex.Lock()
defer status.mutex.Unlock()
for memberInterfaceName := range status.localMembers {
obj, found := c.ifaceStore.GetInterfaceByName(memberInterfaceName)
if !found {
klog.InfoS("Failed to find interface from cache", "interface", memberInterfaceName)
continue
}
memberPorts = append(memberPorts, uint32(obj.OFPort))
}
if c.groupHasInstalled(groupKey) {
status.mutex.Lock()
defer status.mutex.Unlock()
if c.groupIsStale(status) {
// Remove the multicast flow entry if no local Pod is in the group.
if err := c.ofClient.UninstallMulticastFlow(status.group); err != nil {
if err := c.ofClient.UninstallMulticastFlows(status.group); err != nil {
klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey)
return err
}
// Remove the multicast flow entry if no local Pod is in the group.
if err := c.ofClient.UninstallGroup(status.ofGroupID); err != nil {
klog.ErrorS(err, "Failed to uninstall multicast group", "group", groupKey)
return err
}
c.v4GroupAllocator.Release(status.ofGroupID)
err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group)
if err != nil {
klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey)
Expand All @@ -320,10 +407,22 @@ func (c *Controller) syncGroup(groupKey string) error {
c.delInstalledGroup(groupKey)
c.groupCache.Delete(status)
klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey)
return nil
}
// Reinstall OpenFlow group because the local Pod receivers have changed.
if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil {
return err
}
klog.V(2).InfoS("Updated OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts)
return nil
}
if err := c.ofClient.InstallMulticastFlow(status.group); err != nil {
// Install OpenFlow group for a new multicast group which has local Pod receivers joined.
if err := c.ofClient.InstallMulticastGroup(status.ofGroupID, memberPorts); err != nil {
return err
}
klog.V(2).InfoS("Installed OpenFlow group for local receivers", "group", groupKey, "ofGroup", status.ofGroupID, "localReceivers", memberPorts)
// Install OpenFlow flow to forward packets to local Pod receivers which are included in the group.
if err := c.ofClient.InstallMulticastFlows(status.group, status.ofGroupID); err != nil {
klog.ErrorS(err, "Failed to install multicast flows", "group", status.group)
return err
}
Expand Down Expand Up @@ -377,6 +476,7 @@ func (c *Controller) addOrUpdateGroupEvent(e *mcastGroupEvent) {
c.updateGroupMemberStatus(obj, e)
}
}
return
}

func podInterfaceIndexFunc(obj interface{}) ([]string, error) {
Expand Down
Loading

0 comments on commit 87629c0

Please sign in to comment.