Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filebeat/input/udp: add support for /proc/net/udp metrics #34070

Merged
merged 1 commit into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Improve collection of risk information from Okta debug data. {issue}33677[33677] {pull}34030[34030]
- Adding filename details from zip to response for httpjson {issue}33952[33952] {pull}34044[34044]
- Allow user configuration of keep-alive behaviour for HTTPJSON and CEL inputs. {issue}33951[33951] {pull}34014[34014]
- Add support for polling system UDP stats for UDP input metrics. {pull}34070[34070]

*Auditbeat*

Expand Down
2 changes: 2 additions & 0 deletions filebeat/docs/inputs/input-udp.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ observe the activity of the input.
| `udp_read_buffer_length_gauge` | Size of the UDP socket buffer length in bytes (gauge).
| `received_events_total` | Total number of packets (events) that have been received.
| `received_bytes_total` | Total number of bytes received.
| `receive_queue_length` | Size of the system receive queue (linux only) (guage).
| `system_packet_drops` | Number of system packet drops (linux only) (guage).
| `arrival_period` | Histogram of the time between successive packets in nanoseconds.
| `processing_time` | Histogram of the time taken to process packets in nanoseconds.
|=======
Expand Down
117 changes: 114 additions & 3 deletions filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@
package udp

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"net"
"os"
"runtime"
"strconv"
"strings"
"time"

"github.com/dustin/go-humanize"
Expand All @@ -28,10 +36,11 @@ import (
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/filebeat/inputsource"
"github.com/elastic/beats/v7/filebeat/inputsource/udp"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat" // TODO: Replace with sync/atomic when go1.19 is supported.
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
Expand Down Expand Up @@ -96,7 +105,8 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {
log.Info("starting udp socket input")
defer log.Info("udp input stopped")

metrics := newInputMetrics(ctx.ID, s.config.Host, uint64(s.config.ReadBuffer))
const pollInterval = time.Minute
metrics := newInputMetrics(ctx.ID, s.config.Host, uint64(s.config.ReadBuffer), pollInterval, log)
defer metrics.close()

server := udp.New(&s.config.Config, func(data []byte, metadata inputsource.NetworkMetadata) {
Expand Down Expand Up @@ -137,20 +147,23 @@ func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {
// inputMetrics handles the input's metric reporting.
type inputMetrics struct {
unregister func()
done chan struct{}

lastPacket time.Time

device *monitoring.String // name of the device being monitored
packets *monitoring.Uint // number of packets processed
bytes *monitoring.Uint // number of bytes processed
bufferLen *monitoring.Uint // configured read buffer length
rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/udp (only on linux systems)
drops *monitoring.Uint // number of udp drops noted in /proc/net/udp
arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals
processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication
}

// newInputMetrics returns an input metric for the UDP processor. If id is empty
// a nil inputMetric is returned.
func newInputMetrics(id, device string, buflen uint64) *inputMetrics {
func newInputMetrics(id, device string, buflen uint64, poll time.Duration, log *logp.Logger) *inputMetrics {
if id == "" {
return nil
}
Expand All @@ -161,6 +174,8 @@ func newInputMetrics(id, device string, buflen uint64) *inputMetrics {
device: monitoring.NewString(reg, "device"),
packets: monitoring.NewUint(reg, "received_events_total"),
bytes: monitoring.NewUint(reg, "received_bytes_total"),
rxQueue: monitoring.NewUint(reg, "receive_queue_length"),
drops: monitoring.NewUint(reg, "system_packet_drops"),
arrivalPeriod: metrics.NewUniformSample(1024),
processingTime: metrics.NewUniformSample(1024),
}
Expand All @@ -172,6 +187,35 @@ func newInputMetrics(id, device string, buflen uint64) *inputMetrics {
out.device.Set(device)
out.bufferLen.Set(buflen)

if poll > 0 && runtime.GOOS == "linux" {
host, port, ok := strings.Cut(device, ":")
if !ok {
log.Warnf("failed to get address for %s: no port separator", device)
return out
}
ip, err := net.LookupIP(host)
if err != nil {
log.Warnf("failed to get address for %s: %v", device, err)
return out
}
p, err := strconv.ParseInt(port, 10, 16)
if err != nil {
log.Warnf("failed to get port for %s: %v", device, err)
return out
}
ph := strconv.FormatInt(p, 16)
addr := make([]string, 0, len(ip))
for _, p := range ip {
p4 := p.To4()
if len(p4) != net.IPv4len {
continue
}
addr = append(addr, fmt.Sprintf("%X:%s", binary.LittleEndian.Uint32(p4), ph))
}
out.done = make(chan struct{})
go out.poll(addr, poll, log)
}

return out
}

Expand All @@ -189,9 +233,76 @@ func (m *inputMetrics) log(data []byte, timestamp time.Time) {
m.lastPacket = timestamp
}

// poll periodically gets UDP buffer and packet drops stats from the OS.
func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger) {
t := time.NewTicker(each)
for {
select {
case <-t.C:
rx, drops, err := procNetUDP(addr)
if err != nil {
log.Warnf("failed to get udp stats from /proc: %v", err)
continue
}
m.rxQueue.Set(uint64(rx))
m.drops.Set(uint64(drops))
case <-m.done:
t.Stop()
return
}
}
}

// procNetUDP returns the rx_queue and drops field of the UDP socket table
// for the socket on the provided address formatted in hex, xxxxxxxx:xxxx.
// This function is only useful on linux due to its dependence on the /proc
// filesystem, but is kept in this file for simplicity.
func procNetUDP(addr []string) (rx, drops int64, err error) {
b, err := os.ReadFile("/proc/net/udp")
if err != nil {
return 0, 0, err
}
lines := bytes.Split(b, []byte("\n"))
if len(lines) < 2 {
return 0, 0, fmt.Errorf("/proc/net/udp entry not found for %s (no line)", addr)
}
for _, l := range lines[1:] {
f := bytes.Fields(l)
if contains(f[1], addr) {
_, r, ok := bytes.Cut(f[4], []byte(":"))
if !ok {
return 0, 0, errors.New("no rx_queue field " + string(f[4]))
}
rx, err = strconv.ParseInt(string(r), 16, 64)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse rx_queue: %w", err)
}
drops, err = strconv.ParseInt(string(f[12]), 16, 64)
if err != nil {
return 0, 0, fmt.Errorf("failed to parse drops: %w", err)
}
return rx, drops, nil
}
}
return 0, 0, fmt.Errorf("/proc/net/udp entry not found for %s", addr)
}

func contains(b []byte, addr []string) bool {
for _, a := range addr {
if strings.EqualFold(string(b), a) {
return true
}
}
return false
}

func (m *inputMetrics) close() {
if m == nil {
return
}
if m.done != nil {
// Shut down poller and wait until done before unregistering metrics.
m.done <- struct{}{}
}
m.unregister()
}