Skip to content

Commit

Permalink
NETOBSERV-588 Add IPv4/6 DSCP field to the exported flow (#158)
Browse files Browse the repository at this point in the history
Signed-off-by: msherif1234 <[email protected]>
  • Loading branch information
msherif1234 authored Oct 5, 2023
1 parent f3f9a60 commit ca897d5
Show file tree
Hide file tree
Showing 17 changed files with 112 additions and 64 deletions.
5 changes: 3 additions & 2 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
aggregate_flow->start_mono_time_ts = pkt.current_ts;
}
aggregate_flow->flags |= pkt.flags;

aggregate_flow->dscp = pkt.dscp;
// Does not matter the gate. Will be zero if not enabled.
if (pkt.rtt > aggregate_flow->flow_rtt) {
aggregate_flow->flow_rtt = pkt.rtt;
Expand All @@ -109,7 +109,8 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
.flags = pkt.flags,
.flow_rtt = pkt.rtt
.flow_rtt = pkt.rtt,
.dscp = pkt.dscp,
};

// even if we know that the entry is new, another CPU might be concurrently inserting a flow
Expand Down
4 changes: 4 additions & 0 deletions bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ typedef __u64 u64;
#define ETH_P_IPV6 0x86DD
#define ETH_P_ARP 0x0806
#define IPPROTO_ICMPV6 58
#define DSCP_SHIFT 2
#define DSCP_MASK 0x3F

