diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 8fbcaa8e0d9..f5f1c37165c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -184,6 +184,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Improve collection of risk information from Okta debug data. {issue}33677[33677] {pull}34030[34030] - Adding filename details from zip to response for httpjson {issue}33952[33952] {pull}34044[34044] - Allow user configuration of keep-alive behaviour for HTTPJSON and CEL inputs. {issue}33951[33951] {pull}34014[34014] +- Add support for polling system UDP stats for UDP input metrics. {pull}34070[34070] *Auditbeat* diff --git a/filebeat/docs/inputs/input-udp.asciidoc b/filebeat/docs/inputs/input-udp.asciidoc index 6f88b775807..efd75a33593 100644 --- a/filebeat/docs/inputs/input-udp.asciidoc +++ b/filebeat/docs/inputs/input-udp.asciidoc @@ -41,6 +41,8 @@ observe the activity of the input. | `udp_read_buffer_length_gauge` | Size of the UDP socket buffer length in bytes (gauge). | `received_events_total` | Total number of packets (events) that have been received. | `received_bytes_total` | Total number of bytes received. +| `receive_queue_length` | Size of the system receive queue (linux only) (guage). +| `system_packet_drops` | Number of system packet drops (linux only) (guage). | `arrival_period` | Histogram of the time between successive packets in nanoseconds. | `processing_time` | Histogram of the time taken to process packets in nanoseconds. |======= diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index 4f58f8a0856..77e57820ea4 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -18,7 +18,15 @@ package udp import ( + "bytes" + "encoding/binary" + "errors" + "fmt" "net" + "os" + "runtime" + "strconv" + "strings" "time" "github.com/dustin/go-humanize" @@ -28,10 +36,11 @@ import ( stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/filebeat/inputsource" "github.com/elastic/beats/v7/filebeat/inputsource/udp" - "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/beat" // TODO: Replace with sync/atomic when go1.19 is supported. "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/monitoring/adapter" @@ -96,7 +105,8 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { log.Info("starting udp socket input") defer log.Info("udp input stopped") - metrics := newInputMetrics(ctx.ID, s.config.Host, uint64(s.config.ReadBuffer)) + const pollInterval = time.Minute + metrics := newInputMetrics(ctx.ID, s.config.Host, uint64(s.config.ReadBuffer), pollInterval, log) defer metrics.close() server := udp.New(&s.config.Config, func(data []byte, metadata inputsource.NetworkMetadata) { @@ -137,6 +147,7 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { // inputMetrics handles the input's metric reporting. type inputMetrics struct { unregister func() + done chan struct{} lastPacket time.Time @@ -144,13 +155,15 @@ type inputMetrics struct { packets *monitoring.Uint // number of packets processed bytes *monitoring.Uint // number of bytes processed bufferLen *monitoring.Uint // configured read buffer length + rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/udp (only on linux systems) + drops *monitoring.Uint // number of udp drops noted in /proc/net/udp arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication } // newInputMetrics returns an input metric for the UDP processor. If id is empty // a nil inputMetric is returned. -func newInputMetrics(id, device string, buflen uint64) *inputMetrics { +func newInputMetrics(id, device string, buflen uint64, poll time.Duration, log *logp.Logger) *inputMetrics { if id == "" { return nil } @@ -161,6 +174,8 @@ func newInputMetrics(id, device string, buflen uint64) *inputMetrics { device: monitoring.NewString(reg, "device"), packets: monitoring.NewUint(reg, "received_events_total"), bytes: monitoring.NewUint(reg, "received_bytes_total"), + rxQueue: monitoring.NewUint(reg, "receive_queue_length"), + drops: monitoring.NewUint(reg, "system_packet_drops"), arrivalPeriod: metrics.NewUniformSample(1024), processingTime: metrics.NewUniformSample(1024), } @@ -172,6 +187,35 @@ func newInputMetrics(id, device string, buflen uint64) *inputMetrics { out.device.Set(device) out.bufferLen.Set(buflen) + if poll > 0 && runtime.GOOS == "linux" { + host, port, ok := strings.Cut(device, ":") + if !ok { + log.Warnf("failed to get address for %s: no port separator", device) + return out + } + ip, err := net.LookupIP(host) + if err != nil { + log.Warnf("failed to get address for %s: %v", device, err) + return out + } + p, err := strconv.ParseInt(port, 10, 16) + if err != nil { + log.Warnf("failed to get port for %s: %v", device, err) + return out + } + ph := strconv.FormatInt(p, 16) + addr := make([]string, 0, len(ip)) + for _, p := range ip { + p4 := p.To4() + if len(p4) != net.IPv4len { + continue + } + addr = append(addr, fmt.Sprintf("%X:%s", binary.LittleEndian.Uint32(p4), ph)) + } + out.done = make(chan struct{}) + go out.poll(addr, poll, log) + } + return out } @@ -189,9 +233,76 @@ func (m *inputMetrics) log(data []byte, timestamp time.Time) { m.lastPacket = timestamp } +// poll periodically gets UDP buffer and packet drops stats from the OS. +func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger) { + t := time.NewTicker(each) + for { + select { + case <-t.C: + rx, drops, err := procNetUDP(addr) + if err != nil { + log.Warnf("failed to get udp stats from /proc: %v", err) + continue + } + m.rxQueue.Set(uint64(rx)) + m.drops.Set(uint64(drops)) + case <-m.done: + t.Stop() + return + } + } +} + +// procNetUDP returns the rx_queue and drops field of the UDP socket table +// for the socket on the provided address formatted in hex, xxxxxxxx:xxxx. +// This function is only useful on linux due to its dependence on the /proc +// filesystem, but is kept in this file for simplicity. +func procNetUDP(addr []string) (rx, drops int64, err error) { + b, err := os.ReadFile("/proc/net/udp") + if err != nil { + return 0, 0, err + } + lines := bytes.Split(b, []byte("\n")) + if len(lines) < 2 { + return 0, 0, fmt.Errorf("/proc/net/udp entry not found for %s (no line)", addr) + } + for _, l := range lines[1:] { + f := bytes.Fields(l) + if contains(f[1], addr) { + _, r, ok := bytes.Cut(f[4], []byte(":")) + if !ok { + return 0, 0, errors.New("no rx_queue field " + string(f[4])) + } + rx, err = strconv.ParseInt(string(r), 16, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse rx_queue: %w", err) + } + drops, err = strconv.ParseInt(string(f[12]), 16, 64) + if err != nil { + return 0, 0, fmt.Errorf("failed to parse drops: %w", err) + } + return rx, drops, nil + } + } + return 0, 0, fmt.Errorf("/proc/net/udp entry not found for %s", addr) +} + +func contains(b []byte, addr []string) bool { + for _, a := range addr { + if strings.EqualFold(string(b), a) { + return true + } + } + return false +} + func (m *inputMetrics) close() { if m == nil { return } + if m.done != nil { + // Shut down poller and wait until done before unregistering metrics. + m.done <- struct{}{} + } m.unregister() }