Skip to content

Commit

Permalink
filebeat/input/udp: convert to v2 input
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Dec 4, 2022
1 parent 97a76bc commit 3e3f3b3
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 174 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add support for http+unix and http+npipe schemes in cel input. {issue}33571[33571] {pull}33712[33712]
- Add `decode_duration`, `move_fields` processors. {pull}31301[31301]
- Add metrics for UDP packet processing. {pull}33870[33870]
- Convert UDP input to v2 input. {pull}33930[33930]

*Auditbeat*

Expand Down
1 change: 0 additions & 1 deletion filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/elastic/beats/v7/filebeat/beater"
"github.com/elastic/beats/v7/filebeat/input/filestream"
"github.com/elastic/beats/v7/filebeat/input/kafka"
"github.com/elastic/beats/v7/filebeat/input/udp"
"github.com/elastic/beats/v7/filebeat/input/unix"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -38,6 +39,7 @@ func genericInputs(log *logp.Logger, components beater.StateStore) []v2.Plugin {
return []v2.Plugin{
filestream.Plugin(log, components),
kafka.Plugin(),
udp.Plugin(),
unix.Plugin(),
}
}
45 changes: 0 additions & 45 deletions filebeat/input/udp/config.go

This file was deleted.

161 changes: 68 additions & 93 deletions filebeat/input/udp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,63 +18,90 @@
package udp

import (
"fmt"
"sync"
"net"
"time"

"github.com/mitchellh/hashstructure"
"github.com/dustin/go-humanize"
"github.com/rcrowley/go-metrics"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/input"
input "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/filebeat/inputsource"
"github.com/elastic/beats/v7/filebeat/inputsource/udp"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
"github.com/elastic/go-concert/ctxtool"
)

func init() {
err := input.Register("udp", NewInput)
if err != nil {
panic(err)
func Plugin() input.Plugin {
return input.Plugin{
Name: "udp",
Stability: feature.Stable,
Deprecated: false,
Info: "udp packet server",
Manager: stateless.NewInputManager(configure),
}
}

// Input defines a udp input to receive event on a specific host:port.
type Input struct {
sync.Mutex
udp *udp.Server
started bool
outlet channel.Outleter
func configure(cfg *conf.C) (stateless.Input, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

metrics *inputMetrics
return newServer(config)
}

// NewInput creates a new udp input
func NewInput(cfg *conf.C, outlet channel.Connector, context input.Context) (input.Input, error) {
id, err := configID(cfg)
if err != nil {
return nil, err
func defaultConfig() config {
return config{
Config: udp.Config{
MaxMessageSize: 10 * humanize.KiByte,
// TODO: What should be default port?
Host: "localhost:8080",
// TODO: What should be the default timeout?
Timeout: time.Minute * 5,
},
}
out, err := outlet.Connect(cfg)
}

type server struct {
udp.Server
config
}

type config struct {
udp.Config `config:",inline"`
}

func newServer(config config) (*server, error) {
return &server{config: config}, nil
}

func (s *server) Name() string { return "udp" }

func (s *server) Test(_ input.TestContext) error {
l, err := net.Listen("udp", s.config.Config.Host)
if err != nil {
return nil, err
return err
}
return l.Close()
}

config := defaultConfig
if err = cfg.Unpack(&config); err != nil {
return nil, err
}
func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error {
log := ctx.Logger.Named("udp").With("host", s.config.Config.Host)

log.Info("starting udp socket input")
defer log.Info("udp input stopped")

metrics := newInputMetrics(ctx.ID, s.config.Host, uint64(s.config.ReadBuffer))
defer metrics.close()

forwarder := harvester.NewForwarder(out)
metrics := newInputMetrics(id, config.Host, uint64(config.ReadBuffer))
callback := func(data []byte, metadata inputsource.NetworkMetadata) {
server := udp.New(&s.config.Config, func(data []byte, metadata inputsource.NetworkMetadata) {
evt := beat.Event{
Timestamp: time.Now(),
Meta: mapstr.M{
Expand All @@ -91,74 +118,22 @@ func NewInput(cfg *conf.C, outlet channel.Connector, context input.Context) (inp
},
}
}
_ = forwarder.Send(evt)

publisher.Publish(evt)

// This must be called after forwarder.Send to measure
// the processing time metric.
metrics.log(data, evt.Timestamp)
}

udp := udp.New(&config.Config, callback)

return &Input{
outlet: out,
udp: udp,
started: false,
metrics: metrics,
}, nil
}

func configID(config *conf.C) (string, error) {
var tmp struct {
ID string `config:"id"`
}
if err := config.Unpack(&tmp); err != nil {
return "", fmt.Errorf("error extracting ID: %w", err)
}
if tmp.ID != "" {
return tmp.ID, nil
}

var h map[string]interface{}
_ = config.Unpack(&h)
id, err := hashstructure.Hash(h, nil)
if err != nil {
return "", fmt.Errorf("can not compute ID from configuration: %w", err)
}
})

return fmt.Sprintf("%16X", id), nil
}
log.Debugf("udp input '%v' initialized", ctx.ID)

// Run starts and start the UDP server and read events from the socket
func (p *Input) Run() {
p.Lock()
defer p.Unlock()

if !p.started {
logp.Info("Starting UDP input")
err := p.udp.Start()
if err != nil {
logp.Err("Error running harvester: %v", err)
}
p.started = true
err := server.Run(ctxtool.FromCanceller(ctx.Cancelation))
// Ignore error from 'Run' in case shutdown was signaled.
if ctxerr := ctx.Cancelation.Err(); ctxerr != nil {
err = ctxerr
}
}

// Stop stops the UDP input
func (p *Input) Stop() {
defer p.outlet.Close()
p.Lock()
defer p.Unlock()

logp.Info("Stopping UDP input")
p.udp.Stop()
p.metrics.close()
p.started = false
}

// Wait suspends the UDP input
func (p *Input) Wait() {
p.Stop()
return err
}

// inputMetrics handles the input's metric reporting.
Expand Down
35 changes: 0 additions & 35 deletions filebeat/input/udp/input_test.go

This file was deleted.

0 comments on commit 3e3f3b3

Please sign in to comment.