Skip to content

Commit

Permalink
filebeat/input/udp: add support for /proc/net/udp metrics (#34070)
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 authored and chrisberkhout committed Jun 1, 2023
1 parent bc2b97f commit 69fa5f2
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Improve collection of risk information from Okta debug data. {issue}33677[33677] {pull}34030[34030]
- Adding filename details from zip to response for httpjson {issue}33952[33952] {pull}34044[34044]
- Allow user configuration of keep-alive behaviour for HTTPJSON and CEL inputs. {issue}33951[33951] {pull}34014[34014]
- Add support for polling system UDP stats for UDP input metrics. {pull}34070[34070]

*Auditbeat*

Expand Down
2 changes: 2 additions & 0 deletions filebeat/docs/inputs/input-udp.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ observe the activity of the input.
| `udp_read_buffer_length_gauge` | Size of the UDP socket buffer length in bytes (gauge).
| `received_events_total` | Total number of packets (events) that have been received.
| `received_bytes_total` | Total number of bytes received.
| `receive_queue_length` | Size of the system receive queue (linux only) (guage).
| `system_packet_drops` | Number of system packet drops (linux only) (guage).
| `arrival_period` | Histogram of the time between successive packets in nanoseconds.
| `processing_time` | Histogram of the time taken to process packets in nanoseconds.
|=======
Expand Down
117 changes: 114 additions & 3 deletions filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@
package udp

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"net"
"os"
"runtime"
"strconv"
"strings"
"time"

"github.com/dustin/go-humanize"
Expand All @@ -28,10 +36,11 @@ import (
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/filebeat/inputsource"
"github.com/elastic/beats/v7/filebeat/inputsource/udp"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat" // TODO: Replace with sync/atomic when go1.19 is supported.
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
Expand Down Expand Up @@ -96,7 +105,8 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {
log.Info("starting udp socket input")
defer log.Info("udp input stopped")

metrics := newInputMetrics(ctx.ID, s.config.Host, uint64(s.config.ReadBuffer))
const pollInterval = time.Minute
metrics := newInputMetrics(ctx.ID, s.config.Host, uint64(s.config.ReadBuffer), pollInterval, log)
defer metrics.close()

server := udp.New(&s.config.Config, func(data []byte, metadata inputsource.NetworkMetadata) {
Expand Down Expand Up @@ -137,20 +147,23 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {
// inputMetrics handles the input's metric reporting.
type inputMetrics struct {
unregister func()
done chan struct{}

lastPacket time.Time

device *monitoring.String // name of the device being monitored
packets *monitoring.Uint // number of packets processed
bytes *monitoring.Uint // number of bytes processed
bufferLen *monitoring.Uint // configured read buffer length
rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/udp (only on linux systems)
drops *monitoring.Uint // number of udp drops noted in /proc/net/udp
arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals
processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication
}

// newInputMetrics returns an input metric for the UDP processor. If id is empty
// a nil inputMetric is returned.
func newInputMetrics(id, device string, buflen uint64) *inputMetrics {
func newInputMetrics(id, device string, buflen uint64, poll time.Duration, log *logp.Logger) *inputMetrics {
if id == "" {
return nil
}
Expand All @@ -161,6 +174,8 @@ func newInputMetrics(id, device string, buflen uint64) *inputMetrics {
device: monitoring.NewString(reg, "device"),
packets: monitoring.NewUint(reg, "received_events_total"),
bytes: monitoring.NewUint(reg, "received_bytes_total"),
rxQueue: monitoring.NewUint(reg, "receive_queue_length"),
drops: monitoring.NewUint(reg, "system_packet_drops"),
arrivalPeriod: metrics.NewUniformSample(1024),
processingTime: metrics.NewUniformSample(1024),
}
Expand All @@ -172,6 +187,35 @@ func newInputMetrics(id, device string, buflen uint64) *inputMetrics {
out.device.Set(device)
out.bufferLen.Set(buflen)

if poll > 0 && runtime.GOOS == "linux" {
host, port, ok := strings.Cut(device, ":")
if !ok {
log.Warnf("failed to get address for %s: no port separator", device)
return out
}
ip, err := net.LookupIP(host)
if err != nil {
log.Warnf("failed to get address for %s: %v", device, err)
return out
}
p, err := strconv.ParseInt(port, 10, 16)
if err != nil {
log.Warnf("failed to get port for %s: %v", device, err)
return out
}
ph := strconv.FormatInt(p, 16)
addr := make([]string, 0, len(ip))
for _, p := range ip {
p4 := p.To4()
if len(p4) != net.IPv4len {
continue
}
addr = append(addr, fmt.Sprintf("%X:%s", binary.LittleEndian.Uint32(p4), ph))
}
out.done = make(chan struct{})
go out.poll(addr, poll, log)
}

return out
}

Expand All @@ -189,9 +233,76 @@ func (m *inputMetrics) log(data []byte, timestamp time.Time) {
m.lastPacket = timestamp
}

// poll periodically gets UDP buffer and packet drops stats from the OS.
func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger) {
t := time.NewTicker(each)
for {
select {
case <-t.C:
rx, drops, err := procNetUDP(addr)
if err != nil {
log.Warnf("failed to get udp stats from /proc: %v", err)
continue
}
m.rxQueue.Set(uint64(rx))
m.drops.Set(uint64(drops))
case <-m.done:
t.Stop()
return
}
}
}

// procNetUDP returns the rx_queue and drops field of the UDP socket table
// for the socket on the provided address formatted in hex, xxxxxxxx:xxxx.
// This function is only useful on linux due to its dependence on the /proc
// filesystem, but is kept in this file for simplicity.
func procNetUDP(addr []string) (rx, drops int64, err error) {
b, err := os.ReadFile("/proc/net/udp")
if err != nil {
return 0, 0, err
}
lines := bytes.Split(b, []byte("\n"))
if len(lines) < 2 {
return 0, 0, fmt.Errorf("/proc/net/udp entry not found for %s (no line)", addr)
}
for _, l := range lines[1:] {
f := bytes.Fields(l)
if contains(f[1], addr) {
_, r, ok := bytes.Cut(f[4], []byte(":"))
if !ok {
return 0, 0, errors.New("no rx_queue field " + string(f[4]))
}
rx, err = strconv.ParseInt(string(r), 16, 64)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse rx_queue: %w", err)
}
drops, err = strconv.ParseInt(string(f[12]), 16, 64)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse drops: %w", err)
}
return rx, drops, nil
}
}
return 0, 0, fmt.Errorf("/proc/net/udp entry not found for %s", addr)
}

func contains(b []byte, addr []string) bool {
for _, a := range addr {
if strings.EqualFold(string(b), a) {
return true
}
}
return false
}

func (m *inputMetrics) close() {
if m == nil {
return
}
if m.done != nil {
// Shut down poller and wait until done before unregistering metrics.
m.done <- struct{}{}
}
m.unregister()
}

0 comments on commit 69fa5f2

Please sign in to comment.