diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index e20b2813303..ebf2a8e9bee 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -458,7 +458,10 @@ func run(o *Options) error { } if features.DefaultFeatureGate.Enabled(features.Multicast) { - mcastController := multicast.NewMulticastController(ofClient, nodeConfig, ifaceStore) + mcastController, err := multicast.NewMulticastController(ofClient, nodeConfig, ifaceStore, o.config.TransportInterface) + if err != nil { + return fmt.Errorf("error creating multicast controller: %s", err.Error()) + } if err := mcastController.Initialize(); err != nil { return err } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index f2455027e10..6417b797535 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -37,6 +37,7 @@ import ( "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/controller/noderoute" "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/multicast" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/openflow/cookie" "antrea.io/antrea/pkg/agent/route" @@ -185,6 +186,13 @@ func (i *Initializer) setupOVSBridge() error { return err } + if features.DefaultFeatureGate.Enabled(features.Multicast) { + err := multicast.SetOVSMulticast(i.ovsBridge) + if err != nil { + return err + } + } + return nil } diff --git a/pkg/agent/interfacestore/interface_cache.go b/pkg/agent/interfacestore/interface_cache.go index 0f91949fce3..b06c35cdaed 100644 --- a/pkg/agent/interfacestore/interface_cache.go +++ b/pkg/agent/interfacestore/interface_cache.go @@ -169,6 +169,17 @@ func (c *interfaceCache) GetInterfacesByType(interfaceType InterfaceType) []*Int return interfaces } +func (c *interfaceCache) GetAllInterfaces() []*InterfaceConfig { + c.RLock() + defer c.RUnlock() + objs := c.cache.List() + interfaces := make([]*InterfaceConfig, len(objs)) + for i := range objs { + interfaces[i] = objs[i].(*InterfaceConfig) + } + return interfaces +} + func (c *interfaceCache) Len() int { c.RLock() defer c.RUnlock() diff --git a/pkg/agent/interfacestore/testing/mock_interfacestore.go b/pkg/agent/interfacestore/testing/mock_interfacestore.go index 3e8aa50fbc5..e90fbd8b823 100644 --- a/pkg/agent/interfacestore/testing/mock_interfacestore.go +++ b/pkg/agent/interfacestore/testing/mock_interfacestore.go @@ -72,6 +72,20 @@ func (mr *MockInterfaceStoreMockRecorder) DeleteInterface(arg0 interface{}) *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteInterface", reflect.TypeOf((*MockInterfaceStore)(nil).DeleteInterface), arg0) } +// GetAllInterfaces mocks base method +func (m *MockInterfaceStore) GetAllInterfaces() []*interfacestore.InterfaceConfig { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllInterfaces") + ret0, _ := ret[0].([]*interfacestore.InterfaceConfig) + return ret0 +} + +// GetAllInterfaces indicates an expected call of GetAllInterfaces +func (mr *MockInterfaceStoreMockRecorder) GetAllInterfaces() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllInterfaces", reflect.TypeOf((*MockInterfaceStore)(nil).GetAllInterfaces)) +} + // GetContainerInterface mocks base method func (m *MockInterfaceStore) GetContainerInterface(arg0 string) (*interfacestore.InterfaceConfig, bool) { m.ctrl.T.Helper() diff --git a/pkg/agent/interfacestore/types.go b/pkg/agent/interfacestore/types.go index 66b023dcabf..cd204bdb4b4 100644 --- a/pkg/agent/interfacestore/types.go +++ b/pkg/agent/interfacestore/types.go @@ -91,6 +91,7 @@ type InterfaceStore interface { GetInterfaceByOFPort(ofPort uint32) (*InterfaceConfig, bool) GetContainerInterfaceNum() int GetInterfacesByType(interfaceType InterfaceType) []*InterfaceConfig + GetAllInterfaces() []*InterfaceConfig Len() int GetInterfaceKeysByType(interfaceType InterfaceType) []string } diff --git a/pkg/agent/multicast/kernel_source.go b/pkg/agent/multicast/kernel_source.go new file mode 100644 index 00000000000..419590ac09f --- /dev/null +++ b/pkg/agent/multicast/kernel_source.go @@ -0,0 +1,96 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package multicast + +import ( + "fmt" + "strconv" + "strings" + "syscall" +) + +// The following constants and structs come from the linux kernel file +// linux/include/uapi/linux/mroute.h +const ( + IGMPMSG_NOCACHE = 1 + VIFF_USE_IFINDEX = 8 + MRT_ADD_VIF = 202 + MRT_DEL_VIF = 203 + MRT_ADD_MFC = 204 + MRT_DEL_MFC = 205 + MRT_BASE = 200 + MRT_TABLE = 209 + MRT_FLUSH = 212 + MAXVIFS = 32 +) + +type vifctl struct { + vifcVifi uint16 + vifcFlags uint8 + vifcThreshold uint8 + vifcRateLimit uint32 + vifcLclIfindex int + vifcRmtAddr [4]byte +} + +type mfcctl struct { + mfccOrigin [4]byte + mfccMcastgrp [4]byte + mfccParent uint16 + mfccTtls [32]byte + mfccPktCnt uint32 + mfccByteCnt uint32 + mfccWrongIf uint32 + mfccExpire int +} + +// We should consider the changes in vifctl struct after kernel versiopn 5.9 +func parseKernelVersion() (*kernelVersion, error) { + var uname syscall.Utsname + if err := syscall.Uname(&uname); err != nil { + return nil, fmt.Errorf("Running uname error: %v", err) + } + kernel_version_full := arrayToString(uname.Release) + kernel_version_numbers := strings.Split(kernel_version_full, "-") + kernel_version_array := strings.Split(kernel_version_numbers[0], ".") + major, err := strconv.Atoi(kernel_version_array[0]) + minor, err := strconv.Atoi(kernel_version_array[1]) + patch, err := strconv.Atoi(kernel_version_array[2]) + if err != nil { + return nil, fmt.Errorf("failed to parse kernel version %s", kernel_version_full) + } + return &kernelVersion{ + major: major, + minor: minor, + patch: patch, + }, nil +} + +func arrayToString(x [65]int8) string { + var buf [65]byte + for i, b := range x { + buf[i] = byte(b) + } + str := string(buf[:]) + if i := strings.Index(str, "\x00"); i != -1 { + str = str[:i] + } + return str +} + +type kernelVersion struct { + major int + minor int + patch int +} diff --git a/pkg/agent/multicast/mcast_controller.go b/pkg/agent/multicast/mcast_controller.go index 441c878b4a6..e7c6b129900 100644 --- a/pkg/agent/multicast/mcast_controller.go +++ b/pkg/agent/multicast/mcast_controller.go @@ -15,7 +15,9 @@ package multicast import ( + "fmt" "net" + "os/exec" "sync" "time" @@ -164,9 +166,14 @@ type Controller struct { // installedGroups saves the groups which are configured on both OVS and the host. installedGroups sets.String installedGroupsMutex sync.RWMutex + mRouteClient *MRouteClient } -func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore) *Controller { +func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeConfig, ifaceStore interfacestore.InterfaceStore, transportInterface string) (*Controller, error) { + multicastRouteClient, err := NewRouteClient(nodeConfig, transportInterface, ifaceStore) + if err != nil { + return nil, fmt.Errorf("failed to initialize multicast route client %+v", err) + } eventCh := make(chan *mcastGroupEvent, workerCount) groupSnooper := newSnooper(ofClient, ifaceStore, eventCh) groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{ @@ -180,7 +187,8 @@ func NewMulticastController(ofClient openflow.Client, nodeConfig *config.NodeCon groupCache: groupCache, installedGroups: sets.NewString(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "multicastgroup"), - } + mRouteClient: multicastRouteClient, + }, nil } func (c *Controller) Initialize() error { @@ -214,6 +222,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { // Process multicast Group membership report or leave messages. go wait.Until(c.worker, time.Second, stopCh) } + go c.mRouteClient.runRouteClient(stopCh) } func (c *Controller) worker() { @@ -292,6 +301,16 @@ func (c *Controller) syncGroup(groupKey string) error { klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey) return err } + err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group) + if err != nil { + klog.Errorf("Cannot delete multicat group %s: %s", status.group.String(), err.Error()) + return err + } + err = c.mRouteClient.externalInterfacesLeaveMgroup(status.group) + if err != nil { + klog.Errorf("Cannot drop multicat group for all the external and tranport interfaces %s: %s", status.group.String(), err.Error()) + return err + } c.delInstalledGroup(groupKey) c.groupCache.Delete(status) klog.InfoS("Removed multicast group from cache after all members left", "group", groupKey) @@ -302,6 +321,9 @@ func (c *Controller) syncGroup(groupKey string) error { klog.ErrorS(err, "Failed to install multicast flows", "group", status.group) return err } + if err := c.mRouteClient.externalInterfacesJoinMgroup(status.group); err != nil { + return err + } c.addInstalledGroup(groupKey) return nil } @@ -360,6 +382,22 @@ func podInterfaceIndexFunc(obj interface{}) ([]string, error) { return podInterfaces, nil } +func SetOVSMulticast(ovsBridge string) error { + setSnoopingEnable := fmt.Sprintf("ovs-vsctl set Bridge %s %s", ovsBridge, "mcast_snooping_enable=true") + cmd := exec.Command("/bin/sh", "-c", setSnoopingEnable) + err := cmd.Run() + if err != nil { + return fmt.Errorf("error running %s: %v", setSnoopingEnable, err) + } + mcastSnoopingDisableFloodUnregistered := fmt.Sprintf("ovs-vsctl set Bridge %s %s", ovsBridge, "other_config:mcast-snooping-disable-flood-unregistered=true") + cmd = exec.Command("/bin/sh", "-c", mcastSnoopingDisableFloodUnregistered) + err = cmd.Run() + if err != nil { + return fmt.Errorf("error running %s: %v", mcastSnoopingDisableFloodUnregistered, err) + } + return nil +} + func getGroupEventKey(obj interface{}) (string, error) { groupState := obj.(*GroupMemberStatus) return groupState.group.String(), nil diff --git a/pkg/agent/multicast/multicast_route.go b/pkg/agent/multicast/multicast_route.go new file mode 100644 index 00000000000..cb6c4cb363d --- /dev/null +++ b/pkg/agent/multicast/multicast_route.go @@ -0,0 +1,473 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package multicast + +import ( + "fmt" + "net" + "strconv" + "strings" + "syscall" + "time" + + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/interfacestore" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +const ( + GroupNameIndexName = "groupName" + VIFIndexName = "VIFName" + + igmpMsgminRetryDelay = 5 * time.Second + igmpMsgmaxRetryDelay = 300 * time.Second +) + +func NewRouteClient(nodeconfig *config.NodeConfig, transportInterface string, ifaceStore interfacestore.InterfaceStore) (*MRouteClient, error) { + fd, err := createMulticastSocket() + if err != nil { + return nil, fmt.Errorf("failed to create multicast socket") + } + VIFAllocator := newVIFAllocator(fd) + if err != nil { + return nil, fmt.Errorf("failed to create multicast VIF allocator") + } + c := &MRouteClient{ + sockFD: fd, + vifAllocator: VIFAllocator, + gatewayInterface: nodeconfig.GatewayConfig.Name, + ifaceStore: ifaceStore, + transportInterface: transportInterface, + inboundRouteCache: cache.NewIndexer(getMulticastInboundEntryKey, cache.Indexers{GroupNameIndexName: inboundGroupIndexFunc, VIFIndexName: inboundvifIndexFunc}), + outboundRouteCache: cache.NewIndexer(getMulticastOutboundEntryKey, cache.Indexers{GroupNameIndexName: outboundGroupIndexFunc, VIFIndexName: oudboundvifsIndexFunc}), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(igmpMsgminRetryDelay, igmpMsgmaxRetryDelay), "multicastroute"), + } + // TODO: Need to find a better way to get external interfaces and get updated map of external interfaces + externalAddrMap, err := c.getExternalMulticastIPs() + if err != nil { + return nil, err + } + c.externalAddrMap = externalAddrMap + _, err = VIFAllocator.setVifIfNotExist(c.gatewayInterface) + if err != nil { + return nil, fmt.Errorf("failed to allocate VIF for gateway") + } + _, err = VIFAllocator.setVifIfNotExist(c.transportInterface) + if err != nil { + return nil, fmt.Errorf("failed to allocate VIF for transportInterface") + } + + return c, nil +} + +// MRouteClient configures static multicast route +type MRouteClient struct { + sockFD int + vifAllocator *VIFAllocator + gatewayInterface string + transportInterface string + inboundRouteCache cache.Indexer + outboundRouteCache cache.Indexer + ifaceStore interfacestore.InterfaceStore + externalAddrMap map[string][]net.IP + queue workqueue.RateLimitingInterface +} + +func createMulticastSocket() (int, error) { + fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_IGMP) + if err != nil { + return -1, fmt.Errorf("cannot create multicast socket") + } + + err = syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, MRT_BASE, 1) + if err != nil { + return -1, fmt.Errorf("unable to activte Multicast routing in kernel with error %s", err.Error()) + } + return fd, nil +} + +// externalInterfacesJoinMgroup allows external interfaces join multicast group, +// making these interfaces accept mulitcast traffic with multicast ip:mgroup +// https://tldp.org/HOWTO/Multicast-HOWTO-6.html#ss6.4 +func (c *MRouteClient) externalInterfacesJoinMgroup(mgroup net.IP) error { + groupIP := mgroup.To4() + for name, addrArr := range c.externalAddrMap { + for _, addr := range addrArr { + addrIP := addr.To4() + err := syscall.SetsockoptIPMreq(c.sockFD, syscall.IPPROTO_IP, syscall.IP_ADD_MEMBERSHIP, &syscall.IPMreq{ + Multiaddr: [4]byte{groupIP[0], groupIP[1], groupIP[2], groupIP[3]}, + Interface: [4]byte{addrIP[0], addrIP[1], addrIP[2], addrIP[3]}, + }) + if err != nil { + return fmt.Errorf("cannot join multicast group %s for %s: %s", mgroup.String(), name, err.Error()) + } + } + c.vifAllocator.setVifIfNotExist(name) + } + return nil +} + +func (c *MRouteClient) externalInterfacesLeaveMgroup(mgroup net.IP) error { + for _, addrArr := range c.externalAddrMap { + for _, addr := range addrArr { + addrV4 := addr.To4() + groupIPStr := mgroup.To4() + err := syscall.SetsockoptIPMreq(c.sockFD, syscall.IPPROTO_IP, syscall.IP_DROP_MEMBERSHIP, &syscall.IPMreq{ + Multiaddr: [4]byte{groupIPStr[0], groupIPStr[1], groupIPStr[2], groupIPStr[3]}, + Interface: [4]byte{addrV4[0], addrV4[1], addrV4[2], addrV4[3]}, + }) + if err != nil { + return fmt.Errorf("cannot Join multicast group %s: %s", mgroup.String(), err.Error()) + } + } + } + return nil +} + +// parseIGMPMsg parses the kernel version into igmpMsg, note there is a change in +// the after linux 5.9 in the igmpmsg https://github.com/torvalds/linux/commit/c8715a8e9f38906e73d6d78764216742db13ba0e +func (c *MRouteClient) parseIGMPMsg(msg []byte) (*igmpMsg, error) { + kernelVersion, err := parseKernelVersion() + if err != nil { + return nil, err + } + var vif uint16 + if kernelVersion.major >= 5 && kernelVersion.minor > 9 { + vif = uint16(msg[10]) + (uint16(msg[11]) << uint16(8)) + } else { + vif = uint16(msg[10]) + } + return &igmpMsg{ + imMsgtype: msg[8], + VIF: vif, + imSrc: net.IPv4(msg[12], msg[13], msg[14], msg[15]), + imDst: net.IPv4(msg[16], msg[17], msg[18], msg[19]), + }, nil +} + +// processIGMPNocacheMsg reads igmpMsg from the socket and configure +// multicast route based on the message +// the outbound routing entries is for sending multicast traffic from pod +// and the inbound routing entries +func (c *MRouteClient) processIGMPNocacheMsg() bool { + obj, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(obj) + + if key, ok := obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call Forget here else we'd + // go into a loop of attempting to process a work item that is invalid. + // This should not happen. + c.queue.Forget(obj) + klog.Errorf("Expected string in work queue but got %#v", obj) + return true + } else if err := c.sync(key); err == nil { + // If no error occurs we Forget this item so it does not get queued again until + // another change happens. + c.queue.Forget(key) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.queue.AddRateLimited(key) + klog.Errorf("Error syncing multicast group %s, requeuing. Error: %v", key, err) + } + + return true +} + +func (c *MRouteClient) sync(igmpmsg string) error { + klog.V(2).Infof("igmpmsg received %v", igmpmsg) + bytemsg := []byte(igmpmsg) + if bytemsg[8] != IGMPMSG_NOCACHE { + klog.InfoS("not a IGMPMSG_NOCACHE message: %v", bytemsg) + return nil + } + msg, err := c.parseIGMPMsg(bytemsg) + if err != nil { + klog.Errorf("error parsing kernel version: %s", err.Error()) + return nil + } + gatewayVif, _ := c.vifAllocator.getVif(c.gatewayInterface) + if msg.VIF != uint16(gatewayVif) { + err := c.addInboundMrouteEntry(msg.imSrc, msg.imDst, msg.VIF) + if err != nil { + klog.Errorf("add mroute entry failed :%s", err.Error()) + return err + } + } else { + outboundInterfaces := make([]string, len(c.externalAddrMap)) + i := 0 + for k := range c.externalAddrMap { + outboundInterfaces[i] = k + i++ + } + err = c.addOutBoundMrouteEntry(msg.imSrc, msg.imDst, outboundInterfaces) + if err != nil { + klog.Errorf("add outbound mroute entry failed %s", err.Error()) + return err + } + } + return nil +} + +func (c *MRouteClient) runRouteClient(stopCh <-chan struct{}) { + klog.Infof("Start running multicast routing daemon") + go func() { + // TODO: Find a better way to handle IGMPNO_CACHE MESSAGE + for { + buf := make([]byte, 4096) + n, _ := syscall.Read(c.sockFD, buf) + if n > 0 { + c.queue.Add(string(buf[:n])) + } + } + }() + + for i := 0; i < int(workerCount); i++ { + // Process multicast Group membership report or leave messages. + go wait.Until(c.worker, time.Second, stopCh) + } + <-stopCh + c.flushMRoute() +} + +func (c *MRouteClient) worker() { + for c.processIGMPNocacheMsg() { + } +} + +func (c *MRouteClient) addMrouteEntry(src net.IP, group net.IP, iif uint16, oifsVif []uint16) (err error) { + mc := &mfcctl{} + origin := src.To4() + mc.mfccOrigin = [4]byte{origin[0], origin[1], origin[2], origin[3]} + g := group.To4() + mc.mfccMcastgrp = [4]byte{g[0], g[1], g[2], g[3]} + ttls := [32]byte{} + for _, v := range oifsVif { + ttls[v] = 1 + } + mc.mfccTtls = ttls + mc.mfccParent = iif + return setsockoptMfcctl(c.sockFD, MRT_ADD_MFC, mc) +} + +func (c *MRouteClient) delMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) { + mc := &mfcctl{} + origin := src.To4() + mc.mfccOrigin = [4]byte{origin[0], origin[1], origin[2], origin[3]} + g := group.To4() + mc.mfccMcastgrp = [4]byte{g[0], g[1], g[2], g[3]} + mc.mfccParent = iif + return setsockoptMfcctl(c.sockFD, MRT_DEL_MFC, mc) +} + +func (c *MRouteClient) flushMRoute() { + klog.Infof("Clearing multicast routing table entry") + setsockoptVifctl(c.sockFD, MRT_FLUSH, &vifctl{}) +} + +func (c *MRouteClient) deleteInboundMrouteEntryByGroup(group net.IP) (err error) { + klog.V(2).Infof("Deleting multicast group %s", group.String()) + mEntries, _ := c.inboundRouteCache.ByIndex(GroupNameIndexName, group.String()) + vifs := make(map[uint16]bool) + for _, route := range mEntries { + entry := route.(*inboundMulticastRouteEntry) + err := c.delMrouteEntry(net.ParseIP(entry.src), net.ParseIP(entry.group), entry.vif) + if err != nil { + return err + } + vifs[entry.vif] = true + c.inboundRouteCache.Delete(route) + } + for vif := range vifs { + c.tryDeleteVif(vif) + } + return nil +} + +// tryDeleteVif tries to delete vif by searching if any vif is still used in the +// multicast route. If not, the vif can be safely reclaimed. +func (c *MRouteClient) tryDeleteVif(vif uint16) { + vifStr := strconv.FormatUint(uint64(vif), 10) + inboundObjs, _ := c.inboundRouteCache.ByIndex(VIFIndexName, vifStr) + outboundObjs, _ := c.outboundRouteCache.ByIndex(VIFIndexName, vifStr) + if len(inboundObjs) == 0 && len(outboundObjs) == 0 { + c.vifAllocator.release(vif) + } +} + +// addOutBoundMrouteEntry configure multicast route from antrea gateway to oifs(outbound multicast interfaces) +// to allow multicast senders in the pods send multicast traffic to external +func (c *MRouteClient) addOutBoundMrouteEntry(src net.IP, group net.IP, oifs []string) (err error) { + klog.InfoS("Adding outbound multicast route", "src", src, "group", group, "oifs", oifs) + vifs := make([]uint16, 0) + for _, oif := range oifs { + oifvif, err := c.vifAllocator.setVifIfNotExist(oif) + if err != nil { + return err + } + vifs = append(vifs, uint16(oifvif)) + } + + inboundVif, _ := c.vifAllocator.getVif(c.gatewayInterface) + err = c.addMrouteEntry(src, group, uint16(inboundVif), vifs) + if err != nil { + return err + } + routeEntry := &outboundMulticastRouteEntry{ + group: group.String(), + src: src.String(), + vifs: vifs, + } + _, exist, _ := c.outboundRouteCache.GetByKey(routeEntry.group + "/" + routeEntry.src) + if !exist { + c.outboundRouteCache.Add(routeEntry) + } else { + c.outboundRouteCache.Update(routeEntry) + } + return nil +} + +// addOutBoundMrouteEntry configure multicast route from external interface to antrea gateway +// to allow multicast receivers in the pods receive multicast traffic from external +func (c *MRouteClient) addInboundMrouteEntry(src net.IP, group net.IP, inboundVif uint16) (err error) { + klog.InfoS("Adding inbound multicast route", "src", src, "group", group, "inboundVif", inboundVif) + oif, err := c.vifAllocator.getVif(c.gatewayInterface) + if err != nil { + return err + } + + err = c.addMrouteEntry(src, group, inboundVif, []uint16{oif}) + if err != nil { + return err + } + routeEntry := &inboundMulticastRouteEntry{ + group: group.String(), + src: src.String(), + vif: inboundVif, + } + c.inboundRouteCache.Add(routeEntry) + return nil +} + +// inboundMulticastRouteEntry encodes the inbound multicast routing entry. +// e.g. (10.0.0.55,226.94.9.9) Iif: wlan0 Oifs: antrea-gw0 +// the oif is alway gatewayInterface so we do not put it in the struct +type inboundMulticastRouteEntry struct { + group string + src string + vif uint16 +} + +// outboundMulticastRouteEntry encodes the outbound multicast routing entry. +// e.g. (10.244.1.4,224.1.2.10) Iif: antrea-gw0 Oifs: wlo1 wlp3s0 zt5u4zjljr +// the oif is alway gatewayInterface so we do not put it in the struct +type outboundMulticastRouteEntry struct { + group string + src string + vifs []uint16 +} + +func getMulticastInboundEntryKey(obj interface{}) (string, error) { + entry := obj.(*inboundMulticastRouteEntry) + return entry.group + "/" + entry.src + "/" + string(entry.vif), nil +} + +func getMulticastOutboundEntryKey(obj interface{}) (string, error) { + entry := obj.(*outboundMulticastRouteEntry) + return entry.group + "/" + entry.src, nil +} + +func inboundvifIndexFunc(obj interface{}) ([]string, error) { + entry, ok := obj.(*inboundMulticastRouteEntry) + if !ok { + return []string{}, nil + } + return []string{strconv.FormatUint(uint64(entry.vif), 10)}, nil +} + +func oudboundvifsIndexFunc(obj interface{}) ([]string, error) { + entry, ok := obj.(*outboundMulticastRouteEntry) + if !ok { + return []string{}, nil + } + vifs := make([]string, len(entry.vifs)) + for _, vif := range entry.vifs { + vifs = append(vifs, strconv.FormatUint(uint64(vif), 10)) + } + return vifs, nil +} + +func inboundGroupIndexFunc(obj interface{}) ([]string, error) { + entry, ok := obj.(*inboundMulticastRouteEntry) + if !ok { + return []string{}, nil + } + return []string{entry.group}, nil +} + +func outboundGroupIndexFunc(obj interface{}) ([]string, error) { + entry, ok := obj.(*outboundMulticastRouteEntry) + if !ok { + return []string{}, nil + } + return []string{entry.group}, nil +} + +// getExternalMulticastIPs tries to get external multicast interfaces for configuring outbound multicast routes +// Currently the implementation is to filter out interfaces from ifaceStore and mulicast disbaled interfacs +func (c *MRouteClient) getExternalMulticastIPs() (map[string][]net.IP, error) { + interfacesMap := make(map[string]bool) + ifaces, err := net.Interfaces() + if err != nil { + return nil, err + } + for _, iface := range c.ifaceStore.GetAllInterfaces() { + interfacesMap[iface.InterfaceName] = true + } + interfacesMap[c.gatewayInterface] = true + addrMap := make(map[string][]net.IP) + + for _, iface := range ifaces { + _, ok := interfacesMap[iface.Name] + // the interface should be multicast enabled + // and not in the ifaceStore + if ok || !strings.Contains(iface.Flags.String(), "multicast") { + continue + } + addrs, err := iface.Addrs() + if err != nil { + return nil, err + } + for _, addr := range addrs { + ip, _, _ := net.ParseCIDR(addr.String()) + ipv4 := ip.To4() + if ipv4 != nil { + ips, ok := addrMap[iface.Name] + if !ok { + addrMap[iface.Name] = []net.IP{ipv4} + } else { + ips = append(ips, ipv4) + addrMap[iface.Name] = ips + } + } + } + } + return addrMap, nil +} diff --git a/pkg/agent/multicast/source_utils.go b/pkg/agent/multicast/source_utils.go new file mode 100644 index 00000000000..dc7d01e87df --- /dev/null +++ b/pkg/agent/multicast/source_utils.go @@ -0,0 +1,87 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// The functions in the file are copied from golang source, maybe find a way to remove it +// +package multicast + +import ( + "net" + "syscall" + "unsafe" +) + +const ( + SizeofVifctl = 16 + SizeofMfcctl = 60 +) + +func setsockopt(s int, level int, name int, val unsafe.Pointer, vallen uintptr) (err error) { + _, _, e1 := syscall.Syscall6(syscall.SYS_SETSOCKOPT, uintptr(s), uintptr(level), uintptr(name), uintptr(val), uintptr(vallen), 0) + if e1 != 0 { + err = errnoErr(e1) + } + return +} + +func setsockoptMfcctl(fd, opt int, filter *mfcctl) error { + return setsockopt(fd, syscall.IPPROTO_IP, opt, unsafe.Pointer(filter), SizeofMfcctl) +} + +func setsockoptVifctl(fd, opt int, filter *vifctl) error { + return setsockopt(fd, syscall.IPPROTO_IP, opt, unsafe.Pointer(filter), SizeofVifctl) +} + +func setVifToInterface(fd int, vif uint16, index int) error { + vc := &vifctl{} + vc.vifcVifi = vif + vc.vifcRateLimit = 0 + vc.vifcFlags = 0 + vc.vifcFlags |= VIFF_USE_IFINDEX + vc.vifcLclIfindex = index + return setsockoptVifctl(fd, MRT_ADD_VIF, vc) +} + +func deleteVifFromInterface(fd int, vif uint16) error { + vc := &vifctl{} + vc.vifcVifi = vif + return setsockoptVifctl(fd, MRT_DEL_VIF, vc) +} + +var ( + errEAGAIN error = syscall.EAGAIN + errEINVAL error = syscall.EINVAL + errENOENT error = syscall.ENOENT +) + +func errnoErr(e syscall.Errno) error { + switch e { + case 0: + return nil + case syscall.EAGAIN: + return errEAGAIN + case syscall.EINVAL: + return errEINVAL + case syscall.ENOENT: + return errENOENT + } + return e +} + +type igmpMsg struct { + imMsgtype uint8 + VIF uint16 + imSrc net.IP + imDst net.IP +} diff --git a/pkg/agent/multicast/vif_allocator.go b/pkg/agent/multicast/vif_allocator.go new file mode 100644 index 00000000000..c51f6ba21aa --- /dev/null +++ b/pkg/agent/multicast/vif_allocator.go @@ -0,0 +1,133 @@ +// Copyright 2021 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package multicast + +import ( + "container/list" + "fmt" + "net" + "sync" + + "github.com/vishvananda/netlink" + "k8s.io/klog/v2" +) + +// VIF stands for virtual interface for multicast in kernel multicast implementation +// and is used for adding/deleting static multicast entries, please check +// https://github.com/torvalds/linux/blob/master/include/uapi/linux/mroute.h#L80 +// VIFAllocator is used for saving vif <-> interface mapping. +// It also allocates and deallocates VIFs because VIFs are limited resource(at most 32) +// for kernel prior to 5.10 +type VIFAllocator struct { + mutex sync.RWMutex + vifMap map[string]uint16 + vifReverseMap map[uint16]string + nextID uint16 + maxID uint16 + sockFD int + availableIDs *list.List +} + +func newVIFAllocator(sockFD int) *VIFAllocator { + return &VIFAllocator{ + availableIDs: list.New(), + vifMap: make(map[string]uint16), + vifReverseMap: make(map[uint16]string), + nextID: 0, + maxID: 255, + sockFD: sockFD, + } +} + +func (v *VIFAllocator) getVif(name string) (uint16, error) { + v.mutex.RLock() + defer v.mutex.RUnlock() + vif, ok := v.vifMap[name] + if !ok { + return 0, fmt.Errorf("VIF for interface %s not found", name) + } + return vif, nil +} + +func (v *VIFAllocator) allocateVif(name string) (uint16, error) { + l, err := netlink.LinkList() + if err != nil { + return 0, err + } else { + for _, link := range l { + if link.Attrs().Flags > net.FlagMulticast { + if link.Attrs().Name == name { + vif, err := v.allocate(link.Attrs().Index) + setVifToInterface(v.sockFD, vif, link.Attrs().Index) + if err != nil { + return 0, err + } else { + klog.Infof("Successfully add VIF: %d", vif) + v.vifMap[name] = vif + v.vifReverseMap[vif] = name + return vif, nil + } + } + } + } + } + return 0, fmt.Errorf("error adding VIF for %s failed: cannot find this interface", name) +} + +func (v *VIFAllocator) setVifIfNotExist(name string) (uint16, error) { + klog.V(2).Infof("Get and set VIF %s", name) + v.mutex.Lock() + defer v.mutex.Unlock() + vif, exist := v.vifMap[name] + if !exist { + newvif, err := v.allocateVif(name) + if err != nil { + return 0, err + } + return newvif, nil + } else { + return vif, nil + } +} + +func (v *VIFAllocator) allocate(ifIdx int) (uint16, error) { + klog.V(2).Infof("Allocating VIF for ifIdx %d", ifIdx) + front := v.availableIDs.Front() + if front != nil { + return v.availableIDs.Remove(front).(uint16), nil + } + if v.nextID <= v.maxID { + allocated := v.nextID + v.nextID += 1 + return allocated, nil + } + return 0, fmt.Errorf("no ID available") + +} + +func (v *VIFAllocator) release(vif uint16) (err error) { + klog.V(2).Infof("Release vif %d", vif) + v.mutex.Lock() + defer v.mutex.Unlock() + + name, has := v.vifReverseMap[vif] + if !has { + return fmt.Errorf("VIF for %s does not exist", name) + } + v.availableIDs.PushBack(vif) + delete(v.vifMap, name) + delete(v.vifReverseMap, vif) + return deleteVifFromInterface(v.sockFD, vif) +}