Skip to content

Commit

Permalink
Add Multicast route
Browse files Browse the repository at this point in the history
  • Loading branch information
ceclinux committed Nov 8, 2021
1 parent 30bfcdc commit 43ce322
Show file tree
Hide file tree
Showing 7 changed files with 733 additions and 3 deletions.
9 changes: 8 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,14 @@ func run(o *Options) error {
}

if features.DefaultFeatureGate.Enabled(features.Multicast) {
mcastController := multicast.NewMulticastController(ofClient, nodeConfig, ifaceStore)
err := multicast.SetOvsMulticast(o.config.OVSBridge)
if err != nil {
return err
}
mcastController, err := multicast.NewMulticastController(ofClient, nodeConfig, ifaceStore, o.config.HostGateway, o.config.TransportInterface)
if err != nil {
return fmt.Errorf("error creating multicast controller: %v", err)
}
go mcastController.Run(stopCh)
}

Expand Down
96 changes: 96 additions & 0 deletions pkg/agent/multicast/kernel_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2021 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package multicast

import (
"fmt"
"strconv"
"strings"
"syscall"
)

// The following constants and structs come from the linux kernel file
// linux/include/uapi/linux/mroute.h
const (
IGMPMSG_NOCACHE = 1
VIFF_USE_IFINDEX = 8
MRT_ADD_VIF = 202
MRT_DEL_VIF = 203
MRT_ADD_MFC = 204
MRT_DEL_MFC = 205
MRT_BASE = 200
MRT_TABLE = 209
MRT_FLUSH = 212
MAXVIFS = 32
)

type vifctl struct {
vifcVifi uint16
vifcFlags uint8
vifcThreshold uint8
vifcRateLimit uint32
vifcLclIfindex int
vifcRmtAddr [4]byte
}

type mfcctl struct {
mfccOrigin [4]byte
mfccMcastgrp [4]byte
mfccParent uint16
mfccTtls [32]byte
mfccPktCnt uint32
mfccByteCnt uint32
mfccWrongIf uint32
mfccExpire int
}

// We should consider the changes in vifctl struct after kernel versiopn 5.9
func parseKernelVersion() (*kernelVersion, error) {
var uname syscall.Utsname
if err := syscall.Uname(&uname); err != nil {
return nil, fmt.Errorf("Running uname error: %v", err)
}
kernel_version_full := arrayToString(uname.Release)
kernel_version_numbers := strings.Split(kernel_version_full, "-")
kernel_version_array := strings.Split(kernel_version_numbers[0], ".")
major, err := strconv.Atoi(kernel_version_array[0])
minor, err := strconv.Atoi(kernel_version_array[1])
patch, err := strconv.Atoi(kernel_version_array[2])
if err != nil {
return nil, fmt.Errorf("failed to parse kernel version %s", kernel_version_full)
}
return &kernelVersion{
major: major,
minor: minor,
patch: patch,
}, nil
}

func arrayToString(x [65]int8) string {
var buf [65]byte
for i, b := range x {
buf[i] = byte(b)
}
str := string(buf[:])
if i := strings.Index(str, "\x00"); i != -1 {
str = str[:i]
}
return str
}

type kernelVersion struct {
major int
minor int
patch int
}
52 changes: 51 additions & 1 deletion pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package multicast

