diff --git a/README.md b/README.md index d5d6afa..9b3e845 100644 --- a/README.md +++ b/README.md @@ -30,12 +30,16 @@ Fluvia Exporter is licensed under the [MIT license](https://en.wikipedia.org/wik For the full license text, see [LICENSE](https://github.com/nttcom/fluvia/blob/master/LICENSE). ## Miscellaneous -Fluvia Exporter supports the following IETF Internet-Drafts: -- [Export of Segment Routing over IPv6 Information in IP Flow Information Export (IPFIX)](https://datatracker.ietf.org/doc/html/draft-ietf-opsawg-ipfix-srv6-srh-14) - - IPFIX Library: Supports all IEs. - - IPFIX Exporter: Implemented the following IEs. +Fluvia Exporter supports the following IEs: + - packetDeltaCount + - [draft-ietf-opsawg-ipfix-srv6-srh](https://datatracker.ietf.org/doc/draft-ietf-opsawg-ipfix-srv6-srh/) - srhActiveSegmentIPv6 - srhSegmentsIPv6Left - srhFlagsIPv6 - srhTagIPv6 - srhSegmentIPv6BasicList + - [draft-ietf-opsawg-ipfix-on-path-telemetry](https://datatracker.ietf.org/doc/draft-ietf-opsawg-ipfix-on-path-telemetry/) + - PathDelayMeanDeltaMicroseconds + - PathDelayMaxDeltaMicroseconds + - PathDelayMinDeltaMicroseconds + - PathDelaySumDeltaMicroseconds diff --git a/cmd/fluvia/main.go b/cmd/fluvia/main.go index 1541761..f0851bd 100644 --- a/cmd/fluvia/main.go +++ b/cmd/fluvia/main.go @@ -51,5 +51,10 @@ func main() { ingressIfName = c.Ipfix.IngressInterface } - client.New(ingressIfName, raddr) + interval := c.Ipfix.Interval + if interval <= 0 { + interval = 1 + } + + client.New(ingressIfName, raddr, interval) } diff --git a/docs/sources/getting-started.md b/docs/sources/getting-started.md index 9af472f..551a566 100644 --- a/docs/sources/getting-started.md +++ b/docs/sources/getting-started.md @@ -19,8 +19,11 @@ ipfix: address: 192.0.2.1 port: 4739 ingress-interface: ens192 + interval: 1 ``` +interval is the intervals between exports (seconds) and the default is 1 second. + ### Run Fluvia Exporter using the fluvia command Start the fluvia command. Specify the created configuration file with the -f option. diff --git a/go.mod b/go.mod index c3d0b70..e390a5b 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,9 @@ go 1.20 require ( github.com/cilium/ebpf v0.11.0 github.com/google/gopacket v1.1.19 - github.com/pkg/errors v0.9.1 + golang.org/x/sync v0.0.0-20190423024810-112230192c58 + golang.org/x/sys v0.10.0 gopkg.in/yaml.v3 v3.0.1 ) -require ( - golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect - golang.org/x/sys v0.10.0 // indirect -) +require golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 // indirect diff --git a/go.sum b/go.sum index e5f5bf2..18d73c7 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,6 @@ github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= -github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -17,6 +15,7 @@ golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPI golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/config/config.go b/internal/config/config.go index 69de831..de82e21 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,6 +15,7 @@ type Ipfix struct { Address string `yaml:"address"` Port string `yaml:"port"` IngressInterface string `yaml:"ingress-interface"` + Interval int `yaml:"interval"` } type Config struct { diff --git a/internal/pkg/meter/hbh.go b/internal/pkg/meter/hbh.go new file mode 100644 index 0000000..f31ab5b --- /dev/null +++ b/internal/pkg/meter/hbh.go @@ -0,0 +1,212 @@ +package meter + +import ( + "encoding/binary" + "fmt" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +const IPV6_TLV_PAD1 = 0 + +type HBHLayer struct { + layers.BaseLayer + NextHeader uint8 + Length uint8 + Options []IoamOption +} + +type IoamOption struct { + Type uint8 + Length uint8 + Reserved uint8 + OptionType uint8 + TraceHeader IoamTrace +} + +type IoamTrace struct { + NameSpaceId uint16 + NodeLen uint8 + Flags byte + RemainingLen uint8 + Type [3]byte + Reserved byte + NodeDataList []NodeData +} + +type NodeData struct { + HopLimitNodeId [4]byte + IngressEgressIds [4]byte + Second [4]byte + Subsecond [4]byte +} + +var HBHLayerType = gopacket.RegisterLayerType( + 2002, + gopacket.LayerTypeMetadata{ + Name: "HBHLayerType", + Decoder: gopacket.DecodeFunc(decodeHBHLayer), + }, +) + +func (l *HBHLayer) LayerType() gopacket.LayerType { + return HBHLayerType +} + +func (l *HBHLayer) DecodeFromBytes(data []byte, df gopacket.DecodeFeedback) error { + p := 0 + + // Min length of each header + // HBHLayer = 2, IoamOption = 3, IoamTrae = 8 + if len(data) < 2+3+8 { + df.SetTruncated() + return fmt.Errorf("HBH layer less than 2 bytes for HBH packet") + } + + l.NextHeader = data[p] + p++ + l.Length = data[p] + p++ + + optionIdx := 0 + for { + if data[p] != IPV6_TLV_PAD1 { + break + } + + l.Options[optionIdx].Type = IPV6_TLV_PAD1 + optionIdx = optionIdx + 1 + p = p + 1 + } + + ioamOption := l.Options[optionIdx] + + ioamOption.Type = data[p] + p++ + ioamOption.Length = data[p] + p++ + ioamOption.Reserved = data[p] + p++ + ioamOption.OptionType = data[p] + p++ + + trace := ioamOption.TraceHeader + trace.NameSpaceId = binary.BigEndian.Uint16(data[p : p+2]) + p = p + 2 + trace.NodeLen = data[p] >> 3 + trace.Flags = ((data[p] & 0b00000111) << 1) | (data[p+1] >> 7) + p = p + 1 + trace.RemainingLen = data[p] & 0b01111111 + p = p + 1 + copy(trace.Type[:], data[p:p+3]) + p = p + 3 + trace.Reserved = data[p] + p++ + + traceDataLen := ioamOption.Length - (2 + 8) + for i := 0; i < int(traceDataLen)/4/int(trace.NodeLen); i++ { + var ( + hopLimitNodeId [4]byte + ingressEgressIds [4]byte + second [4]byte + subsecond [4]byte + ) + + copy(hopLimitNodeId[:], data[p+16*i:p+16*i+4]) + copy(ingressEgressIds[:], data[p+16*i+4:p+16*i+8]) + copy(second[:], data[p+16*i+8:p+16*i+12]) + copy(subsecond[:], data[p+16*i+12:p+16*i+16]) + + nodeData := NodeData{ + HopLimitNodeId: hopLimitNodeId, + IngressEgressIds: ingressEgressIds, + Second: second, + Subsecond: subsecond, + } + + trace.NodeDataList = append(trace.NodeDataList, nodeData) + } + + return nil +} + +func (l *HBHLayer) SerializeTo(b gopacket.SerializeBuffer, opts gopacket.SerializeOptions) error { + length := l.Length*8 + 8 + bytes, err := b.PrependBytes(int(length)) + if err != nil { + return err + } + + p := 0 + + bytes[p] = l.NextHeader + p++ + bytes[p] = l.Length + p++ + + optionIdx := 0 + for i, option := range l.Options { + if option.Type != IPV6_TLV_PAD1 { + optionIdx = i + break + } + + bytes[p] = IPV6_TLV_PAD1 + p++ + } + + ioamOption := l.Options[optionIdx] + bytes[p] = ioamOption.Type + p++ + bytes[p] = ioamOption.Length + p++ + bytes[p] = ioamOption.Reserved + p++ + bytes[p] = ioamOption.OptionType + p++ + + traceOption := ioamOption.TraceHeader + binary.BigEndian.PutUint16(bytes[p:], traceOption.NameSpaceId) + p = p + 2 + bytes[p] = traceOption.NodeLen << 3 + bytes[p] = (bytes[p] & 0xf8) | ((traceOption.Flags >> 1) & 0x07) + p++ + bytes[p] = (traceOption.Flags & 0x01) << 7 + bytes[p] = (bytes[p] & 0x80) | (traceOption.RemainingLen & 0x7f) + p++ + copy(bytes[p:p+3], traceOption.Type[:]) + p = p + 3 + bytes[p] = traceOption.Reserved + p++ + + traceDataLen := ioamOption.Length - (2 + 8) + for i := 0; i < int(traceDataLen)/4/int(traceOption.NodeLen); i++ { + nodeData := traceOption.NodeDataList[i] + copy(bytes[p+16*i:p+16*i+4], nodeData.HopLimitNodeId[:]) + copy(bytes[p+16*i+4:p+16*i+8], nodeData.IngressEgressIds[:]) + copy(bytes[p+16*i+8:p+16*i+12], nodeData.Second[:]) + copy(bytes[p+16*i+12:p+16*i+16], nodeData.Subsecond[:]) + } + + return nil +} + +func (l *HBHLayer) NextLayerType() gopacket.LayerType { + return gopacket.LayerTypePayload +} + +func decodeHBHLayer(data []byte, p gopacket.PacketBuilder) error { + l := &HBHLayer{} + err := l.DecodeFromBytes(data, p) + if err != nil { + return nil + } + p.AddLayer(l) + next := l.NextLayerType() + if next == gopacket.LayerTypeZero { + return nil + } + + return p.NextDecoder(next) +} diff --git a/internal/pkg/meter/parse.go b/internal/pkg/meter/parse.go new file mode 100644 index 0000000..1e9bcd3 --- /dev/null +++ b/internal/pkg/meter/parse.go @@ -0,0 +1,86 @@ +package meter + +import ( + "fmt" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +const MAX_SEGMENTLIST_ENTRIES = 10 + +type ProbeData struct { + H_source string + H_dest string + V6Srcaddr string + V6Dstaddr string + NextHdr uint8 + HdrExtLen uint8 + RoutingType uint8 + SegmentsLeft uint8 + LastEntry uint8 + Flags uint8 + Tag uint16 + Segments [MAX_SEGMENTLIST_ENTRIES]string +} + +func Parse(data []byte) (*ProbeData, error) { + var pd ProbeData + packet := gopacket.NewPacket(data, layers.LayerTypeEthernet, gopacket.Default) + + ethLayer := packet.Layer(layers.LayerTypeEthernet) + eth, ok := ethLayer.(*layers.Ethernet) + if !ok { + return nil, fmt.Errorf("Could not parse a packet with Ethernet") + } + + pd.H_dest = eth.DstMAC.String() + pd.H_source = eth.SrcMAC.String() + + ipv6Layer := packet.Layer(layers.LayerTypeIPv6) + ipv6, ok := ipv6Layer.(*layers.IPv6) + if !ok { + return nil, fmt.Errorf("Could not parse a packet with IPv6") + } + + pd.V6Srcaddr = ipv6.SrcIP.String() + pd.V6Dstaddr = ipv6.DstIP.String() + + if ipv6.NextHeader != layers.IPProtocolIPv6HopByHop { + return nil, fmt.Errorf("Next header is not IPv6 hop-by-hop(0): %d", ipv6.NextHeader) + } + + ipv6HBHLayer := packet.Layer(layers.LayerTypeIPv6HopByHop) + hbh, ok := ipv6HBHLayer.(*layers.IPv6HopByHop) + if !ok { + return nil, fmt.Errorf("Could not parse a packet with ipv6 hop-by-hop option") + } + + if hbh.NextHeader != layers.IPProtocolIPv6Routing { + return nil, fmt.Errorf("Next header is not SRv6: %d", hbh.NextHeader) + } + + packet = gopacket.NewPacket(ipv6HBHLayer.LayerPayload(), Srv6LayerType, gopacket.Lazy) + srv6Layer := packet.Layer(Srv6LayerType) + srv6, ok := srv6Layer.(*Srv6Layer) + if !ok { + return nil, fmt.Errorf("Could not parse a packet with SRv6") + } + + pd.NextHdr = srv6.NextHeader + pd.HdrExtLen = srv6.HdrExtLen + pd.RoutingType = srv6.RoutingType + pd.SegmentsLeft = srv6.SegmentsLeft + pd.LastEntry = srv6.LastEntry + pd.Flags = srv6.Flags + pd.Tag = srv6.Tag + + for idx := 0; idx < MAX_SEGMENTLIST_ENTRIES; idx++ { + if idx >= len(srv6.Segments) { + break + } + pd.Segments[idx] = srv6.Segments[idx].String() + } + + return &pd, nil +} diff --git a/pkg/bpf/srv6.go b/internal/pkg/meter/srv6.go similarity index 86% rename from pkg/bpf/srv6.go rename to internal/pkg/meter/srv6.go index 2936829..78472c0 100644 --- a/pkg/bpf/srv6.go +++ b/internal/pkg/meter/srv6.go @@ -1,9 +1,9 @@ -package bpf +package meter import ( "encoding/binary" - "errors" - "net" + "fmt" + "net/netip" "github.com/google/gopacket" "github.com/google/gopacket/layers" @@ -18,7 +18,7 @@ type Srv6Layer struct { LastEntry uint8 Flags uint8 Tag uint16 - Segments []net.IP + Segments []netip.Addr } var Srv6LayerType = gopacket.RegisterLayerType( @@ -36,7 +36,7 @@ func (l *Srv6Layer) LayerType() gopacket.LayerType { func (i *Srv6Layer) DecodeFromBytes(data []byte, df gopacket.DecodeFeedback) error { if len(data) < 8 { df.SetTruncated() - return errors.New("SRV6 layer less then 8 bytes for SRV6 packet") + return fmt.Errorf("SRV6 layer less then 8 bytes for SRV6 packet") } i.NextHeader = data[0] i.HdrExtLen = data[1] @@ -46,14 +46,15 @@ func (i *Srv6Layer) DecodeFromBytes(data []byte, df gopacket.DecodeFeedback) err i.Flags = data[5] i.Tag = binary.BigEndian.Uint16(data[6:8]) - for j := 0; j < int(i.HdrExtLen/2); j++ { + for j := 0; j < int(i.LastEntry+1); j++ { startBit := 8 + 16*j endBit := 24 + 16*j var addr []byte - for k := endBit; k >= startBit; k-- { + for k := startBit; k < endBit; k++ { addr = append(addr, data[k]) } - i.Segments = append(i.Segments, addr) + seg, _ := netip.AddrFromSlice(addr[:16]) + i.Segments = append(i.Segments, seg) } i.BaseLayer = layers.BaseLayer{ Contents: data[:8], @@ -76,7 +77,8 @@ func (i *Srv6Layer) SerializeTo(b gopacket.SerializeBuffer, opts gopacket.Serial bytes[5] = i.Flags binary.BigEndian.PutUint16(bytes[6:], i.Tag) - for i2, address := range i.Segments { + for i2, seg := range i.Segments { + address := seg.AsSlice() lsb := binary.BigEndian.Uint64(address[:8]) msb := binary.BigEndian.Uint64(address[8:]) binary.BigEndian.PutUint64(bytes[8+16*i2:], lsb) diff --git a/pkg/bpf/bpf.go b/pkg/bpf/bpf.go index 4936e2a..fe5a2db 100644 --- a/pkg/bpf/bpf.go +++ b/pkg/bpf/bpf.go @@ -7,45 +7,78 @@ package bpf import ( - "fmt" + "errors" "net" "github.com/cilium/ebpf" - "github.com/pkg/errors" + "github.com/cilium/ebpf/link" + "github.com/cilium/ebpf/perf" ) //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -no-global-types -cc $BPF_CLANG -cflags $BPF_CFLAGS xdp ../../src/main.c -- -I../../src -type XdpProbeData struct { - H_dest [6]uint8 - H_source [6]uint8 - H_proto uint16 - _ [2]byte - V6Srcaddr struct{ In6U struct{ U6Addr8 [16]uint8 } } - V6Dstaddr struct{ In6U struct{ U6Addr8 [16]uint8 } } - NextHdr uint8 - HdrExtLen uint8 - RoutingType uint8 - SegmentsLeft uint8 - LastEntry uint8 - Flags uint8 - Tag uint16 - Segments [10]struct{ In6U struct{ U6Addr8 [16]uint8 } } +type XdpMetaData struct { + ReceivedNano uint64 + SentSec uint32 + SentSubsec uint32 } -func ReadXdpObjects(ops *ebpf.CollectionOptions) (*xdpObjects, error) { +type Xdp struct { + objs *xdpObjects + link link.Link +} + +func ReadXdpObjects(ops *ebpf.CollectionOptions) (*Xdp, error) { obj := &xdpObjects{} err := loadXdpObjects(obj, ops) if err != nil { - return nil, errors.WithStack(err) + return nil, err } // TODO: BPF log level remove hardcoding. yaml in config if err != nil { - return nil, errors.WithStack(err) + return nil, err + } + + return &Xdp{ + objs: obj, + }, nil +} + +func (x *Xdp) Attach(iface *net.Interface) error { + l, err := link.AttachXDP(link.XDPOptions{ + Program: x.objs.XdpProg, + Interface: iface.Index, + Flags: link.XDPGenericMode, + }) + if err != nil { + return err } - return obj, nil + x.link = l + + return nil +} + +func (x *Xdp) NewPerfReader() (*perf.Reader, error) { + return perf.NewReader(x.objs.PacketProbePerf, 4096) +} + +func (x *Xdp) Close() error { + errs := []error{} + if err := x.objs.Close(); err != nil { + errs = append(errs, err) + } + + if err := x.link.Close(); err != nil { + errs = append(errs, err) + } + + if len(errs) > 0 { + return errors.Join(errs...) + } + + return nil } const ( @@ -55,16 +88,3 @@ const ( XDP_TX XDP_REDIRECT ) - -func PrintEntrys(entry XdpProbeData, count uint64) { - mac := func(mac [6]uint8) string { - return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]) - } - saddr := net.IP(entry.V6Srcaddr.In6U.U6Addr8[:]).String() - daddr := net.IP(entry.V6Dstaddr.In6U.U6Addr8[:]).String() - - fmt.Printf( - "H_dest: %s, H_source: %v, H_proto: %v, V6Dstaddr: %v, V6Srcaddr: %v -> count: %v\n", - mac(entry.H_dest), mac(entry.H_source), entry.H_proto, daddr, saddr, count) - -} diff --git a/pkg/bpf/xdp_bpfeb.go b/pkg/bpf/xdp_bpfeb.go index 993d162..b6c41db 100644 --- a/pkg/bpf/xdp_bpfeb.go +++ b/pkg/bpf/xdp_bpfeb.go @@ -60,7 +60,7 @@ type xdpProgramSpecs struct { // // It can be passed ebpf.CollectionSpec.Assign. type xdpMapSpecs struct { - IpfixProbeMap *ebpf.MapSpec `ebpf:"ipfix_probe_map"` + PacketProbePerf *ebpf.MapSpec `ebpf:"packet_probe_perf"` } // xdpObjects contains all objects after they have been loaded into the kernel. @@ -82,12 +82,12 @@ func (o *xdpObjects) Close() error { // // It can be passed to loadXdpObjects or ebpf.CollectionSpec.LoadAndAssign. type xdpMaps struct { - IpfixProbeMap *ebpf.Map `ebpf:"ipfix_probe_map"` + PacketProbePerf *ebpf.Map `ebpf:"packet_probe_perf"` } func (m *xdpMaps) Close() error { return _XdpClose( - m.IpfixProbeMap, + m.PacketProbePerf, ) } diff --git a/pkg/bpf/xdp_bpfeb.o b/pkg/bpf/xdp_bpfeb.o index 0275301..93101db 100644 Binary files a/pkg/bpf/xdp_bpfeb.o and b/pkg/bpf/xdp_bpfeb.o differ diff --git a/pkg/bpf/xdp_bpfel.go b/pkg/bpf/xdp_bpfel.go index 12b957d..ceb20b7 100644 --- a/pkg/bpf/xdp_bpfel.go +++ b/pkg/bpf/xdp_bpfel.go @@ -60,7 +60,7 @@ type xdpProgramSpecs struct { // // It can be passed ebpf.CollectionSpec.Assign. type xdpMapSpecs struct { - IpfixProbeMap *ebpf.MapSpec `ebpf:"ipfix_probe_map"` + PacketProbePerf *ebpf.MapSpec `ebpf:"packet_probe_perf"` } // xdpObjects contains all objects after they have been loaded into the kernel. @@ -82,12 +82,12 @@ func (o *xdpObjects) Close() error { // // It can be passed to loadXdpObjects or ebpf.CollectionSpec.LoadAndAssign. type xdpMaps struct { - IpfixProbeMap *ebpf.Map `ebpf:"ipfix_probe_map"` + PacketProbePerf *ebpf.Map `ebpf:"packet_probe_perf"` } func (m *xdpMaps) Close() error { return _XdpClose( - m.IpfixProbeMap, + m.PacketProbePerf, ) } diff --git a/pkg/bpf/xdp_bpfel.o b/pkg/bpf/xdp_bpfel.o index f936356..872c945 100644 Binary files a/pkg/bpf/xdp_bpfel.o and b/pkg/bpf/xdp_bpfel.o differ diff --git a/pkg/bpf/xdp_test.go b/pkg/bpf/xdp_test.go index eca31c6..e986c43 100644 --- a/pkg/bpf/xdp_test.go +++ b/pkg/bpf/xdp_test.go @@ -1,15 +1,27 @@ package bpf import ( + "bytes" + "encoding/binary" "fmt" "net" + "net/netip" "testing" + "unsafe" + "github.com/cilium/ebpf/perf" "github.com/cilium/ebpf/rlimit" "github.com/google/gopacket" "github.com/google/gopacket/layers" + "github.com/nttcom/fluvia/internal/pkg/meter" ) +type testData struct { + sentSec uint32 + sentSubsec uint32 + probeData meter.ProbeData +} + func generateInput(t *testing.T) []byte { t.Helper() opts := gopacket.SerializeOptions{FixLengths: true, ComputeChecksums: true} @@ -23,10 +35,13 @@ func generateInput(t *testing.T) []byte { dstPort := layers.UDPPort(54321) // Define the SRv6 segment list - segmentList := []net.IP{ - net.ParseIP("2001:db8:dead:beef::1"), - net.ParseIP("2001:db8:dead:beef::2"), - } + segmentList := []netip.Addr{} + + addr, _ := netip.ParseAddr("2001:db8:dead:beef::1") + segmentList = append(segmentList, addr) + + addr, _ = netip.ParseAddr("2001:db8:dead:beef::2") + segmentList = append(segmentList, addr) // Create the Ethernet layer ethernetLayer := &layers.Ethernet{ @@ -38,14 +53,56 @@ func generateInput(t *testing.T) []byte { // Create the IPv6 layer ipv6Layer := &layers.IPv6{ Version: 6, - NextHeader: layers.IPProtocolIPv6Routing, + NextHeader: layers.IPProtocolIPv6HopByHop, HopLimit: 64, SrcIP: srcIP, DstIP: dstIP, } + // Create the IPv6 Hop-By-Hop option layer + hbhLayer := &meter.HBHLayer{ + NextHeader: uint8(layers.IPProtocolIPv6Routing), + Length: 5, + Options: []meter.IoamOption{ + { + Type: meter.IPV6_TLV_PAD1, + }, + { + Type: meter.IPV6_TLV_PAD1, + }, + { + Type: 0x31, + Length: 0x2a, + Reserved: 0x00, + OptionType: 0x00, // Pre-allocated Trace + TraceHeader: meter.IoamTrace{ + NameSpaceId: 1, + NodeLen: 4, + Flags: 0b0000, + RemainingLen: 0b0000001, + Type: [3]byte{0xf0, 0x00, 0x00}, + Reserved: 0x00, + NodeDataList: []meter.NodeData{ + { + HopLimitNodeId: [4]byte{0x00, 0x00, 0x00, 0x00}, + IngressEgressIds: [4]byte{0x00, 0x00, 0x00, 0x00}, + Second: [4]byte{0x00, 0x00, 0x00, 0x00}, + Subsecond: [4]byte{0x00, 0x00, 0x00, 0x00}, + }, + { + HopLimitNodeId: [4]byte{0x40, 0x00, 0x00, 0x01}, + IngressEgressIds: [4]byte{0x00, 0x05, 0x00, 0x04}, + Second: [4]byte{0x65, 0x38, 0xd5, 0xf6}, + Subsecond: [4]byte{0x3b, 0x53, 0x3d, 0x00}, + }, + }, + }, + }, + }, + } + // Create the SRv6 extension header layer - seg6layer := &Srv6Layer{ + seg6layer := &meter.Srv6Layer{ NextHeader: uint8(layers.IPProtocolUDP), HdrExtLen: uint8((8+16*len(segmentList))/8 - 1), RoutingType: 4, // SRH @@ -65,7 +122,7 @@ func generateInput(t *testing.T) []byte { } err := gopacket.SerializeLayers(buf, opts, - ethernetLayer, ipv6Layer, seg6layer, udpLayer, + ethernetLayer, ipv6Layer, hbhLayer, seg6layer, udpLayer, gopacket.Payload([]byte("Hello, SRv6!")), ) if err != nil { @@ -85,6 +142,36 @@ func TestXDPProg(t *testing.T) { } defer objs.Close() + fmt.Println("debug log") + perfEvent, err := perf.NewReader(objs.PacketProbePerf, 4096) + if err != nil { + t.Fatal(err) + } + + var metadata XdpMetaData + + expected := testData{ + sentSec: 0x6538d5f6, + sentSubsec: 0x3b533d00, + probeData: meter.ProbeData{ + H_source: "02:42:ac:11:00:02", + H_dest: "02:42:ac:11:00:03", + V6Srcaddr: "2001:db8::1", + V6Dstaddr: "2001:db8::2", + NextHdr: uint8(layers.IPProtocolUDP), + HdrExtLen: uint8((8+16*2)/8 - 1), + RoutingType: 4, + SegmentsLeft: 2, + LastEntry: 1, + Flags: 0, + Tag: 0, + Segments: [10]string{ + "2001:db8:dead:beef::1", + "2001:db8:dead:beef::2", + }, + }, + } + ret, _, err := objs.XdpProg.Test(generateInput(t)) if err != nil { t.Error(err) @@ -95,14 +182,40 @@ func TestXDPProg(t *testing.T) { t.Errorf("got %d want %d", ret, 2) } - fmt.Println("debug log") - var entry XdpProbeData - var count uint64 - iter := objs.IpfixProbeMap.Iterate() - for iter.Next(&entry, &count) { - PrintEntrys(entry, count) - } - if err := iter.Err(); err != nil { - fmt.Printf("Failed to iterate map: %v\n", err) + fmt.Println("before read") + + eventData, err := perfEvent.Read() + if err != nil { + t.Fatal(err) + } + + fmt.Println("Done read") + + reader := bytes.NewReader(eventData.RawSample) + + if err := binary.Read(reader, binary.LittleEndian, &metadata); err != nil { + t.Fatal(err) + } + + metadataSize := unsafe.Sizeof(metadata) + if len(eventData.RawSample) <= int(metadataSize) { + t.Fatalf("XDP did not send raw packet") + } + + probeData, err := meter.Parse(eventData.RawSample[metadataSize:]) + if err != nil { + t.Fatal(err) + } + + actual := testData{ + sentSec: metadata.SentSec, + sentSubsec: metadata.SentSubsec, + probeData: *probeData, + } + + if actual != expected { + t.Errorf("TEST FAILED\n") + t.Errorf("expected value: %+v\n", expected) + t.Errorf("actual value: %+v\n", actual) } } diff --git a/pkg/client/client.go b/pkg/client/client.go index ddde3f4..0175ad0 100755 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -7,11 +7,12 @@ package client import ( "net" + "time" - "github.com/nttcom/fluvia/pkg/packet/ipfix" + "github.com/nttcom/fluvia/pkg/ipfix" ) -func New(ingressIfName string, raddr *net.UDPAddr) ClientError { +func New(ingressIfName string, raddr *net.UDPAddr, interval int) ClientError { ch := make(chan []ipfix.FieldValue) errChan := make(chan ClientError) @@ -25,7 +26,18 @@ func New(ingressIfName string, raddr *net.UDPAddr) ClientError { } } }() - go NewMeter(ingressIfName, ch) + + m := NewMeter(ingressIfName) + go func() { + err := m.Run(ch, time.Duration(interval)) + if err != nil { + errChan <- ClientError{ + Component: "meter", + Error: err, + } + } + m.Close() + }() for { clientError := <-errChan diff --git a/pkg/client/exporter.go b/pkg/client/exporter.go index 69a41e4..fa2fdfb 100644 --- a/pkg/client/exporter.go +++ b/pkg/client/exporter.go @@ -10,7 +10,7 @@ import ( "net" "os" - "github.com/nttcom/fluvia/pkg/packet/ipfix" + "github.com/nttcom/fluvia/pkg/ipfix" ) const OBSERVATION_ID uint32 = 61166 diff --git a/pkg/client/meter.go b/pkg/client/meter.go index fe79628..6103b58 100644 --- a/pkg/client/meter.go +++ b/pkg/client/meter.go @@ -7,91 +7,240 @@ package client import ( - "fmt" + "bytes" + "context" + "encoding/binary" + "errors" "log" "net" "net/netip" + "sync" "time" + "unsafe" - "github.com/cilium/ebpf/link" + "github.com/cilium/ebpf" + "github.com/nttcom/fluvia/internal/pkg/meter" "github.com/nttcom/fluvia/pkg/bpf" - "github.com/nttcom/fluvia/pkg/packet/ipfix" + "github.com/nttcom/fluvia/pkg/ipfix" + "golang.org/x/sync/errgroup" + "golang.org/x/sys/unix" ) -func NewMeter(ingressIfName string, ch chan []ipfix.FieldValue) { +type Stats struct { + Count int64 + DelayMean int64 + DelayMin int64 + DelayMax int64 + DelaySum int64 +} + +type StatsMap struct { + Mu sync.RWMutex + Db map[meter.ProbeData]*Stats +} + +type Meter struct { + statsMap *StatsMap + bootTime time.Time + xdp *bpf.Xdp +} + +func NewMeter(ingressIfName string) *Meter { + bootTime, err := getSystemBootTime() + if err != nil { + log.Fatalf("Could not get boot time: %s", err) + } + + statsMap := StatsMap{Db: make(map[meter.ProbeData]*Stats)} + iface, err := net.InterfaceByName(ingressIfName) if err != nil { log.Fatalf("lookup network iface %q: %s", ingressIfName, err) } // Load the XDP program - objs, err := bpf.ReadXdpObjects(nil) + xdp, err := bpf.ReadXdpObjects(&ebpf.CollectionOptions{ + Programs: ebpf.ProgramOptions{ + LogLevel: ebpf.LogLevelInstruction, + LogSize: ebpf.DefaultVerifierLogSize * 256, + }, + }) if err != nil { - log.Fatalf("Could not load XDP program: %s", err) + var ve *ebpf.VerifierError + if errors.As(err, &ve) { + log.Fatalf("Could not load XDP program: %+v\n", ve) + } } - defer objs.Close() // Attach the XDP program. - l, err := link.AttachXDP(link.XDPOptions{ - Program: objs.XdpProg, - Interface: iface.Index, - Flags: link.XDPGenericMode, - }) - if err != nil { + if err = xdp.Attach(iface); err != nil { log.Fatalf("Could not attach XDP program: %s", err) } - defer l.Close() log.Printf("Attached XDP program to iface %q (index %d)", iface.Name, iface.Index) log.Printf("Press Ctrl-C to exit and remove the program") - ticker := time.NewTicker(1 * time.Second) - defer ticker.Stop() - mapLogs := map[bpf.XdpProbeData]uint64{} - for range ticker.C { - var entry bpf.XdpProbeData - var count uint64 + return &Meter{ + statsMap: &statsMap, + bootTime: bootTime, + xdp: xdp, + } +} + +func (m *Meter) Run(flowChan chan []ipfix.FieldValue, interval time.Duration) error { + eg, ctx := errgroup.WithContext(context.Background()) + eg.Go(func() error { + return m.Read(ctx) + }) + eg.Go(func() error { + return m.Send(ctx, flowChan, interval) + }) + + if err := eg.Wait(); err != nil { + return err + } + + return nil +} + +func (m *Meter) Read(ctx context.Context) error { + perfEvent, err := m.xdp.NewPerfReader() + if err != nil { + log.Fatalf("Could not obtain perf reader: %s", err) + } + + var metadata bpf.XdpMetaData + for { + select { + case <-ctx.Done(): + return nil + default: + eventData, err := perfEvent.Read() + if err != nil { + log.Fatalf("Could not read from bpf perf map:") + } - iter := objs.IpfixProbeMap.Iterate() + reader := bytes.NewReader(eventData.RawSample) - for iter.Next(&entry, &count) { - if _, ok := mapLogs[entry]; !ok { - mapLogs[entry] = 0 + if err := binary.Read(reader, binary.LittleEndian, &metadata); err != nil { + log.Fatalf("Could not read from reader: %s", err) } - dCnt := uint64(count - mapLogs[entry]) + metadata_size := unsafe.Sizeof(metadata) + if len(eventData.RawSample)-int(metadata_size) <= 0 { + continue + } - mapLogs[entry] = count + receivedNano := m.bootTime.Add(time.Duration(metadata.ReceivedNano) * time.Nanosecond) + SentNano := time.Unix(int64(metadata.SentSec), int64(metadata.SentSubsec)) - sl := []ipfix.SRHSegmentIPv6{} - for _, binSeg := range entry.Segments { - ipSeg, _ := netip.AddrFromSlice(binSeg.In6U.U6Addr8[:]) + delay := receivedNano.Sub(SentNano) - // Ignore zero values received from bpf map - if ipSeg == netip.IPv6Unspecified() { - break - } - seg := ipfix.SRHSegmentIPv6{Val: ipSeg} - sl = append(sl, seg) + probeData, err := meter.Parse(eventData.RawSample[metadata_size:]) + if err != nil { + log.Fatalf("Could not parse the packet: %s", err) } - actSeg, _ := netip.AddrFromSlice(entry.Segments[entry.SegmentsLeft].In6U.U6Addr8[:]) - - f := []ipfix.FieldValue{ - &ipfix.PacketDeltaCount{Val: dCnt}, - &ipfix.SRHActiveSegmentIPv6{Val: actSeg}, - &ipfix.SRHSegmentsIPv6Left{Val: entry.SegmentsLeft}, - &ipfix.SRHFlagsIPv6{Val: entry.Flags}, - &ipfix.SRHTagIPv6{Val: entry.Tag}, - &ipfix.SRHSegmentIPv6BasicList{ - SegmentList: sl, - }, + delayMicro := delay.Microseconds() + + m.statsMap.Mu.Lock() + if value, ok := m.statsMap.Db[*probeData]; !ok { + m.statsMap.Db[*probeData] = &Stats{ + Count: 1, + DelayMean: delayMicro, + DelayMin: delayMicro, + DelayMax: delayMicro, + DelaySum: delayMicro, + } + } else { + value.Count = value.Count + 1 + + if delayMicro < value.DelayMin { + value.DelayMin = delayMicro + } + + if delayMicro > value.DelayMax { + value.DelayMax = delayMicro + } + + value.DelaySum = value.DelaySum + delayMicro + value.DelayMean = value.DelaySum / value.Count } - // Throw to channel - ch <- f + m.statsMap.Mu.Unlock() } - if err := iter.Err(); err != nil { - fmt.Printf("Failed to iterate map: %v\n", err) + } +} + +func (m *Meter) Send(ctx context.Context, flowChan chan []ipfix.FieldValue, intervalSec time.Duration) error { + ticker := time.NewTicker(intervalSec * time.Second) + defer ticker.Stop() + + for range ticker.C { + select { + case <-ctx.Done(): + return nil + default: + m.statsMap.Mu.Lock() + for probeData, stat := range m.statsMap.Db { + dCnt := uint64(stat.Count) + + sl := []ipfix.SRHSegmentIPv6{} + for _, seg := range probeData.Segments { + if seg == "" { + break + } + ipSeg, _ := netip.ParseAddr(seg) + + // Ignore zero values received from bpf map + if ipSeg == netip.IPv6Unspecified() { + break + } + seg := ipfix.SRHSegmentIPv6{Val: ipSeg} + sl = append(sl, seg) + } + + actSeg, _ := netip.ParseAddr(probeData.Segments[probeData.SegmentsLeft]) + + f := []ipfix.FieldValue{ + &ipfix.PacketDeltaCount{Val: dCnt}, + &ipfix.SRHActiveSegmentIPv6{Val: actSeg}, + &ipfix.SRHSegmentsIPv6Left{Val: probeData.SegmentsLeft}, + &ipfix.SRHFlagsIPv6{Val: probeData.Flags}, + &ipfix.SRHTagIPv6{Val: probeData.Tag}, + &ipfix.SRHSegmentIPv6BasicList{ + SegmentList: sl, + }, + &ipfix.PathDelayMeanDeltaMicroseconds{Val: uint32(stat.DelayMean)}, + &ipfix.PathDelayMinDeltaMicroseconds{Val: uint32(stat.DelayMin)}, + &ipfix.PathDelayMaxDeltaMicroseconds{Val: uint32(stat.DelayMax)}, + &ipfix.PathDelaySumDeltaMicroseconds{Val: uint32(stat.DelaySum)}, + } + // Throw to channel + flowChan <- f + + // Stats (e.g., DelayMean) are based on packets received over a fixed duration + // These need to be cleared out for the next calculation of statistics + delete(m.statsMap.Db, probeData) + } + m.statsMap.Mu.Unlock() } } + + return nil +} + +func (m *Meter) Close() error { + if err := m.xdp.Close(); err != nil { + return err + } + + return nil +} + +func getSystemBootTime() (time.Time, error) { + var ts unix.Timespec + if err := unix.ClockGettime(unix.CLOCK_MONOTONIC, &ts); err != nil { + return time.Time{}, err + } + return time.Now().Add(-time.Duration(ts.Nano())), nil } diff --git a/pkg/packet/ipfix/field_value.go b/pkg/ipfix/field_value.go similarity index 94% rename from pkg/packet/ipfix/field_value.go rename to pkg/ipfix/field_value.go index 9fa0b14..b7c6b92 100644 --- a/pkg/packet/ipfix/field_value.go +++ b/pkg/ipfix/field_value.go @@ -294,6 +294,102 @@ func (fv *SRHSegmentIPv6EndpointBehavior) FieldSpecifier() *FieldSpecifier { return fs } +type PathDelayMeanDeltaMicroseconds struct { + Val uint32 +} + +func (fv *PathDelayMeanDeltaMicroseconds) ElementID() uint16 { + return IEID_PATH_DELAY_MEAN_DALTA_MICROSECONDS +} + +func (fv *PathDelayMeanDeltaMicroseconds) Serialize() []uint8 { + ret := make([]uint8, 4) + binary.BigEndian.PutUint32(ret, fv.Val) + return ret +} + +func (fv *PathDelayMeanDeltaMicroseconds) Len() uint16 { + return 4 +} + +func (fv *PathDelayMeanDeltaMicroseconds) FieldSpecifier() *FieldSpecifier { + templateLen := fv.Len() + fs := NewFieldSpecifier(false, fv.ElementID(), templateLen, ENTERPRISE_NUMBER_NTTCOM) + return fs +} + +type PathDelayMinDeltaMicroseconds struct { + Val uint32 +} + +func (fv *PathDelayMinDeltaMicroseconds) ElementID() uint16 { + return IEID_PATH_DELAY_MIN_DALTA_MICROSECONDS +} + +func (fv *PathDelayMinDeltaMicroseconds) Serialize() []uint8 { + ret := make([]uint8, 4) + binary.BigEndian.PutUint32(ret, fv.Val) + return ret +} + +func (fv *PathDelayMinDeltaMicroseconds) Len() uint16 { + return 4 +} + +func (fv *PathDelayMinDeltaMicroseconds) FieldSpecifier() *FieldSpecifier { + templateLen := fv.Len() + fs := NewFieldSpecifier(false, fv.ElementID(), templateLen, ENTERPRISE_NUMBER_NTTCOM) + return fs +} + +type PathDelayMaxDeltaMicroseconds struct { + Val uint32 +} + +func (fv *PathDelayMaxDeltaMicroseconds) ElementID() uint16 { + return IEID_PATH_DELAY_MAX_DALTA_MICROSECONDS +} + +func (fv *PathDelayMaxDeltaMicroseconds) Serialize() []uint8 { + ret := make([]uint8, 4) + binary.BigEndian.PutUint32(ret, fv.Val) + return ret +} + +func (fv *PathDelayMaxDeltaMicroseconds) Len() uint16 { + return 4 +} + +func (fv *PathDelayMaxDeltaMicroseconds) FieldSpecifier() *FieldSpecifier { + templateLen := fv.Len() + fs := NewFieldSpecifier(false, fv.ElementID(), templateLen, ENTERPRISE_NUMBER_NTTCOM) + return fs +} + +type PathDelaySumDeltaMicroseconds struct { + Val uint32 +} + +func (fv *PathDelaySumDeltaMicroseconds) ElementID() uint16 { + return IEID_PATH_DELAY_SUM_DALTA_MICROSECONDS +} + +func (fv *PathDelaySumDeltaMicroseconds) Serialize() []uint8 { + ret := make([]uint8, 4) + binary.BigEndian.PutUint32(ret, fv.Val) + return ret +} + +func (fv *PathDelaySumDeltaMicroseconds) Len() uint16 { + return 4 +} + +func (fv *PathDelaySumDeltaMicroseconds) FieldSpecifier() *FieldSpecifier { + templateLen := fv.Len() + fs := NewFieldSpecifier(false, fv.ElementID(), templateLen, ENTERPRISE_NUMBER_NTTCOM) + return fs +} + type UndefinedFieldValue struct { ElemID uint16 Value []uint8 diff --git a/pkg/packet/ipfix/ipfix.go b/pkg/ipfix/ipfix.go similarity index 100% rename from pkg/packet/ipfix/ipfix.go rename to pkg/ipfix/ipfix.go diff --git a/src/main.c b/src/main.c index f32fd4a..80241c1 100644 --- a/src/main.c +++ b/src/main.c @@ -1,8 +1,11 @@ +/* SPDX-License-Identifier: (GPL-2.0-only OR MIT) */ /* * Copyright (c) 2023 NTT Communications Corporation * Copyright (c) 2023 Takeru Hayasaka */ +#include "xdp_consts.h" +#include "xdp_struct.h" #define KBUILD_MODNAME "xdp_probe" #include #include @@ -12,19 +15,54 @@ #include #include #include +#include #include #include #include "xdp_map.h" +static inline int parse_ioam6_trace_header(struct ioam6_trace_hdr *ith, int hdr_len, struct metadata *key, void *data_end) +{ + __u8 second_index, subsecond_index; + __u32 second, subsecond; + + if ((void *)(ith + 1) > data_end) + return -1; + + second_index = hdr_len - 8; + subsecond_index = hdr_len - 4; + + if ((void *)ith + second_index + 4 > data_end) return -1; + second = bpf_ntohl(*(__u32 *)((void *)ith + second_index)); + + if ((void *)ith + subsecond_index + 4 > data_end) return -1; + subsecond = bpf_ntohl(*(__u32 *)((void *)ith + subsecond_index)); + + key->sent_second = second; + key->sent_subsecond = subsecond; + + return 0; +} + SEC("xdp") int xdp_prog(struct xdp_md *ctx) { void *data_end = (void *)(long)ctx->data_end; void *data = (void *)(long)ctx->data; - __u32 probe_key = XDP_PASS; + __u64 packet_size = data_end - data; + __u8 *p; + struct metadata md = {}; + int ret, hoplen; + struct ethhdr *eth = data; + struct ipv6hdr *ipv6; + struct srhhdr *srh; + struct ipv6_hopopt_hdr *hopopth; + struct ioam6_hdr *ioam6h; + struct ioam6_trace_hdr *ioam6_trace_h; + + md.received_nanosecond = bpf_ktime_get_ns(); if ((void *)(eth + 1) > data_end) return XDP_PASS; @@ -32,59 +70,72 @@ int xdp_prog(struct xdp_md *ctx) if (eth->h_proto != bpf_htons(ETH_P_IPV6)) return XDP_PASS; - struct ipv6hdr *ipv6 = (void *)(eth + 1); + ipv6 = (void *)(eth + 1); if ((void *)(ipv6 + 1) > data_end) return XDP_PASS; - // is srv6 - if (ipv6->nexthdr != IPPROTO_IPV6ROUTE) + if (ipv6->nexthdr != IPPROTO_HOPOPTS) return XDP_PASS; - struct srhhdr *srh = (void *)(ipv6 + 1); - if ((void *)(srh + 1) > data_end) + hopopth = (struct ipv6_hopopt_hdr *)(ipv6 + 1); + if ((void *)(hopopth + 1) > data_end) return XDP_PASS; - if (srh->routingType != IPV6_SRCRT_TYPE_4) // IPV6_SRCRT_TYPE_4 = SRH + hoplen = (hopopth->hdrlen + 1) << 3; + + p = (__u8 *)(hopopth + 1); + + if ((void *)(p + 1) > data_end) + return XDP_PASS; + + if (*p == IPV6_TLV_PAD1) { + p += 1; + } + + if ((void *)(p + 1) > data_end) + return XDP_PASS; + + if (*p == IPV6_TLV_PAD1) { + p += 1; + } + + ioam6h = (struct ioam6_hdr *)p; + if ((void *)(ioam6h + 1) > data_end) { + return XDP_PASS; + } + + if (ioam6h->opt_type != IPV6_TLV_IOAM) { return XDP_PASS; + } + + if (ioam6h->type != IOAM6_TYPE_PREALLOC) { + return XDP_PASS; + } - struct probe_data key = {}; - __u64 zero = 0, *value; - __builtin_memcpy(&key.h_source, ð->h_source, ETH_ALEN); - __builtin_memcpy(&key.h_dest, ð->h_dest, ETH_ALEN); - key.h_proto = eth->h_proto; - key.v6_srcaddr = ipv6->saddr; - key.v6_dstaddr = ipv6->daddr; - - key.nextHdr = srh->nextHdr; - key.hdrExtLen = srh->hdrExtLen; - key.routingType = srh->routingType; - key.segmentsLeft = srh->segmentsLeft; - key.lastEntry = srh->lastEntry; - key.flags = srh->flags; - key.tag = srh->tag; - - for (int i = 0; i < MAX_SEGMENTLIST_ENTRIES; i++) - { - if (!(i < key.lastEntry + 1)) - break; - - if ((void *)(data + sizeof(struct ethhdr) + sizeof(struct ipv6hdr) + sizeof(struct srhhdr) + sizeof(struct in6_addr) * (i + 1) + 1) > data_end) - break; - - __builtin_memcpy(&key.segments[i], &srh->segments[i], sizeof(struct in6_addr)); + ioam6_trace_h = (struct ioam6_trace_hdr *)(ioam6h + 1); + if ((void *)(ioam6_trace_h + 1) > data_end) { + return XDP_PASS; } - value = bpf_map_lookup_elem(&ipfix_probe_map, &key); - if (!value) - { - bpf_map_update_elem(&ipfix_probe_map, &key, &zero, BPF_NOEXIST); - value = bpf_map_lookup_elem(&ipfix_probe_map, &key); - if (!value) - return XDP_PASS; + ret = parse_ioam6_trace_header(ioam6_trace_h, ioam6h->opt_len - 2, &md, data_end); + if (ret != 0) { + return XDP_PASS; } - (*value)++; + + if (hopopth->nexthdr != IPPROTO_IPV6ROUTE) + return XDP_PASS; + + srh = (struct srhhdr *)((void *)hopopth + hoplen); + if ((void *)(srh + 1) > data_end) + return XDP_PASS; + + if (srh->routingType != IPV6_SRCRT_TYPE_4) // IPV6_SRCRT_TYPE_4 = SRH + return -1; + + __u64 flags = BPF_F_CURRENT_CPU | (packet_size << 32); + bpf_perf_event_output(ctx, &packet_probe_perf, flags, &md, sizeof(md)); return XDP_PASS; } -char _license[] SEC("license") = "MIT"; +char _license[] SEC("license") = "Dual MIT/GPL"; diff --git a/src/xdp_consts.h b/src/xdp_consts.h index d7b8e73..c9eb237 100644 --- a/src/xdp_consts.h +++ b/src/xdp_consts.h @@ -1,3 +1,4 @@ +/* SPDX-License-Identifier: (GPL-2.0-only OR MIT) */ /* * Copyright (c) 2023 NTT Communications Corporation * Copyright (c) 2023 Takeru Hayasaka @@ -7,7 +8,6 @@ #define __XDP_CONSTS_H #define MAX_MAP_ENTRIES 1024 -#define MAX_SEGMENTLIST_ENTRIES 10 #define IPPROTO_IPV6ROUTE 43 #endif diff --git a/src/xdp_map.h b/src/xdp_map.h index 0e6736a..b5bab3f 100644 --- a/src/xdp_map.h +++ b/src/xdp_map.h @@ -1,3 +1,4 @@ +/* SPDX-License-Identifier: (GPL-2.0-only OR MIT) */ /* * Copyright (c) 2023 NTT Communications Corporation * Copyright (c) 2023 Takeru Hayasaka @@ -14,10 +15,8 @@ #include "xdp_struct.h" struct { - __uint(type, BPF_MAP_TYPE_LRU_HASH); + __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); __uint(max_entries, MAX_MAP_ENTRIES); - __type(key, struct probe_data); - __type(value, __u64); -} ipfix_probe_map SEC(".maps"); +} packet_probe_perf SEC(".maps"); #endif diff --git a/src/xdp_struct.h b/src/xdp_struct.h index 2165448..99fbbc3 100644 --- a/src/xdp_struct.h +++ b/src/xdp_struct.h @@ -1,3 +1,4 @@ +/* SPDX-License-Identifier: (GPL-2.0-only OR MIT) */ /* * Copyright (c) 2023 NTT Communications Corporation * Copyright (c) 2023 Takeru Hayasaka @@ -25,21 +26,11 @@ struct srhhdr struct in6_addr segments[0]; }; -struct probe_data +struct metadata { - __u8 h_dest[ETH_ALEN]; - __u8 h_source[ETH_ALEN]; - __be16 h_proto; - struct in6_addr v6_srcaddr; - struct in6_addr v6_dstaddr; - __u8 nextHdr; - __u8 hdrExtLen; - __u8 routingType; - __u8 segmentsLeft; - __u8 lastEntry; - __u8 flags; - __u16 tag; - struct in6_addr segments[MAX_SEGMENTLIST_ENTRIES]; + __u64 received_nanosecond; + __u32 sent_second; + __u32 sent_subsecond; }; #endif diff --git a/tools/exporter/exporter.go b/tools/exporter/exporter.go index 300aa26..48d9f22 100644 --- a/tools/exporter/exporter.go +++ b/tools/exporter/exporter.go @@ -11,7 +11,7 @@ import ( "net/netip" "github.com/nttcom/fluvia/pkg/client" - "github.com/nttcom/fluvia/pkg/packet/ipfix" + "github.com/nttcom/fluvia/pkg/ipfix" ) func main() {