From 4533e8fdc4e065fdb214a0467f701d4e64545275 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Tue, 2 Jul 2024 02:44:29 +0300 Subject: [PATCH 01/13] feat: add netflow status reporting under Agent management --- x-pack/filebeat/input/netflow/input.go | 11 +- .../input/netflow/integration_test.go | 397 ++++++++++++++++++ 2 files changed, 407 insertions(+), 1 deletion(-) create mode 100644 x-pack/filebeat/input/netflow/integration_test.go diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index 3cd4198fb43..cfee9e6742f 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/beats/v7/filebeat/inputsource/udp" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" @@ -110,6 +111,7 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err n.started = true n.mtx.Unlock() + ctx.UpdateStatus(status.Starting, "Starting netflow input") n.logger.Info("Starting netflow input") n.logger.Info("Connecting to beat event publishing") @@ -121,6 +123,7 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err EventListener: nil, }) if err != nil { + ctx.UpdateStatus(status.Failed, fmt.Sprintf("Failed connecting to beat event publishing: %v", err)) n.logger.Errorw("Failed connecting to beat event publishing", "error", err) n.stop() return err @@ -142,11 +145,13 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err WithSharedTemplates(n.cfg.ShareTemplates). WithActiveSessionsMetric(flowMetrics.ActiveSessions())) if err != nil { + ctx.UpdateStatus(status.Failed, fmt.Sprintf("Failed to initialize netflow decoder: %v", err)) return fmt.Errorf("error initializing netflow decoder: %w", err) } n.logger.Info("Starting netflow decoder") if err := n.decoder.Start(); err != nil { + ctx.UpdateStatus(status.Failed, fmt.Sprintf("Failed to start netflow decoder: %v", err)) n.logger.Errorw("Failed to start netflow decoder", "error", err) n.stop() return err @@ -167,7 +172,9 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err }) err = udpServer.Start() if err != nil { - n.logger.Errorf("Failed to start udp server: %v", err) + errorMsg := fmt.Sprintf("Failed to start udp server: %v", err) + n.logger.Errorf(errorMsg) + ctx.UpdateStatus(status.Failed, errorMsg) n.stop() return err } @@ -178,6 +185,8 @@ func (n *netflowInput) Run(ctx v2.Context, connector beat.PipelineConnector) err n.stop() }() + ctx.UpdateStatus(status.Running, "") + for packet := range n.queueC { flows, err := n.decoder.Read(bytes.NewBuffer(packet.data), packet.source) if err != nil { diff --git a/x-pack/filebeat/input/netflow/integration_test.go b/x-pack/filebeat/input/netflow/integration_test.go new file mode 100644 index 00000000000..eb5edbd742e --- /dev/null +++ b/x-pack/filebeat/input/netflow/integration_test.go @@ -0,0 +1,397 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package netflow_test + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "os" + "reflect" + "testing" + "time" + + "golang.org/x/time/rate" + + "github.com/elastic/beats/v7/libbeat/tests/integration" + filebeat "github.com/elastic/beats/v7/x-pack/filebeat/cmd" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent-libs/monitoring" + + "github.com/google/gopacket" + "github.com/google/gopacket/pcap" + "github.com/stretchr/testify/require" +) + +func TestNetFlowIntegration(t *testing.T) { + + // make sure there is an ES instance running + integration.EnsureESIsRunning(t) + esConnectionDetails := integration.GetESURL(t, "http") + outputHost := fmt.Sprintf("%s://%s:%s", esConnectionDetails.Scheme, esConnectionDetails.Hostname(), esConnectionDetails.Port()) + outputHosts := []interface{}{outputHost} + outputUsername := esConnectionDetails.User.Username() + outputPassword, _ := esConnectionDetails.User.Password() + outputProtocol := esConnectionDetails.Scheme + + deleted, err := DeleteDataStream(outputUsername, outputPassword, outputHost, "logs-netflow.log-default") + require.NoError(t, err) + require.True(t, deleted) + + // construct expected Agent units + allStreams := []*proto.UnitExpected{ + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 0, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "type": "elasticsearch", + "hosts": outputHosts, + "username": outputUsername, + "password": outputPassword, + "protocol": outputProtocol, + "enabled": true, + "ssl.verification_mode": "none", + // ref: https://www.elastic.co/guide/en/fleet/8.14/es-output-settings.html + "preset": "custom", + "bulk_max_size": 1600, + "worker": 4, + "queue.mem.events": 12800, + "queue.mem.flush.min_events": 1600, + "queue.mem.flush.timeout": 5, + "compression_level": 1, + "connection_idle_timeout": 15, + }), + }, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 0, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_INFO, + Config: &proto.UnitExpectedConfig{ + Id: "netflow-netflow-1e8b33de-d54a-45cd-90da-23ed71c482e5", + Type: "netflow", + Name: "netflow-1", + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "use_output": "default", + "revision": 0, + }), + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "netflow", + Version: "1.9.0", + }, + }, + Streams: []*proto.Stream{ + { + Id: "netflow-netflow.netflow-1e8b33de-d54a-45cd-90da-23ed71c482e2", + DataStream: &proto.DataStream{ + Dataset: "netflow.log", + }, + Source: integration.RequireNewStruct(t, map[string]interface{}{ + "id": "netflow_integration_test", + "host": "localhost:6006", + "expiration_timeout": "30m", + "queue_size": 2 * 4 * 1600, + "detect_sequence_reset": true, + "max_message_size": "10KiB", + }), + }, + }, + }, + }, + } + + healthyChan := make(chan struct{}) + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + + if healthyChan != nil { + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState == proto.State_HEALTHY { + if payload.streamStatusEquals("netflow-netflow.netflow-1e8b33de-d54a-45cd-90da-23ed71c482e2", map[string]interface{}{ + "status": "HEALTHY", + "error": "", + }) { + close(healthyChan) + healthyChan = nil + } + } + } + + return &proto.CheckinExpected{ + Units: allStreams, + } + }, + ActionImpl: func(response *proto.ActionResponse) error { + return nil + }, + } + if err := server.Start(); err != nil { + t.Fatalf("failed to start StubServerV2 server: %v", err) + } + defer server.Stop() + + // It's necessary to change os.Args so filebeat.Filebeat() can read the + // appropriate args at beat.Execute(). + initialOSArgs := os.Args + os.Args = []string{ + "filebeat", + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + "-E", "management.restart_on_output_change=true", + "-E", "logging.level=info", + } + defer func() { + os.Args = initialOSArgs + }() + + beatCmd := filebeat.Filebeat() + beatRunErr := make(chan error) + go func() { + defer close(beatRunErr) + beatRunErr <- beatCmd.Execute() + }() + + select { + case <-healthyChan: + break + case err := <-beatRunErr: + t.Fatalf("beat run err: %v", err) + case <-time.After(10 * time.Second): + t.Fatalf("timed out waiting for beat to become healthy") + } + + registry := monitoring.GetNamespace("dataset").GetRegistry().GetRegistry("netflow_integration_test") + + discardedEventsTotalVar, ok := registry.Get("discarded_events_total").(*monitoring.Uint) + require.True(t, ok) + + receivedEventTotalVar, ok := registry.Get("received_events_total").(*monitoring.Uint) + require.True(t, ok) + + udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:6006") + require.NoError(t, err) + + conn, err := net.DialUDP("udp", nil, udpAddr) + require.NoError(t, err) + + f, err := pcap.OpenOffline("testdata/pcap/ipfix_cisco.pcap") + require.NoError(t, err) + defer f.Close() + + var totalBytes, totalPackets int + limiter := rate.NewLimiter(rate.Limit(10000), 1) + + packetSource := gopacket.NewPacketSource(f, f.LinkType()) + for pkt := range packetSource.Packets() { + for !limiter.Allow() { + } + + payloadData := pkt.TransportLayer().LayerPayload() + + n, err := conn.Write(payloadData) + require.NoError(t, err) + + totalBytes += n + totalPackets++ + } + + require.Zero(t, discardedEventsTotalVar.Get()) + + require.Eventually(t, func() bool { + return receivedEventTotalVar.Get() == uint64(totalPackets) + }, 10*time.Second, 200*time.Millisecond) + + require.Eventually(t, func() bool { + return HasDataStream(outputUsername, outputPassword, outputHost, "logs-netflow.log-default") == nil + }, 10*time.Second, 200*time.Millisecond) + + require.Eventually(t, func() bool { + eventsCount, err := DataStreamEventsCount(outputUsername, outputPassword, outputHost, "logs-netflow.log-default") + require.NoError(t, err) + return eventsCount >= totalPackets + }, 10*time.Second, 200*time.Millisecond) +} + +type unitPayload map[string]interface{} + +func (u unitPayload) streamStatusEquals(streamID string, expected map[string]interface{}) bool { + if u == nil { + return false + } + + streams, ok := u["streams"].(map[string]interface{}) + if !ok || streams == nil { + return false + } + + streamMap, ok := streams[streamID].(map[string]interface{}) + if !ok || streamMap == nil { + return false + } + + return reflect.DeepEqual(streamMap, expected) +} + +func extractStateAndPayload(observed *proto.CheckinObserved, inputID string) (proto.State, unitPayload) { + for _, unit := range observed.GetUnits() { + if unit.Id == inputID { + return unit.GetState(), unit.Payload.AsMap() + } + } + + return -1, nil +} + +type DataStream struct { + Name string `json:"name"` + Status string `json:"status"` +} + +type DataStreamResult struct { + DataStreams []DataStream `json:"data_streams"` + Error interface{} `json:"error"` +} + +func HasDataStream(username string, password string, url string, name string) error { + resultBytes, err := request(http.MethodGet, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name)) + if err != nil { + return err + } + + if resultBytes == nil { + return errors.New("http not found error") + } + + var results DataStreamResult + err = json.Unmarshal(resultBytes, &results) + if err != nil { + return err + } + + if results.Error != nil { + return fmt.Errorf("error %v while checking for data stream %s", results.Error, name) + } + + if len(results.DataStreams) != 1 { + return fmt.Errorf( + "unexpected count %v of data streams returned when looking for %s", + len(results.DataStreams), name) + } + + if results.DataStreams[0].Name != name { + return fmt.Errorf("unexpected data stream %s returned when looking for %s", + results.DataStreams[0].Name, + name) + } + + return nil +} + +// Hit represents a single search hit. +type Hit struct { + Index string `json:"_index"` + Type string `json:"_type"` + ID string `json:"_id"` + Score float64 `json:"_score"` + Source map[string]interface{} `json:"_source"` +} + +// Hits are the collections of search hits. +type Hits struct { + Total json.RawMessage // model when needed + Hits []Hit `json:"hits"` +} + +// SearchResults are the results returned from a _search. +type SearchResults struct { + Took int + Hits Hits `json:"hits"` + Shards json.RawMessage // model when needed + Aggs map[string]json.RawMessage // model when needed +} + +func DataStreamEventsCount(username string, password string, url string, name string) (int, error) { + resultBytes, err := request(http.MethodGet, username, password, fmt.Sprintf("%s/%s/_search?q=!error.message:*", url, name)) + if err != nil { + return 0, err + } + + if resultBytes == nil { + return 0, errors.New("http not found error") + } + + var results SearchResults + err = json.Unmarshal(resultBytes, &results) + if err != nil { + return 0, err + } + return len(results.Hits.Hits), nil +} + +// DeleteResults are the results returned from a _data_stream delete. +type DeleteResults struct { + Acknowledged bool `json:"acknowledged"` +} + +func DeleteDataStream(username string, password string, url string, name string) (bool, error) { + resultBytes, err := request(http.MethodDelete, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name)) + if err != nil { + return false, err + } + + if resultBytes == nil { + return true, nil + } + + var results DeleteResults + err = json.Unmarshal(resultBytes, &results) + if err != nil { + return false, err + } + + return results.Acknowledged, nil +} + +func request(httpMethod string, username string, password string, url string) ([]byte, error) { + req, err := http.NewRequest(httpMethod, url, nil) + if err != nil { + return nil, err + } + req.SetBasicAuth(username, password) + + res, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + if res.StatusCode == http.StatusNotFound { + return nil, nil + } + resultBytes, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + return resultBytes, nil +} From 63c57ccd1af7143db5ff68868e02c32fa32c0e62 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Tue, 2 Jul 2024 22:29:23 +0300 Subject: [PATCH 02/13] doc: update CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4630859a482..0803c43346f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -295,6 +295,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005] - Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004] - Relax constraint on Base DN in entity analytics Active Directory provider. {pull}40054[40054] +- Implement Elastic Agent status and health reporting for Netflow Filebeat input. {pull}40080[40080] *Auditbeat* From 468da32a3b784dbf428895a1f4f4990df1640307 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Tue, 2 Jul 2024 23:33:02 +0300 Subject: [PATCH 03/13] fix: ignore acknowledgment on delete stream --- .../filebeat/input/netflow/integration_test.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/x-pack/filebeat/input/netflow/integration_test.go b/x-pack/filebeat/input/netflow/integration_test.go index eb5edbd742e..c39b312454b 100644 --- a/x-pack/filebeat/input/netflow/integration_test.go +++ b/x-pack/filebeat/input/netflow/integration_test.go @@ -7,6 +7,7 @@ package netflow_test import ( + "context" "encoding/json" "errors" "fmt" @@ -355,26 +356,16 @@ type DeleteResults struct { } func DeleteDataStream(username string, password string, url string, name string) (bool, error) { - resultBytes, err := request(http.MethodDelete, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name)) + _, err := request(http.MethodDelete, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name)) if err != nil { return false, err } - if resultBytes == nil { - return true, nil - } - - var results DeleteResults - err = json.Unmarshal(resultBytes, &results) - if err != nil { - return false, err - } - - return results.Acknowledged, nil + return true, nil } func request(httpMethod string, username string, password string, url string) ([]byte, error) { - req, err := http.NewRequest(httpMethod, url, nil) + req, err := http.NewRequestWithContext(context.TODO(), httpMethod, url, nil) if err != nil { return nil, err } From 6bd36e9b8b474f64b10f2ffed11e3fc5d134d1c8 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Wed, 3 Jul 2024 01:27:24 +0300 Subject: [PATCH 04/13] fix: use ES_SUPERUSER_USER and ES_SUPERUSER_PASS to support data_stream access through http --- x-pack/filebeat/input/netflow/integration_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/netflow/integration_test.go b/x-pack/filebeat/input/netflow/integration_test.go index c39b312454b..9ee51c15a33 100644 --- a/x-pack/filebeat/input/netflow/integration_test.go +++ b/x-pack/filebeat/input/netflow/integration_test.go @@ -39,8 +39,12 @@ func TestNetFlowIntegration(t *testing.T) { esConnectionDetails := integration.GetESURL(t, "http") outputHost := fmt.Sprintf("%s://%s:%s", esConnectionDetails.Scheme, esConnectionDetails.Hostname(), esConnectionDetails.Port()) outputHosts := []interface{}{outputHost} - outputUsername := esConnectionDetails.User.Username() - outputPassword, _ := esConnectionDetails.User.Password() + + // we are going to need admin access to query ES about the logs-netflow.log-default data_stream + outputUsername := os.Getenv("ES_SUPERUSER_USER") + require.NotEmpty(t, outputUsername) + outputPassword := os.Getenv("ES_SUPERUSER_PASS") + require.NotEmpty(t, outputPassword) outputProtocol := esConnectionDetails.Scheme deleted, err := DeleteDataStream(outputUsername, outputPassword, outputHost, "logs-netflow.log-default") From 205da51b7c009aa4575cb693b750b162fe65ca46 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Wed, 3 Jul 2024 01:28:50 +0300 Subject: [PATCH 05/13] fix: avoid race condition of nil channel --- .../input/netflow/integration_test.go | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/x-pack/filebeat/input/netflow/integration_test.go b/x-pack/filebeat/input/netflow/integration_test.go index 9ee51c15a33..699944303ee 100644 --- a/x-pack/filebeat/input/netflow/integration_test.go +++ b/x-pack/filebeat/input/netflow/integration_test.go @@ -16,6 +16,7 @@ import ( "net/http" "os" "reflect" + "sync" "testing" "time" @@ -127,19 +128,16 @@ func TestNetFlowIntegration(t *testing.T) { } healthyChan := make(chan struct{}) + closeOnce := sync.Once{} server := &mock.StubServerV2{ CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { - - if healthyChan != nil { - unitState, payload := extractStateAndPayload(observed, "input-unit-1") - if unitState == proto.State_HEALTHY { - if payload.streamStatusEquals("netflow-netflow.netflow-1e8b33de-d54a-45cd-90da-23ed71c482e2", map[string]interface{}{ - "status": "HEALTHY", - "error": "", - }) { - close(healthyChan) - healthyChan = nil - } + unitState, payload := extractStateAndPayload(observed, "input-unit-1") + if unitState == proto.State_HEALTHY { + if payload.streamStatusEquals("netflow-netflow.netflow-1e8b33de-d54a-45cd-90da-23ed71c482e2", map[string]interface{}{ + "status": "HEALTHY", + "error": "", + }) { + closeOnce.Do(func() { close(healthyChan) }) } } @@ -179,7 +177,6 @@ func TestNetFlowIntegration(t *testing.T) { select { case <-healthyChan: - break case err := <-beatRunErr: t.Fatalf("beat run err: %v", err) case <-time.After(10 * time.Second): From e3ade535de3155c3f221f88b593c37cd75c66369 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Wed, 3 Jul 2024 01:29:26 +0300 Subject: [PATCH 06/13] fix: replace Allow with Wait for rate limit --- .../input/netflow/integration_test.go | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/x-pack/filebeat/input/netflow/integration_test.go b/x-pack/filebeat/input/netflow/integration_test.go index 699944303ee..c319b94a150 100644 --- a/x-pack/filebeat/input/netflow/integration_test.go +++ b/x-pack/filebeat/input/netflow/integration_test.go @@ -35,6 +35,8 @@ import ( func TestNetFlowIntegration(t *testing.T) { + ctx := context.Background() + // make sure there is an ES instance running integration.EnsureESIsRunning(t) esConnectionDetails := integration.GetESURL(t, "http") @@ -48,7 +50,7 @@ func TestNetFlowIntegration(t *testing.T) { require.NotEmpty(t, outputPassword) outputProtocol := esConnectionDetails.Scheme - deleted, err := DeleteDataStream(outputUsername, outputPassword, outputHost, "logs-netflow.log-default") + deleted, err := DeleteDataStream(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default") require.NoError(t, err) require.True(t, deleted) @@ -206,8 +208,8 @@ func TestNetFlowIntegration(t *testing.T) { packetSource := gopacket.NewPacketSource(f, f.LinkType()) for pkt := range packetSource.Packets() { - for !limiter.Allow() { - } + err = limiter.Wait(ctx) + require.NoError(t, err) payloadData := pkt.TransportLayer().LayerPayload() @@ -225,11 +227,11 @@ func TestNetFlowIntegration(t *testing.T) { }, 10*time.Second, 200*time.Millisecond) require.Eventually(t, func() bool { - return HasDataStream(outputUsername, outputPassword, outputHost, "logs-netflow.log-default") == nil + return HasDataStream(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default") == nil }, 10*time.Second, 200*time.Millisecond) require.Eventually(t, func() bool { - eventsCount, err := DataStreamEventsCount(outputUsername, outputPassword, outputHost, "logs-netflow.log-default") + eventsCount, err := DataStreamEventsCount(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default") require.NoError(t, err) return eventsCount >= totalPackets }, 10*time.Second, 200*time.Millisecond) @@ -275,8 +277,8 @@ type DataStreamResult struct { Error interface{} `json:"error"` } -func HasDataStream(username string, password string, url string, name string) error { - resultBytes, err := request(http.MethodGet, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name)) +func HasDataStream(ctx context.Context, username string, password string, url string, name string) error { + resultBytes, err := request(ctx, http.MethodGet, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name)) if err != nil { return err } @@ -333,8 +335,8 @@ type SearchResults struct { Aggs map[string]json.RawMessage // model when needed } -func DataStreamEventsCount(username string, password string, url string, name string) (int, error) { - resultBytes, err := request(http.MethodGet, username, password, fmt.Sprintf("%s/%s/_search?q=!error.message:*", url, name)) +func DataStreamEventsCount(ctx context.Context, username string, password string, url string, name string) (int, error) { + resultBytes, err := request(ctx, http.MethodGet, username, password, fmt.Sprintf("%s/%s/_search?q=!error.message:*", url, name)) if err != nil { return 0, err } @@ -356,8 +358,8 @@ type DeleteResults struct { Acknowledged bool `json:"acknowledged"` } -func DeleteDataStream(username string, password string, url string, name string) (bool, error) { - _, err := request(http.MethodDelete, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name)) +func DeleteDataStream(ctx context.Context, username string, password string, url string, name string) (bool, error) { + _, err := request(ctx, http.MethodDelete, username, password, fmt.Sprintf("%s/_data_stream/%s", url, name)) if err != nil { return false, err } @@ -365,8 +367,8 @@ func DeleteDataStream(username string, password string, url string, name string) return true, nil } -func request(httpMethod string, username string, password string, url string) ([]byte, error) { - req, err := http.NewRequestWithContext(context.TODO(), httpMethod, url, nil) +func request(ctx context.Context, httpMethod string, username string, password string, url string) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, httpMethod, url, nil) if err != nil { return nil, err } From fa7ad071d4a084d0d23f0fd3df96a6af3e6678c4 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Wed, 3 Jul 2024 01:47:52 +0300 Subject: [PATCH 07/13] fix: specify time threshold in constants --- x-pack/filebeat/input/netflow/integration_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/netflow/integration_test.go b/x-pack/filebeat/input/netflow/integration_test.go index c319b94a150..5c730ab3a26 100644 --- a/x-pack/filebeat/input/netflow/integration_test.go +++ b/x-pack/filebeat/input/netflow/integration_test.go @@ -33,6 +33,11 @@ import ( "github.com/stretchr/testify/require" ) +const ( + waitFor = 10 * time.Second + tick = 200 * time.Millisecond +) + func TestNetFlowIntegration(t *testing.T) { ctx := context.Background() @@ -181,7 +186,7 @@ func TestNetFlowIntegration(t *testing.T) { case <-healthyChan: case err := <-beatRunErr: t.Fatalf("beat run err: %v", err) - case <-time.After(10 * time.Second): + case <-time.After(waitFor): t.Fatalf("timed out waiting for beat to become healthy") } @@ -224,17 +229,17 @@ func TestNetFlowIntegration(t *testing.T) { require.Eventually(t, func() bool { return receivedEventTotalVar.Get() == uint64(totalPackets) - }, 10*time.Second, 200*time.Millisecond) + }, waitFor, tick) require.Eventually(t, func() bool { return HasDataStream(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default") == nil - }, 10*time.Second, 200*time.Millisecond) + }, waitFor, tick) require.Eventually(t, func() bool { eventsCount, err := DataStreamEventsCount(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default") require.NoError(t, err) return eventsCount >= totalPackets - }, 10*time.Second, 200*time.Millisecond) + }, waitFor, tick) } type unitPayload map[string]interface{} From c7978ee5d053ca9d05bd4ee71e183dbd9737fafb Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Tue, 2 Jul 2024 22:45:14 +0300 Subject: [PATCH 08/13] fix: exit directly in debugPrintProcessor when log doesn't have debug level enabled --- libbeat/publisher/processing/processors.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go index e90202401a7..a3fe42c88da 100644 --- a/libbeat/publisher/processing/processors.go +++ b/libbeat/publisher/processing/processors.go @@ -199,6 +199,10 @@ func debugPrintProcessor(info beat.Info, log *logp.Logger) *processorFn { EscapeHTML: false, }) return newProcessor("debugPrint", func(event *beat.Event) (*beat.Event, error) { + if !log.IsDebug() { + return event, nil + } + mux.Lock() defer mux.Unlock() From 52bc0f11e5d0eedfa9024db8a14a734da2dc4827 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Thu, 4 Jul 2024 20:06:29 +0300 Subject: [PATCH 09/13] fix: rework on ratelimit to batch up requests and make code lighter --- x-pack/filebeat/input/netflow/integration_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/netflow/integration_test.go b/x-pack/filebeat/input/netflow/integration_test.go index 5c730ab3a26..a4469497c4e 100644 --- a/x-pack/filebeat/input/netflow/integration_test.go +++ b/x-pack/filebeat/input/netflow/integration_test.go @@ -209,12 +209,16 @@ func TestNetFlowIntegration(t *testing.T) { defer f.Close() var totalBytes, totalPackets int - limiter := rate.NewLimiter(rate.Limit(10000), 1) + rateLimit := 10000 + limiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit) packetSource := gopacket.NewPacketSource(f, f.LinkType()) for pkt := range packetSource.Packets() { - err = limiter.Wait(ctx) - require.NoError(t, err) + + if totalPackets%rateLimit == 0 { + err = limiter.WaitN(ctx, rateLimit) + require.NoError(t, err) + } payloadData := pkt.TransportLayer().LayerPayload() From 2a4a118d84c59d7a24d1239613ac59b8b364da3f Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Thu, 4 Jul 2024 20:30:51 +0300 Subject: [PATCH 10/13] feat: check also for malformed events --- x-pack/filebeat/input/netflow/integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/netflow/integration_test.go b/x-pack/filebeat/input/netflow/integration_test.go index a4469497c4e..f928c9b0cb6 100644 --- a/x-pack/filebeat/input/netflow/integration_test.go +++ b/x-pack/filebeat/input/netflow/integration_test.go @@ -344,8 +344,8 @@ type SearchResults struct { Aggs map[string]json.RawMessage // model when needed } -func DataStreamEventsCount(ctx context.Context, username string, password string, url string, name string) (int, error) { - resultBytes, err := request(ctx, http.MethodGet, username, password, fmt.Sprintf("%s/%s/_search?q=!error.message:*", url, name)) +func DataStreamEventsCount(ctx context.Context, username string, password string, url string, name string) (uint64, error) { + resultBytes, err := request(ctx, http.MethodGet, username, password, fmt.Sprintf("%s/%s/_search?q=!_ignored:*+AND+!event.message:*", url, name)) if err != nil { return 0, err } From 4227229ec0a95db431d82464c35588197735c82b Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Thu, 4 Jul 2024 20:31:08 +0300 Subject: [PATCH 11/13] feat: add deterministic check for expected events number --- .../input/netflow/integration_test.go | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/netflow/integration_test.go b/x-pack/filebeat/input/netflow/integration_test.go index f928c9b0cb6..f15272306c1 100644 --- a/x-pack/filebeat/input/netflow/integration_test.go +++ b/x-pack/filebeat/input/netflow/integration_test.go @@ -22,6 +22,7 @@ import ( "golang.org/x/time/rate" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/tests/integration" filebeat "github.com/elastic/beats/v7/x-pack/filebeat/cmd" "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" @@ -204,6 +205,14 @@ func TestNetFlowIntegration(t *testing.T) { conn, err := net.DialUDP("udp", nil, udpAddr) require.NoError(t, err) + data, err := os.ReadFile("testdata/golden/ipfix_cisco.pcap.golden.json") + require.NoError(t, err) + + var expectedFlows struct { + Flows []beat.Event `json:"events,omitempty"` + } + err = json.Unmarshal(data, &expectedFlows) + f, err := pcap.OpenOffline("testdata/pcap/ipfix_cisco.pcap") require.NoError(t, err) defer f.Close() @@ -242,7 +251,7 @@ func TestNetFlowIntegration(t *testing.T) { require.Eventually(t, func() bool { eventsCount, err := DataStreamEventsCount(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default") require.NoError(t, err) - return eventsCount >= totalPackets + return eventsCount == uint64(len(expectedFlows.Flows)) }, waitFor, tick) } @@ -330,10 +339,14 @@ type Hit struct { Source map[string]interface{} `json:"_source"` } +type Total struct { + Value uint64 `json:"value"` +} + // Hits are the collections of search hits. type Hits struct { - Total json.RawMessage // model when needed - Hits []Hit `json:"hits"` + Total Total // model when needed + Hits []Hit `json:"hits"` } // SearchResults are the results returned from a _search. @@ -359,7 +372,7 @@ func DataStreamEventsCount(ctx context.Context, username string, password string if err != nil { return 0, err } - return len(results.Hits.Hits), nil + return results.Hits.Total.Value, nil } // DeleteResults are the results returned from a _data_stream delete. From 861a88b5ba6f2ce077b44cc9a054120e2f0c6764 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Thu, 4 Jul 2024 20:33:06 +0300 Subject: [PATCH 12/13] Revert "fix: exit directly in debugPrintProcessor when log doesn't have debug level enabled" This reverts commit c7978ee5d053ca9d05bd4ee71e183dbd9737fafb. --- libbeat/publisher/processing/processors.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go index a3fe42c88da..e90202401a7 100644 --- a/libbeat/publisher/processing/processors.go +++ b/libbeat/publisher/processing/processors.go @@ -199,10 +199,6 @@ func debugPrintProcessor(info beat.Info, log *logp.Logger) *processorFn { EscapeHTML: false, }) return newProcessor("debugPrint", func(event *beat.Event) (*beat.Event, error) { - if !log.IsDebug() { - return event, nil - } - mux.Lock() defer mux.Unlock() From 2936c8e57f89370901ed653deff695585c189986 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Thu, 4 Jul 2024 23:28:52 +0300 Subject: [PATCH 13/13] fix: check err from json unmarshal of expectedFlows --- x-pack/filebeat/input/netflow/integration_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/filebeat/input/netflow/integration_test.go b/x-pack/filebeat/input/netflow/integration_test.go index f15272306c1..836336d8249 100644 --- a/x-pack/filebeat/input/netflow/integration_test.go +++ b/x-pack/filebeat/input/netflow/integration_test.go @@ -212,6 +212,7 @@ func TestNetFlowIntegration(t *testing.T) { Flows []beat.Event `json:"events,omitempty"` } err = json.Unmarshal(data, &expectedFlows) + require.NoError(t, err) f, err := pcap.OpenOffline("testdata/pcap/ipfix_cisco.pcap") require.NoError(t, err)