diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1fb0d2fcfe1e..71ae521f9142 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -168,6 +168,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Add support for http+unix and http+npipe schemes in cel input. {issue}33571[33571] {pull}33712[33712] - Add `decode_duration`, `move_fields` processors. {pull}31301[31301] - Add metrics for UDP packet processing. {pull}33870[33870] +- Convert UDP input to v2 input. {pull}33930[33930] *Auditbeat* diff --git a/filebeat/include/list.go b/filebeat/include/list.go index 30f56f9e7248..eda7cb7bf67a 100644 --- a/filebeat/include/list.go +++ b/filebeat/include/list.go @@ -28,7 +28,6 @@ import ( _ "github.com/elastic/beats/v7/filebeat/input/stdin" _ "github.com/elastic/beats/v7/filebeat/input/syslog" _ "github.com/elastic/beats/v7/filebeat/input/tcp" - _ "github.com/elastic/beats/v7/filebeat/input/udp" _ "github.com/elastic/beats/v7/filebeat/module/apache" _ "github.com/elastic/beats/v7/filebeat/module/auditd" _ "github.com/elastic/beats/v7/filebeat/module/elasticsearch" diff --git a/filebeat/input/default-inputs/inputs.go b/filebeat/input/default-inputs/inputs.go index bfd9ec2f0636..fb580179bb91 100644 --- a/filebeat/input/default-inputs/inputs.go +++ b/filebeat/input/default-inputs/inputs.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/v7/filebeat/beater" "github.com/elastic/beats/v7/filebeat/input/filestream" "github.com/elastic/beats/v7/filebeat/input/kafka" + "github.com/elastic/beats/v7/filebeat/input/udp" "github.com/elastic/beats/v7/filebeat/input/unix" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" @@ -38,6 +39,7 @@ func genericInputs(log *logp.Logger, components beater.StateStore) []v2.Plugin { return []v2.Plugin{ filestream.Plugin(log, components), kafka.Plugin(), + udp.Plugin(), unix.Plugin(), } } diff --git a/filebeat/input/udp/config.go b/filebeat/input/udp/config.go deleted file mode 100644 index 33696dc69c72..000000000000 --- a/filebeat/input/udp/config.go +++ /dev/null @@ -1,45 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package udp - -import ( - "time" - - "github.com/dustin/go-humanize" - - "github.com/elastic/beats/v7/filebeat/harvester" - "github.com/elastic/beats/v7/filebeat/inputsource/udp" -) - -var defaultConfig = config{ - ForwarderConfig: harvester.ForwarderConfig{ - Type: "udp", - }, - Config: udp.Config{ - MaxMessageSize: 10 * humanize.KiByte, - // TODO: What should be default port? - Host: "localhost:8080", - // TODO: What should be the default timeout? - Timeout: time.Minute * 5, - }, -} - -type config struct { - udp.Config `config:",inline"` - harvester.ForwarderConfig `config:",inline"` -} diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index e736d602100c..4f58f8a0856e 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -18,63 +18,88 @@ package udp import ( - "fmt" - "sync" + "net" "time" - "github.com/mitchellh/hashstructure" + "github.com/dustin/go-humanize" "github.com/rcrowley/go-metrics" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/harvester" - "github.com/elastic/beats/v7/filebeat/input" + input "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" "github.com/elastic/beats/v7/filebeat/inputsource" "github.com/elastic/beats/v7/filebeat/inputsource/udp" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/monitoring/adapter" + "github.com/elastic/go-concert/ctxtool" ) -func init() { - err := input.Register("udp", NewInput) - if err != nil { - panic(err) +func Plugin() input.Plugin { + return input.Plugin{ + Name: "udp", + Stability: feature.Stable, + Deprecated: false, + Info: "udp packet server", + Manager: stateless.NewInputManager(configure), } } -// Input defines a udp input to receive event on a specific host:port. -type Input struct { - sync.Mutex - udp *udp.Server - started bool - outlet channel.Outleter +func configure(cfg *conf.C) (stateless.Input, error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, err + } - metrics *inputMetrics + return newServer(config) } -// NewInput creates a new udp input -func NewInput(cfg *conf.C, outlet channel.Connector, context input.Context) (input.Input, error) { - id, err := configID(cfg) - if err != nil { - return nil, err +func defaultConfig() config { + return config{ + Config: udp.Config{ + MaxMessageSize: 10 * humanize.KiByte, + Host: "localhost:8080", + Timeout: time.Minute * 5, + }, } - out, err := outlet.Connect(cfg) +} + +type server struct { + udp.Server + config +} + +type config struct { + udp.Config `config:",inline"` +} + +func newServer(config config) (*server, error) { + return &server{config: config}, nil +} + +func (s *server) Name() string { return "udp" } + +func (s *server) Test(_ input.TestContext) error { + l, err := net.Listen("udp", s.config.Config.Host) if err != nil { - return nil, err + return err } + return l.Close() +} - config := defaultConfig - if err = cfg.Unpack(&config); err != nil { - return nil, err - } +func (s *server) Run(ctx input.Context, publisher stateless.Publisher) error { + log := ctx.Logger.With("host", s.config.Config.Host) + + log.Info("starting udp socket input") + defer log.Info("udp input stopped") + + metrics := newInputMetrics(ctx.ID, s.config.Host, uint64(s.config.ReadBuffer)) + defer metrics.close() - forwarder := harvester.NewForwarder(out) - metrics := newInputMetrics(id, config.Host, uint64(config.ReadBuffer)) - callback := func(data []byte, metadata inputsource.NetworkMetadata) { + server := udp.New(&s.config.Config, func(data []byte, metadata inputsource.NetworkMetadata) { evt := beat.Event{ Timestamp: time.Now(), Meta: mapstr.M{ @@ -91,74 +116,22 @@ func NewInput(cfg *conf.C, outlet channel.Connector, context input.Context) (inp }, } } - _ = forwarder.Send(evt) + + publisher.Publish(evt) // This must be called after forwarder.Send to measure // the processing time metric. metrics.log(data, evt.Timestamp) - } - - udp := udp.New(&config.Config, callback) - - return &Input{ - outlet: out, - udp: udp, - started: false, - metrics: metrics, - }, nil -} - -func configID(config *conf.C) (string, error) { - var tmp struct { - ID string `config:"id"` - } - if err := config.Unpack(&tmp); err != nil { - return "", fmt.Errorf("error extracting ID: %w", err) - } - if tmp.ID != "" { - return tmp.ID, nil - } - - var h map[string]interface{} - _ = config.Unpack(&h) - id, err := hashstructure.Hash(h, nil) - if err != nil { - return "", fmt.Errorf("can not compute ID from configuration: %w", err) - } + }) - return fmt.Sprintf("%16X", id), nil -} + log.Debug("udp input initialized") -// Run starts and start the UDP server and read events from the socket -func (p *Input) Run() { - p.Lock() - defer p.Unlock() - - if !p.started { - logp.Info("Starting UDP input") - err := p.udp.Start() - if err != nil { - logp.Err("Error running harvester: %v", err) - } - p.started = true + err := server.Run(ctxtool.FromCanceller(ctx.Cancelation)) + // Ignore error from 'Run' in case shutdown was signaled. + if ctxerr := ctx.Cancelation.Err(); ctxerr != nil { + err = ctxerr } -} - -// Stop stops the UDP input -func (p *Input) Stop() { - defer p.outlet.Close() - p.Lock() - defer p.Unlock() - - logp.Info("Stopping UDP input") - p.udp.Stop() - p.metrics.close() - p.started = false -} - -// Wait suspends the UDP input -func (p *Input) Wait() { - p.Stop() + return err } // inputMetrics handles the input's metric reporting. diff --git a/filebeat/input/udp/input_test.go b/filebeat/input/udp/input_test.go deleted file mode 100644 index 55157d3a9e1e..000000000000 --- a/filebeat/input/udp/input_test.go +++ /dev/null @@ -1,35 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//go:build !integration -// +build !integration - -package udp - -import ( - "testing" - - "github.com/elastic/beats/v7/filebeat/input/inputtest" - "github.com/elastic/elastic-agent-libs/mapstr" -) - -func TestNewInputDone(t *testing.T) { - config := mapstr.M{ - "hosts": ":0", - } - inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) -}