// according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum {
Expand All @@ -75,6 +77,7 @@ typedef struct flow_metrics_t {
// 0 otherwise
// https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
u8 errno;
u8 dscp;
struct pkt_drops_t {
u32 packets;
u64 bytes;
Expand Down Expand Up @@ -155,6 +158,7 @@ typedef struct pkt_info_t {
u16 flags; // TCP specific
void *l4_hdr; // Stores the actual l4 header
u64 rtt; // rtt calculated from the flow if possible. else zero
u8 dscp; // IPv4/6 DSCP value
} pkt_info;

// Structure for payload metadata
Expand Down
12 changes: 10 additions & 2 deletions bpf/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ static inline void fill_l4info(void *l4_hdr_start, void *data_end, u8 protocol,
}
}

static inline u8 ipv4_get_dscp(const struct iphdr *iph) {
return (iph->tos >> DSCP_SHIFT) & DSCP_MASK;
}

static inline u8 ipv6_get_dscp(const struct ipv6hdr *ipv6h) {
return ((bpf_ntohs(*(const __be16 *)ipv6h) >> 4) >> DSCP_SHIFT) & DSCP_MASK;
}

// sets flow fields from IPv4 header information
static inline int fill_iphdr(struct iphdr *ip, void *data_end, pkt_info *pkt) {
void *l4_hdr_start;
Expand All @@ -100,7 +108,7 @@ static inline int fill_iphdr(struct iphdr *ip, void *data_end, pkt_info *pkt) {
__builtin_memcpy(id->dst_ip, ip4in6, sizeof(ip4in6));
__builtin_memcpy(id->src_ip + sizeof(ip4in6), &ip->saddr, sizeof(ip->saddr));
__builtin_memcpy(id->dst_ip + sizeof(ip4in6), &ip->daddr, sizeof(ip->daddr));

pkt->dscp = ipv4_get_dscp(ip);
/* fill l4 header which will be added to id in flow_monitor function.*/
fill_l4info(l4_hdr_start, data_end, ip->protocol, pkt);
return SUBMIT;
Expand All @@ -118,7 +126,7 @@ static inline int fill_ip6hdr(struct ipv6hdr *ip, void *data_end, pkt_info *pkt)
/* Save the IP Address to id directly. copy once. */
__builtin_memcpy(id->src_ip, ip->saddr.in6_u.u6_addr8, IP_MAX_LEN);
__builtin_memcpy(id->dst_ip, ip->daddr.in6_u.u6_addr8, IP_MAX_LEN);

pkt->dscp = ipv6_get_dscp(ip);
/* fill l4 header which will be added to id in flow_monitor function.*/
fill_l4info(l4_hdr_start, data_end, ip->nexthdr, pkt);
return SUBMIT;
Expand Down
6 changes: 4 additions & 2 deletions examples/flowlogs-dump/server/flowlogs-dump-collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,15 @@ func main() {
for records := range receivedRecords {
for _, record := range records.Entries {
if record.EthProtocol == ipv6 {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: dscp: 0x%x protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
net.IP(record.Network.GetSrcAddr().GetIpv6()).To16(),
record.Transport.SrcPort,
net.IP(record.Network.GetDstAddr().GetIpv6()).To16(),
record.Transport.DstPort,
record.Network.GetDscp(),
protocolByNumber[record.Transport.Protocol],
record.IcmpType,
record.IcmpCode,
Expand All @@ -97,14 +98,15 @@ func main() {
record.GetPktDropLatestDropCause(),
)
} else {
log.Printf("%s: %v %s IP %s:%d > %s:%d: protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
log.Printf("%s: %v %s IP %s:%d > %s:%d: dscp: 0x%x protocol:%s type: %d code: %d dir:%d bytes:%d packets:%d flags:%d ends: %v dnsId: %d dnsFlags: 0x%04x dnsLatency(ms): %v rtt(ns) %v DropPkts: %d DropBytes: %d DropCause %d\n",
ipProto[record.EthProtocol],
record.TimeFlowStart.AsTime().Local().Format("15:04:05.000000"),
record.Interface,
ipIntToNetIP(record.Network.GetSrcAddr().GetIpv4()).String(),
record.Transport.SrcPort,
ipIntToNetIP(record.Network.GetDstAddr().GetIpv4()).String(),
record.Transport.DstPort,
record.Network.GetDscp(),
protocolByNumber[record.Transport.Protocol],
record.IcmpType,
record.IcmpCode,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
github.com/cilium/ebpf v0.11.0
github.com/fsnotify/fsnotify v1.6.0
github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424
github.com/golang/protobuf v1.5.3
github.com/google/gopacket v1.1.19
github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f
github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118
Expand Down Expand Up @@ -41,6 +40,7 @@ require (
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions pkg/decode/decode_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
out["SrcAddr"] = ipToStr(flow.Network.GetSrcAddr())
out["DstAddr"] = ipToStr(flow.Network.GetDstAddr())
out["Proto"] = flow.Transport.GetProtocol()
out["Dscp"] = flow.Network.GetDscp()
proto := flow.Transport.GetProtocol()
if proto == syscall.IPPROTO_ICMP || proto == syscall.IPPROTO_ICMPV6 {
out["IcmpType"] = flow.GetIcmpType()
Expand Down
12 changes: 12 additions & 0 deletions pkg/decode/decode_protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestDecodeProtobuf(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
Dscp: 64,
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
Expand All @@ -61,6 +62,7 @@ func TestDecodeProtobuf(t *testing.T) {
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": uint32(64),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"Duplicate": false,
Expand Down Expand Up @@ -93,6 +95,7 @@ func TestDecodeProtobuf(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
Dscp: 64,
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
Expand All @@ -112,6 +115,7 @@ func TestDecodeProtobuf(t *testing.T) {
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": uint32(64),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"Duplicate": false,
Expand Down Expand Up @@ -143,6 +147,7 @@ func TestDecodeProtobuf(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
Dscp: 64,
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
Expand All @@ -162,6 +167,7 @@ func TestDecodeProtobuf(t *testing.T) {
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": uint32(64),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"Duplicate": false,
Expand Down Expand Up @@ -193,6 +199,7 @@ func TestDecodeProtobuf(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv6{Ipv6: []byte{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26}},
},
Dscp: 64,
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
Expand All @@ -212,6 +219,7 @@ func TestDecodeProtobuf(t *testing.T) {
"Bytes": uint64(456),
"SrcAddr": "102:304:506:708:90a:b0c:d0e:f10",
"DstAddr": "b0c:d0e:f10:1112:1314:1516:1718:191a",
"Dscp": uint32(64),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"Duplicate": false,
Expand Down Expand Up @@ -288,6 +296,7 @@ func TestDecodeProtobuf(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
Dscp: 64,
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
Expand Down Expand Up @@ -317,6 +326,7 @@ func TestDecodeProtobuf(t *testing.T) {
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": uint32(64),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"Duplicate": false,
Expand Down Expand Up @@ -374,6 +384,7 @@ func TestPBFlowToMap(t *testing.T) {
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
Dscp: 64,
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
Expand Down Expand Up @@ -407,6 +418,7 @@ func TestPBFlowToMap(t *testing.T) {
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": uint32(64),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"SrcPort": uint32(23000),
Expand Down
1 change: 1 addition & 0 deletions pkg/ebpf/bpf_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_bpfeb.o
Binary file not shown.
1 change: 1 addition & 0 deletions pkg/ebpf/bpf_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_bpfel.o
Binary file not shown.
2 changes: 2 additions & 0 deletions pkg/exporter/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func v4FlowToPB(fr *flow.Record) *pbflow.Record {
Network: &pbflow.Network{
SrcAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv4{Ipv4: flow.IntEncodeV4(fr.Id.SrcIp)}},
DstAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv4{Ipv4: flow.IntEncodeV4(fr.Id.DstIp)}},
Dscp: uint32(fr.Metrics.Dscp),
},
Transport: &pbflow.Transport{
Protocol: uint32(fr.Id.TransportProtocol),
Expand Down Expand Up @@ -97,6 +98,7 @@ func v6FlowToPB(fr *flow.Record) *pbflow.Record {
Network: &pbflow.Network{
SrcAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv6{Ipv6: fr.Id.SrcIp[:]}},
DstAddr: &pbflow.IP{IpFamily: &pbflow.IP_Ipv6{Ipv6: fr.Id.DstIp[:]}},
Dscp: uint32(fr.Metrics.Dscp),
},
Transport: &pbflow.Transport{
Protocol: uint32(fr.Id.TransportProtocol),
Expand Down
4 changes: 4 additions & 0 deletions pkg/flow/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func Accumulate(r *ebpf.BpfFlowMetrics, src *ebpf.BpfFlowMetrics) {
if r.FlowRtt < src.FlowRtt {
r.FlowRtt = src.FlowRtt
}
// Accumulate DSCP
if src.Dscp != 0 {
r.Dscp = src.Dscp
}
}

// IP returns the net.IP equivalent object
Expand Down
2 changes: 2 additions & 0 deletions pkg/flow/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestRecordBinaryEncoding(t *testing.T) {
0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, // u64 flow_end_time
0x13, 0x14, //flags
0x33, // u8 errno
0x60, // u8 dscp
// pkt_drops structure
0x10, 0x11, 0x12, 0x13, // u32 packets
0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, // u64 bytes
Expand Down Expand Up @@ -69,6 +70,7 @@ func TestRecordBinaryEncoding(t *testing.T) {
EndMonoTimeTs: 0x1a19181716151413,
Flags: 0x1413,
Errno: 0x33,
Dscp: 0x60,
PktDrops: ebpf.BpfPktDropsT{
Packets: 0x13121110,
Bytes: 0x1b1a191817161514,
Expand Down
Loading

0 comments on commit ca897d5

Please sign in to comment.