Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

packetbeat/{beater,protos/tcp,protos/udp,sniffer}: add initial metrics for TCP/UDP traffic #33833

Merged
merged 10 commits into from
Jan 10, 2023
8 changes: 2 additions & 6 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,6 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add support for http+unix and http+npipe schemes in cel input. {issue}33571[33571] {pull}33712[33712]
- Add `decode_duration`, `move_fields` processors. {pull}31301[31301]

*Auditbeat*


*Filebeat*


*Heartbeat*

- Add new states field for internal use by new synthetics app. {pull}30632[30632]
Expand All @@ -199,6 +193,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Bump Windows Npcap version to v1.71. {issue}33164[33164] {pull}33172[33172]
- Add fragmented IPv4 packet reassembly. {issue}33012[33012] {pull}33296[33296]
- Reduce logging level for ENOENT to WARN when mapping sockets to processes. {issue}33793[33793] {pull}[]
- Add metrics for UDP packet processing. {pull}33833[33833]
- Add metrics for TCP and UDP packet processing. {pull}33833[33833]

*Functionbeat*

Expand Down
30 changes: 29 additions & 1 deletion packetbeat/beater/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync"
"time"

"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/processors"
Expand Down Expand Up @@ -116,6 +118,11 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
logp.Err("Failed to read the beat config: %v, %v", err, config)
return nil, err
}
id, err := configID(cfg)
if err != nil {
logp.Err("Failed to generate ID from config: %v, %v", err, config)
return nil, err
}

publisher, err := publish.NewTransactionPublisher(
p.beat.Info.Name,
Expand Down Expand Up @@ -150,7 +157,7 @@ func (p *processorFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C)
if err != nil {
return nil, err
}
sniffer, err := setupSniffer(config, protocols, sniffer.DecodersFor(publisher, protocols, watcher, flows, config))
sniffer, err := setupSniffer(config, protocols, sniffer.DecodersFor(id, publisher, protocols, watcher, flows, config))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -216,3 +223,24 @@ func (p *processorFactory) CheckConfig(config *conf.C) error {
runner.Stop()
return nil
}

func configID(config *conf.C) (string, error) {
var tmp struct {
ID string `config:"id"`
}
if err := config.Unpack(&tmp); err != nil {
return "", fmt.Errorf("error extracting ID: %w", err)
}
if tmp.ID != "" {
return tmp.ID, nil
}

var h map[string]interface{}
_ = config.Unpack(&h)
id, err := hashstructure.Hash(h, nil)
if err != nil {
return "", fmt.Errorf("can not compute ID from configuration: %w", err)
}

return fmt.Sprintf("%16X", id), nil
}
Loading