Skip to content

Commit

Permalink
Add Multicast route
Browse files Browse the repository at this point in the history
  • Loading branch information
ceclinux committed Dec 3, 2021
1 parent 3c883f2 commit f3544fb
Show file tree
Hide file tree
Showing 10 changed files with 867 additions and 3 deletions.
5 changes: 4 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,10 @@ func run(o *Options) error {
}

if features.DefaultFeatureGate.Enabled(features.Multicast) {
mcastController := multicast.NewMulticastController(ofClient, nodeConfig, ifaceStore)
mcastController, err := multicast.NewMulticastController(ofClient, nodeConfig, ifaceStore, o.config.TransportInterface)
if err != nil {
return fmt.Errorf("error creating multicast controller: %s", err.Error())
}
if err := mcastController.Initialize(); err != nil {
return err
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/controller/noderoute"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/multicast"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/route"
Expand Down Expand Up @@ -185,6 +186,13 @@ func (i *Initializer) setupOVSBridge() error {
return err
}

if features.DefaultFeatureGate.Enabled(features.Multicast) {
err := multicast.SetOVSMulticast(i.ovsBridge)
if err != nil {
return err
}
}

return nil
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/agent/interfacestore/interface_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,17 @@ func (c *interfaceCache) GetInterfacesByType(interfaceType InterfaceType) []*Int
return interfaces
}

func (c *interfaceCache) GetAllInterfaces() []*InterfaceConfig {
c.RLock()
defer c.RUnlock()
objs := c.cache.List()
interfaces := make([]*InterfaceConfig, len(objs))
for i := range objs {
interfaces[i] = objs[i].(*InterfaceConfig)
}
return interfaces
}

func (c *interfaceCache) Len() int {
c.RLock()
defer c.RUnlock()
Expand Down
14 changes: 14 additions & 0 deletions pkg/agent/interfacestore/testing/mock_interfacestore.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/agent/interfacestore/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type InterfaceStore interface {
GetInterfaceByOFPort(ofPort uint32) (*InterfaceConfig, bool)
GetContainerInterfaceNum() int
GetInterfacesByType(interfaceType InterfaceType) []*InterfaceConfig
GetAllInterfaces() []*InterfaceConfig
Len() int
GetInterfaceKeysByType(interfaceType InterfaceType) []string
}
Expand Down
96 changes: 96 additions & 0 deletions pkg/agent/multicast/kernel_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2021 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package multicast

import (
"fmt"
"strconv"
"strings"
"syscall"
)

// The following constants and structs come from the linux kernel file
// linux/include/uapi/linux/mroute.h
const (
IGMPMSG_NOCACHE = 1
VIFF_USE_IFINDEX = 8
MRT_ADD_VIF = 202
MRT_DEL_VIF = 203
MRT_ADD_MFC = 204
MRT_DEL_MFC = 205
MRT_BASE = 200
MRT_TABLE = 209
MRT_FLUSH = 212
MAXVIFS = 32
)

type vifctl struct {
vifcVifi uint16
vifcFlags uint8
vifcThreshold uint8
vifcRateLimit uint32
vifcLclIfindex int
vifcRmtAddr [4]byte
}

type mfcctl struct {
mfccOrigin [4]byte
mfccMcastgrp [4]byte
mfccParent uint16
mfccTtls [32]byte
mfccPktCnt uint32
mfccByteCnt uint32
mfccWrongIf uint32
mfccExpire int
}

// We should consider the changes in vifctl struct after kernel versiopn 5.9
func parseKernelVersion() (*kernelVersion, error) {
var uname syscall.Utsname
if err := syscall.Uname(&uname); err != nil {
return nil, fmt.Errorf("Running uname error: %v", err)
}
kernel_version_full := arrayToString(uname.Release)
kernel_version_numbers := strings.Split(kernel_version_full, "-")
kernel_version_array := strings.Split(kernel_version_numbers[0], ".")
major, err := strconv.Atoi(kernel_version_array[0])
minor, err := strconv.Atoi(kernel_version_array[1])
patch, err := strconv.Atoi(kernel_version_array[2])
if err != nil {
return nil, fmt.Errorf("failed to parse kernel version %s", kernel_version_full)
}
return &kernelVersion{
major: major,
minor: minor,
patch: patch,
}, nil
}

func arrayToString(x [65]int8) string {
var buf [65]byte
for i, b := range x {
buf[i] = byte(b)
}
str := string(buf[:])
if i := strings.Index(str, "\x00"); i != -1 {
str = str[:i]
}
return str
}

type kernelVersion struct {
major int
minor int
patch int
}
42 changes: 40 additions & 2 deletions pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package multicast

import (
"fmt"
"net"
"os/exec"
"sync"
"time"

Expand Down Expand Up @@ -164,9 +166,14 @@ type Controller struct {
// installedGroups saves the groups which are configured on both OVS and the host.
installedGroups sets.String
installedGroupsMutex sync.RWMutex
mRouteClient *MRouteClient
}

func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore) *Controller {
func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore, transportInterface string) (*Controller, error) {
multicastRouteClient, err := NewRouteClient(nodeConfig, transportInterface, ifaceStore)
if err != nil {
return nil, fmt.Errorf("failed to initialize multicast route client %+v", err)
}
eventCh := make(chan *mcastGroupEvent, workerCount)
groupSnooper := newSnooper(ofClient, ifaceStore, eventCh)
groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{
Expand All @@ -180,7 +187,8 @@ func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeCon
groupCache: groupCache,
installedGroups: sets.NewString(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"),
}
mRouteClient: multicastRouteClient,
}, nil
}

func (c *Controller) Initialize() error {
Expand Down Expand Up @@ -214,6 +222,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
// Process multicast Group membership report or leave messages.
go wait.Until(c.worker, time.Second, stopCh)
}
go c.mRouteClient.runRouteClient(stopCh)
}

func (c *Controller) worker() {
Expand Down Expand Up @@ -292,6 +301,16 @@ func (c *Controller) syncGroup(groupKey string) error {
klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey)
return err
}
err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group)
if err != nil {
klog.Errorf("Cannot delete multicat group %s: %s", status.group.String(), err.Error())
return err
}
err = c.mRouteClient.externalInterfacesLeaveMgroup(status.group)
if err != nil {
klog.Errorf("Cannot drop multicat group for all the external and tranport interfaces %s: %s", status.group.String(), err.Error())
return err
}
c.delInstalledGroup(groupKey)
c.groupCache.Delete(status)
klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey)
Expand All @@ -302,6 +321,9 @@ func (c *Controller) syncGroup(groupKey string) error {
klog.ErrorS(err, "Failed to install multicast flows", "group", status.group)
return err
}
if err := c.mRouteClient.externalInterfacesJoinMgroup(status.group); err != nil {
return err
}
c.addInstalledGroup(groupKey)
return nil
}
Expand Down Expand Up @@ -360,6 +382,22 @@ func podInterfaceIndexFunc(obj interface{}) ([]string, error) {
return podInterfaces, nil
}

func SetOVSMulticast(ovsBridge string) error {
setSnoopingEnable := fmt.Sprintf("ovs-vsctl set Bridge %s %s", ovsBridge, "mcast_snooping_enable=true")
cmd := exec.Command("/bin/sh", "-c", setSnoopingEnable)
err := cmd.Run()
if err != nil {
return fmt.Errorf("error running %s: %v", setSnoopingEnable, err)
}
mcastSnoopingDisableFloodUnregistered := fmt.Sprintf("ovs-vsctl set Bridge %s %s", ovsBridge, "other_config:mcast-snooping-disable-flood-unregistered=true")
cmd = exec.Command("/bin/sh", "-c", mcastSnoopingDisableFloodUnregistered)
err = cmd.Run()
if err != nil {
return fmt.Errorf("error running %s: %v", mcastSnoopingDisableFloodUnregistered, err)
}
return nil
}

func getGroupEventKey(obj interface{}) (string, error) {
groupState := obj.(*GroupMemberStatus)
return groupState.group.String(), nil
Expand Down
Loading

0 comments on commit f3544fb

Please sign in to comment.