import (
"context"
"fmt"
"net"
"os/exec"
"sync"
"time"

Expand Down Expand Up @@ -64,6 +66,7 @@ type Controller struct {
mutex sync.RWMutex
ctx context.Context
cancelFunc context.CancelFunc
mRouteClient *MRouteClient
}

func (m *Controller) Run(stopCh <-chan struct{}) {
Expand All @@ -82,6 +85,7 @@ func (m *Controller) Run(stopCh <-chan struct{}) {
go m.processGroupEvent(stopCh)
}

go m.mRouteClient.runRouteClient(stopCh)
}

func (m *Controller) processGroupEvent(stopCh <-chan struct{}) {
Expand Down Expand Up @@ -121,6 +125,14 @@ func (m *Controller) addGroupMemberStatus(e mcastGroupEvent) error {
lastProbe: e.time,
localMembers: map[string]bool{e.iface.InterfaceName: true},
}
err := m.mRouteClient.TransportInterfaceJoinMgroup(e.group)
if err != nil {
return err
}
err = m.mRouteClient.AddMrouteEntryByGroup(e.iface.IPs[0], e.group)
if err != nil {
return err
}
m.groupCache.Add(status)
klog.InfoS("Add new Multicast group to cache", "group", e.group, "interface", e.iface.InterfaceName)
return nil
Expand All @@ -141,6 +153,10 @@ func (m *Controller) updateGroupMemberStatus(obj interface{}, e mcastGroupEvent)
status.mutex.Lock()
status.lastProbe = e.time
if !exist {
err := m.mRouteClient.AddMrouteEntryByGroup(e.iface.IPs[0], e.group)
if err != nil {
return err
}
// TODO: add (Pod_IP, e.group) in the Multicast Routing entry whose inbound interface is antrea-gw0
status.localMembers[e.iface.InterfaceName] = true
klog.InfoS("Add interface to Multicast group", "group", e.group.String(), "member", e.iface.InterfaceName)
Expand All @@ -154,6 +170,14 @@ func (m *Controller) updateGroupMemberStatus(obj interface{}, e mcastGroupEvent)
// TODO: remove (Pod_IP, e.group) in the Multicast Routing entry whose inbound interface is antrea-gw0
status.mutex.Lock()
delete(status.localMembers, e.iface.InterfaceName)
err := m.mRouteClient.TransportInterfaceLeaveMgroup(e.group)
if err != nil {
return err
}
err = m.mRouteClient.DeleteMrouteEntryByGroup(e.iface.IPs[0], e.group)
if err != nil {
return err
}
status.mutex.Unlock()
klog.InfoS("Member left the Multicast group", "group", e.group.String(), "member", e.iface.InterfaceName)
if len(status.localMembers) == 0 {
Expand Down Expand Up @@ -185,6 +209,11 @@ func (m *Controller) checkLastMember(group net.IP) {
// TODO: Remove group from Multicast routing entries.
m.mutex.Lock()
m.groupCache.Delete(status)
err := m.mRouteClient.DeleteGroup(group)
if err != nil {
klog.Errorf("Cannot delete multicast group:%v", err)
return
}
m.mutex.Unlock()
klog.InfoS("Remove Multicast group from cache after the last member left", "group", group.String())
}
Expand Down Expand Up @@ -213,7 +242,11 @@ func (m *Controller) clearStaleGroups() {
}
}

func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore) *Controller {
func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore, gateway string, transportInterface string) (*Controller, error) {
multicastRouteClient, err := NewRouteClient(nodeConfig, gateway, transportInterface)
if err != nil {
return nil, fmt.Errorf("failed to initialize multicast route client %+v", err)
}
eventCh := make(chan mcastGroupEvent)
groupDetector := newDiscovery(ofClient, ifaceStore, eventCh)
groupCache := cache.NewIndexer(getGroupEventKey, nil)
Expand All @@ -226,7 +259,24 @@ func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeCon
groupCache: groupCache,
ctx: ctx,
cancelFunc: cancelFunc,
mRouteClient: multicastRouteClient,
}, nil
}

func SetOvsMulticast(ovsBridge string) error {
setSnoopingEnable := fmt.Sprintf("ovs-vsctl set Bridge %s %s", ovsBridge, "mcast_snooping_enable=true")
cmd := exec.Command("/bin/sh", "-c", setSnoopingEnable)
err := cmd.Run()
if err != nil {
return fmt.Errorf("error running %s: %v", setSnoopingEnable, err)
}
mcastSnoopingDisableFloodUnregistered := fmt.Sprintf("ovs-vsctl set Bridge %s %s", ovsBridge, "other_config:mcast-snooping-disable-flood-unregistered=true")
cmd = exec.Command("/bin/sh", "-c", mcastSnoopingDisableFloodUnregistered)
err = cmd.Run()
if err != nil {
return fmt.Errorf("error running %s: %v", mcastSnoopingDisableFloodUnregistered, err)
}
return nil
}

func getGroupEventKey(obj interface{}) (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/multicast/mcast_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
)

const (
queryIntervalSeconds = 125
queryIntervalSeconds = 1
mcastTimeout = 3 * queryIntervalSeconds
queryDstMac = "01:00:5e:00:00:01"
mcastAllhostIPv4 = "224.0.0.1"
Expand Down
Loading

0 comments on commit 43ce322

Please sign in to comment.