Skip to content

Commit

Permalink
packetbeat/{beater,protos/tcp,protos/udp,sniffer}: add initial metric…
Browse files Browse the repository at this point in the history
…s for TCP/UDP traffic (#33833)

This adds metrics for TCP/UDP packet count and total bytes, and histograms
for time required to process TCP/UDP packets prior to acking from a
publication and time between TCP/UDP packet arrivals.
  • Loading branch information
efd6 authored and chrisberkhout committed Jun 1, 2023
1 parent 4acc3c6 commit 2c9abc8
Show file tree
Hide file tree
Showing 9 changed files with 434 additions and 207 deletions.
7 changes: 1 addition & 6 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,6 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add support for polling system UDP stats for UDP input metrics. {pull}34070[34070]
- Add support for recognizing the log level in Elasticsearch JVM logs {pull}34159[34159]

*Auditbeat*


*Filebeat*


*Heartbeat*

- Add new states field for internal use by new synthetics app. {pull}30632[30632]
Expand All @@ -223,6 +217,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Bump Windows Npcap version to v1.71. {issue}33164[33164] {pull}33172[33172]
- Add fragmented IPv4 packet reassembly. {issue}33012[33012] {pull}33296[33296]
- Reduce logging level for ENOENT to WARN when mapping sockets to processes. {issue}33793[33793] {pull}[]
- Add metrics for TCP and UDP packet processing. {pull}33833[33833]

*Functionbeat*

Expand Down
9 changes: 9 additions & 0 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package beater

import (
"flag"
"fmt"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/service"
Expand Down Expand Up @@ -124,6 +126,13 @@ func (pb *packetbeat) Run(b *beat.Beat) error {
}
}()

if b.API != nil {
err := inputmon.AttachHandler(b.API.Router())
if err != nil {
return fmt.Errorf("failed attach inputs api to monitoring endpoint server: %w", err)
}
}

if !b.Manager.Enabled() {
return pb.runStatic(b, pb.factory)
}
Expand Down
30 changes: 29 additions & 1 deletion packetbeat/beater/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"
"time"

"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/processors"
Expand Down Expand Up @@ -116,6 +118,11 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
logp.Err("Failed to read the beat config: %v, %v", err, config)
return nil, err
}
id, err := configID(cfg)
if err != nil {
logp.Err("Failed to generate ID from config: %v, %v", err, config)
return nil, err
}

publisher, err := publish.NewTransactionPublisher(
p.beat.Info.Name,
Expand Down Expand Up @@ -150,7 +157,7 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
if err != nil {
return nil, err
}
sniffer, err := setupSniffer(config, protocols, sniffer.DecodersFor(publisher, protocols, watcher, flows, config))
sniffer, err := setupSniffer(config, protocols, sniffer.DecodersFor(id, publisher, protocols, watcher, flows, config))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -216,3 +223,24 @@ func (p *processorFactory) CheckConfig(config *conf.C) error {
runner.Stop()
return nil
}

func configID(config *conf.C) (string, error) {
var tmp struct {
ID string `config:"id"`
}
if err := config.Unpack(&tmp); err != nil {
return "", fmt.Errorf("error extracting ID: %w", err)
}
if tmp.ID != "" {
return tmp.ID, nil
}

var h map[string]interface{}
_ = config.Unpack(&h)
id, err := hashstructure.Hash(h, nil)
if err != nil {
return "", fmt.Errorf("can not compute ID from configuration: %w", err)
}

return fmt.Sprintf("%16X", id), nil
}
Loading

0 comments on commit 2c9abc8

Please sign in to comment.