diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 070fb0d8980..c1418d194b3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -113,6 +113,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add opt-in eBPF backend for file_integrity module. {pull}37223[37223] - Add process data to file events (Linux only, eBPF backend). {pull}38199[38199] - Add container id to file events (Linux only, eBPF backend). {pull}38328[38328] +- Add process.entity_id, process.group.name and process.group.id in add_process_metadata processor. Make fim module with kprobes backend to always add an appropriately configured add_process_metadata processor to enrich file events {pull}38776[38776] *Filebeat* diff --git a/auditbeat/module/file_integrity/eventreader_kprobes.go b/auditbeat/module/file_integrity/eventreader_kprobes.go index 7cddd7f60cd..e5cdd76f4b7 100644 --- a/auditbeat/module/file_integrity/eventreader_kprobes.go +++ b/auditbeat/module/file_integrity/eventreader_kprobes.go @@ -26,10 +26,12 @@ import ( "time" "github.com/elastic/beats/v7/auditbeat/module/file_integrity/kprobes" - - "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/processors/add_process_metadata" "golang.org/x/sys/unix" + + "github.com/elastic/elastic-agent-libs/logp" ) type kProbesReader struct { @@ -39,6 +41,30 @@ type kProbesReader struct { log *logp.Logger parsers []FileParser + + processor beat.Processor +} + +func newKProbesReader(config Config, l *logp.Logger, parsers []FileParser) (*kProbesReader, error) { + processor, err := add_process_metadata.NewWithConfig( + add_process_metadata.ConfigOverwriteKeys(true), + add_process_metadata.ConfigMatchPIDs([]string{"process.pid"}), + ) + if err != nil { + return nil, err + } + + return &kProbesReader{ + config: config, + eventC: make(chan Event), + log: l, + parsers: parsers, + processor: processor, + }, nil +} + +func (r kProbesReader) Processor() beat.Processor { + return r.processor } func (r kProbesReader) Start(done <-chan struct{}) (<-chan Event, error) { @@ -152,6 +178,13 @@ func (r kProbesReader) nextEvent(done <-chan struct{}) *Event { start := time.Now() e := NewEvent(event.Path, kProbeTypeToAction(event.Op), SourceKProbes, r.config.MaxFileSizeBytes, r.config.HashTypes, r.parsers) + + if e.Process == nil { + e.Process = &Process{} + } + + e.Process.PID = event.PID + e.rtt = time.Since(start) return &e diff --git a/auditbeat/module/file_integrity/eventreader_linux.go b/auditbeat/module/file_integrity/eventreader_linux.go index ac9ce7de60d..c6b3d330c77 100644 --- a/auditbeat/module/file_integrity/eventreader_linux.go +++ b/auditbeat/module/file_integrity/eventreader_linux.go @@ -58,11 +58,7 @@ func NewEventReader(c Config, logger *logp.Logger) (EventProducer, error) { if c.Backend == BackendKprobes { l := logger.Named("kprobes") l.Info("selected backend: kprobes") - return &kProbesReader{ - config: c, - log: l, - parsers: FileParsers(c), - }, nil + return newKProbesReader(c, l, FileParsers(c)) } // unimplemented diff --git a/auditbeat/module/file_integrity/metricset.go b/auditbeat/module/file_integrity/metricset.go index eeaaa67b365..dc4b7c9169c 100644 --- a/auditbeat/module/file_integrity/metricset.go +++ b/auditbeat/module/file_integrity/metricset.go @@ -27,6 +27,7 @@ import ( bolt "go.etcd.io/bbolt" "github.com/elastic/beats/v7/auditbeat/datastore" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" "github.com/elastic/elastic-agent-libs/logp" @@ -62,6 +63,11 @@ type EventProducer interface { Start(done <-chan struct{}) (<-chan Event, error) } +// eventProducerWithProcessor is an EventProducer that requires a Processor +type eventProducerWithProcessor interface { + Processor() beat.Processor +} + // MetricSet for monitoring file integrity. type MetricSet struct { mb.BaseMetricSet @@ -78,6 +84,9 @@ type MetricSet struct { // Used when a hash can't be calculated nullHashes map[HashType]Digest + + // Processors + processors []beat.Processor } // New returns a new file.MetricSet. @@ -105,6 +114,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { log: logger, } + // reader supports a processor + if rWithProcessor, ok := r.(eventProducerWithProcessor); ok { + if proc := rWithProcessor.Processor(); proc != nil { + ms.processors = append(ms.processors, proc) + } + } + ms.nullHashes = make(map[HashType]Digest, len(config.HashTypes)) for _, hashType := range ms.config.HashTypes { // One byte is enough so that the hashes are persisted to the datastore. @@ -117,6 +133,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return ms, nil } +func (ms *MetricSet) Processors() []beat.Processor { + return ms.processors +} + // Run runs the MetricSet. The method will not return control to the caller // until it is finished (to stop it close the reporter.Done() channel). func (ms *MetricSet) Run(reporter mb.PushReporterV2) { diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 60a533a8e77..c1d4b571eee 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup" "github.com/elastic/elastic-agent-system-metrics/metric/system/resolve" + "github.com/elastic/go-sysinfo" ) const ( @@ -65,16 +66,24 @@ type addProcessMetadata struct { cidProvider cidProvider log *logp.Logger mappings mapstr.M + uniqueID []byte } type processMetadata struct { + entityID string name, title, exe, username, userid string args []string env map[string]string startTime time.Time pid, ppid int +<<<<<<< HEAD // fields mapstr.M +======= + groupname, groupid string + capEffective, capPermitted []string + fields mapstr.M +>>>>>>> ca4adcecac ([Auditbeat] fim(kprobes): enrich file events by coupling add_process_metadata processor (#38776)) } type processMetadataProvider interface { @@ -92,33 +101,48 @@ func init() { // New constructs a new add_process_metadata processor. func New(cfg *conf.C) (beat.Processor, error) { - return newProcessMetadataProcessorWithProvider(cfg, &procCache, false) + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err) + } + + return newProcessMetadataProcessorWithProvider(config, &procCache, false) } // NewWithCache construct a new add_process_metadata processor with cache for container IDs. // Resulting processor implements `Close()` to release the cache resources. func NewWithCache(cfg *conf.C) (beat.Processor, error) { + config := defaultConfig() + if err := cfg.Unpack(&config); err != nil { + return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err) + } + + return newProcessMetadataProcessorWithProvider(config, &procCache, true) +} + +func NewWithConfig(opts ...ConfigOption) (beat.Processor, error) { + cfg := defaultConfig() + + for _, o := range opts { + o(&cfg) + } + return newProcessMetadataProcessorWithProvider(cfg, &procCache, true) } -func newProcessMetadataProcessorWithProvider(cfg *conf.C, provider processMetadataProvider, withCache bool) (proc beat.Processor, err error) { +func newProcessMetadataProcessorWithProvider(config config, provider processMetadataProvider, withCache bool) (proc beat.Processor, err error) { // Logging (each processor instance has a unique ID). var ( id = int(instanceID.Inc()) log = logp.NewLogger(processorName).With("instance_id", id) ) - config := defaultConfig() - if err = cfg.Unpack(&config); err != nil { - return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err) - } - // If neither option is configured, then add a default. A default cgroup_regex // cannot be added to the struct returned by defaultConfig() because if // config_regex is set, it would take precedence over any user-configured // cgroup_prefixes. - hasCgroupPrefixes, _ := cfg.Has("cgroup_prefixes", -1) - hasCgroupRegex, _ := cfg.Has("cgroup_regex", -1) + hasCgroupPrefixes := len(config.CgroupPrefixes) > 0 + hasCgroupRegex := config.CgroupRegex != nil if !hasCgroupPrefixes && !hasCgroupRegex { config.CgroupRegex = defaultCgroupRegex } @@ -134,6 +158,13 @@ func newProcessMetadataProcessorWithProvider(cfg *conf.C, provider processMetada log: log, mappings: mappings, } + + if host, _ := sysinfo.Host(); host != nil { + if uniqueID := host.Info().UniqueID; uniqueID != "" { + p.uniqueID = []byte(uniqueID) + } + } + // don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled if ok := containsValue(mappings, "container.id"); ok { if withCache && config.CgroupCacheExpireTime != 0 { @@ -311,6 +342,7 @@ func (p *addProcessMetadata) String() string { func (p *processMetadata) toMap() mapstr.M { process := mapstr.M{ + "entity_id": p.entityID, "name": p.name, "title": p.title, "executable": p.exe, @@ -332,6 +364,25 @@ func (p *processMetadata) toMap() mapstr.M { } process["owner"] = user } +<<<<<<< HEAD +======= + if len(p.capEffective) > 0 { + process.Put("thread.capabilities.effective", p.capEffective) + } + if len(p.capPermitted) > 0 { + process.Put("thread.capabilities.permitted", p.capPermitted) + } + if p.groupname != "" || p.groupid != "" { + group := mapstr.M{} + if p.groupname != "" { + group["name"] = p.groupname + } + if p.groupid != "" { + group["id"] = p.groupid + } + process["group"] = group + } +>>>>>>> ca4adcecac ([Auditbeat] fim(kprobes): enrich file events by coupling add_process_metadata processor (#38776)) return mapstr.M{ "process": process, diff --git a/libbeat/processors/add_process_metadata/add_process_metadata_test.go b/libbeat/processors/add_process_metadata/add_process_metadata_test.go index 9dd1a7eb4dd..ef410ea8617 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata_test.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata_test.go @@ -43,10 +43,11 @@ func TestAddProcessMetadata(t *testing.T) { startTime := time.Now() testProcs := testProvider{ 1: { - name: "systemd", - title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", - exe: "/usr/lib/systemd/systemd", - args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, + name: "systemd", + entityID: "XCOVE56SVVEOKBNX", + title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", + exe: "/usr/lib/systemd/systemd", + args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, env: map[string]string{ "HOME": "/", "TERM": "linux", @@ -60,10 +61,11 @@ func TestAddProcessMetadata(t *testing.T) { userid: "0", }, 3: { - name: "systemd", - title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", - exe: "/usr/lib/systemd/systemd", - args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, + name: "systemd", + entityID: "XCOVE56SVVEOKBNX", + title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", + exe: "/usr/lib/systemd/systemd", + args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, env: map[string]string{ "HOME": "/", "TERM": "linux", @@ -150,6 +152,7 @@ func TestAddProcessMetadata(t *testing.T) { }, "process": mapstr.M{ "name": "systemd", + "entity_id": "XCOVE56SVVEOKBNX", "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", "executable": "/usr/lib/systemd/systemd", "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, @@ -235,6 +238,7 @@ func TestAddProcessMetadata(t *testing.T) { "parent": mapstr.M{ "process": mapstr.M{ "name": "systemd", + "entity_id": "XCOVE56SVVEOKBNX", "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", "executable": "/usr/lib/systemd/systemd", "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, @@ -269,6 +273,7 @@ func TestAddProcessMetadata(t *testing.T) { "parent": mapstr.M{ "process": mapstr.M{ "name": "systemd", + "entity_id": "XCOVE56SVVEOKBNX", "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", "executable": "/usr/lib/systemd/systemd", "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, @@ -310,6 +315,7 @@ func TestAddProcessMetadata(t *testing.T) { "parent": mapstr.M{ "process": mapstr.M{ "name": "systemd", + "entity_id": "XCOVE56SVVEOKBNX", "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", "executable": "/usr/lib/systemd/systemd", "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, @@ -508,6 +514,7 @@ func TestAddProcessMetadata(t *testing.T) { }, "process": mapstr.M{ "name": "systemd", + "entity_id": "XCOVE56SVVEOKBNX", "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", "executable": "/usr/lib/systemd/systemd", "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, @@ -633,6 +640,7 @@ func TestAddProcessMetadata(t *testing.T) { }, "process": mapstr.M{ "name": "systemd", + "entity_id": "XCOVE56SVVEOKBNX", "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", "executable": "/usr/lib/systemd/systemd", "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, @@ -723,7 +731,7 @@ func TestAddProcessMetadata(t *testing.T) { config: mapstr.M{ "cgroup_regex": "", }, - initErr: errors.New("fail to unpack the add_process_metadata configuration: cgroup_regexp must contain exactly one capturing group for the container ID accessing config"), + initErr: errors.New("cgroup_regexp must contain exactly one capturing group for the container ID accessing config"), }, { description: "cgroup_prefixes configured", @@ -744,17 +752,23 @@ func TestAddProcessMetadata(t *testing.T) { }, } { t.Run(test.description, func(t *testing.T) { - config, err := conf.NewConfigFrom(test.config) - if err != nil { - t.Fatal(err) + configC, err := conf.NewConfigFrom(test.config) + assert.NoError(t, err) + + config := defaultConfig() + if err := configC.Unpack(&config); err != nil { + if test.initErr == nil { + t.Fatal(err) + } + assert.EqualError(t, err, test.initErr.Error()) + return } proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, true) - if test.initErr == nil { - if err != nil { + if err != nil { + if test.initErr == nil { t.Fatal(err) } - } else { assert.EqualError(t, err, test.initErr.Error()) return } @@ -785,7 +799,11 @@ func TestAddProcessMetadata(t *testing.T) { "include_fields": []string{"process.name"}, } - config, err := conf.NewConfigFrom(c) + configC, err := conf.NewConfigFrom(c) + assert.NoError(t, err) + + config := defaultConfig() + err = configC.Unpack(&config) assert.NoError(t, err) proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, true) diff --git a/libbeat/processors/add_process_metadata/cache.go b/libbeat/processors/add_process_metadata/cache.go index e3435aa92af..713c2bb730e 100644 --- a/libbeat/processors/add_process_metadata/cache.go +++ b/libbeat/processors/add_process_metadata/cache.go @@ -53,7 +53,7 @@ func (pc *processCache) getEntryUnlocked(pid int) (entry processCacheEntry, vali if entry, valid = pc.cache[pid]; valid { valid = entry.expiration.After(time.Now()) } - return + return entry, valid } func (pc *processCache) GetProcessMetadata(pid int) (*processMetadata, error) { diff --git a/libbeat/processors/add_process_metadata/cache_test.go b/libbeat/processors/add_process_metadata/cache_test.go index 9d9886d9932..882349b6ae6 100644 --- a/libbeat/processors/add_process_metadata/cache_test.go +++ b/libbeat/processors/add_process_metadata/cache_test.go @@ -21,6 +21,8 @@ import ( "math/rand" "testing" "time" + + "github.com/stretchr/testify/require" ) var cacheEvictionTests = []struct { @@ -96,7 +98,8 @@ func TestCacheEviction(t *testing.T) { for i := 0; i < test.iters; i++ { pid := rnd.Intn(test.maxPID) - c.GetProcessMetadata(pid) + _, err := c.GetProcessMetadata(pid) + require.NoError(t, err) if len(c.cache) > test.cap { t.Errorf("cache overflow for %s after %d iterations", test.name, i) break diff --git a/libbeat/processors/add_process_metadata/config.go b/libbeat/processors/add_process_metadata/config.go index f16ba6771a8..7fd35b392dc 100644 --- a/libbeat/processors/add_process_metadata/config.go +++ b/libbeat/processors/add_process_metadata/config.go @@ -80,11 +80,25 @@ var defaultFields = mapstr.M{ "parent": mapstr.M{ "pid": nil, }, + "entity_id": nil, "start_time": nil, "owner": mapstr.M{ "name": nil, "id": nil, }, +<<<<<<< HEAD +======= + "group": mapstr.M{ + "name": nil, + "id": nil, + }, + "thread": mapstr.M{ + "capabilities": mapstr.M{ + "effective": nil, + "permitted": nil, + }, + }, +>>>>>>> ca4adcecac ([Auditbeat] fim(kprobes): enrich file events by coupling add_process_metadata processor (#38776)) }, "container": mapstr.M{ "id": nil, @@ -114,6 +128,20 @@ func defaultConfig() config { } } +type ConfigOption func(c *config) + +func ConfigOverwriteKeys(overwriteKeys bool) ConfigOption { + return func(c *config) { + c.OverwriteKeys = overwriteKeys + } +} + +func ConfigMatchPIDs(matchPIDs []string) ConfigOption { + return func(c *config) { + c.MatchPIDs = matchPIDs + } +} + func (c *config) getMappings() (mappings mapstr.M, err error) { mappings = mapstr.M{} validFields := defaultFields diff --git a/libbeat/processors/add_process_metadata/gosysinfo_provider.go b/libbeat/processors/add_process_metadata/gosysinfo_provider.go index ecc94233dce..9e2b7d635ed 100644 --- a/libbeat/processors/add_process_metadata/gosysinfo_provider.go +++ b/libbeat/processors/add_process_metadata/gosysinfo_provider.go @@ -18,13 +18,29 @@ package add_process_metadata import ( + "crypto/sha256" + "encoding/base64" + "encoding/binary" "os/user" "strings" + "sync" + "time" "github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo/types" ) +var hostInfoOnce = sync.OnceValues(func() ([]byte, error) { + host, err := sysinfo.Host() + if err == nil { + if uniqueID := host.Info().UniqueID; uniqueID != "" { + return []byte(uniqueID), err + } + } + + return nil, err +}) + type gosysinfoProvider struct{} func (p gosysinfoProvider) GetProcessMetadata(pid int) (result *processMetadata, err error) { @@ -44,14 +60,20 @@ func (p gosysinfoProvider) GetProcessMetadata(pid int) (result *processMetadata, env, _ = e.Environment() } - username, userid := "", "" + username, userid, groupname, groupid := "", "", "", "" if userInfo, err := proc.User(); err == nil { userid = userInfo.UID if u, err := user.LookupId(userInfo.UID); err == nil { username = u.Username } + + groupid = userInfo.GID + if g, err := user.LookupGroupId(userInfo.GID); err == nil { + groupname = g.Name + } } +<<<<<<< HEAD r := processMetadata{ name: info.Name, args: info.Args, @@ -63,7 +85,63 @@ func (p gosysinfoProvider) GetProcessMetadata(pid int) (result *processMetadata, startTime: info.StartTime, username: username, userid: userid, +======= + eID, _ := entityID(pid, info.StartTime) + + // Capabilities are linux only and other systems will fail + // with ErrUnsupported. In the event of any errors, we simply + // don't report the capabilities. + capPermitted, _ := capabilities.FromPid(capabilities.Permitted, pid) + capEffective, _ := capabilities.FromPid(capabilities.Effective, pid) + + r := processMetadata{ + entityID: eID, + name: info.Name, + args: info.Args, + env: env, + title: strings.Join(info.Args, " "), + exe: info.Exe, + pid: info.PID, + ppid: info.PPID, + capEffective: capEffective, + capPermitted: capPermitted, + startTime: info.StartTime, + username: username, + userid: userid, + groupname: groupname, + groupid: groupid, +>>>>>>> ca4adcecac ([Auditbeat] fim(kprobes): enrich file events by coupling add_process_metadata processor (#38776)) } + r.fields = r.toMap() return &r, nil } + +// entityID creates an ID that uniquely identifies this process across machines. +func entityID(pid int, start time.Time) (string, error) { + uniqueID, err := hostInfoOnce() + if err != nil && len(uniqueID) == 0 { + return "", err + } + + if len(uniqueID) == 0 || start.IsZero() { + return "", nil + } + + h := sha256.New() + if _, err := h.Write(uniqueID); err != nil { + return "", err + } + if err := binary.Write(h, binary.LittleEndian, int64(pid)); err != nil { + return "", err + } + if err := binary.Write(h, binary.LittleEndian, int64(start.Nanosecond())); err != nil { + return "", err + } + + sum := h.Sum(nil) + if len(sum) > 12 { + sum = sum[:12] + } + return base64.RawStdEncoding.EncodeToString(sum), nil +} diff --git a/metricbeat/mb/module/configuration.go b/metricbeat/mb/module/configuration.go index 1e69d6094c4..821dd452b0e 100644 --- a/metricbeat/mb/module/configuration.go +++ b/metricbeat/mb/module/configuration.go @@ -26,8 +26,13 @@ import ( ) // ConfiguredModules returns a list of all configured modules, including anyone present under dynamic config settings. +<<<<<<< HEAD func ConfiguredModules(modulesData []*conf.C, configModulesData *conf.C, moduleOptions []Option) ([]*Wrapper, error) { var modules []*Wrapper +======= +func ConfiguredModules(registry *mb.Register, modulesData []*conf.C, configModulesData *conf.C, moduleOptions []Option) ([]*Wrapper, error) { + var modules []*Wrapper //nolint:prealloc //can't be preallocated +>>>>>>> ca4adcecac ([Auditbeat] fim(kprobes): enrich file events by coupling add_process_metadata processor (#38776)) for _, moduleCfg := range modulesData { module, err := NewWrapper(moduleCfg, mb.Registry, moduleOptions...) @@ -40,7 +45,9 @@ func ConfiguredModules(modulesData []*conf.C, configModulesData *conf.C, moduleO // Add dynamic modules if configModulesData.Enabled() { config := cfgfile.DefaultDynamicConfig - configModulesData.Unpack(&config) + if err := configModulesData.Unpack(&config); err != nil { + return nil, err + } modulesManager, err := cfgfile.NewGlobManager(config.Path, ".yml", ".disabled") if err != nil { diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index 6e6b0ca6113..960280bc747 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -34,7 +34,6 @@ type Connector struct { pipeline beat.PipelineConnector processors *processors.Processors eventMeta mapstr.EventMetadata - timeSeries bool keepNull bool } @@ -97,6 +96,17 @@ func (c *Connector) UseMetricSetProcessors(r metricSetRegister, moduleName, metr return nil } +// addProcessors appends processors to the connector properties. +func (c *Connector) addProcessors(procs []beat.Processor) { + if c.processors == nil { + c.processors = processors.NewList(nil) + } + + for _, p := range procs { + c.processors.AddProcessor(p) + } +} + func (c *Connector) Connect() (beat.Client, error) { return c.pipeline.ConnectWith(beat.ClientConfig{ Processing: beat.ProcessingConfig{ diff --git a/metricbeat/mb/module/connector_test.go b/metricbeat/mb/module/connector_test.go index ed7008889c0..5079fbb23f9 100644 --- a/metricbeat/mb/module/connector_test.go +++ b/metricbeat/mb/module/connector_test.go @@ -67,7 +67,8 @@ func TestProcessorsForConfig(t *testing.T) { t.Errorf("[%s] %v", description, err) continue } - processedEvent, err := processors.Run(&test.event) + testEvent := testCases[description].event + processedEvent, err := processors.Run(&testEvent) // We don't check if err != nil, because we are testing the final outcome // of running the processors, including when some of them fail. if processedEvent == nil { diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index be8999a84d1..dab4bc8816f 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -31,6 +31,14 @@ type Factory struct { options []Option } +// metricSetWithProcessors is an interface to check if a MetricSet has directly attached Processors +// NOTE: Processors that implement the Closer interface are going to be closed from the pipeline when required, +// namely during dynamic configuration reloading. Thus, it is critical for the Metricset to always instantiate +// properly the processor and not consider it as always running. +type metricSetWithProcessors interface { + Processors() []beat.Processor +} + // NewFactory creates new Reloader instance for the given config func NewFactory(beatInfo beat.Info, options ...Option) *Factory { return &Factory{ @@ -46,7 +54,7 @@ func (r *Factory) Create(p beat.PipelineConnector, c *conf.C) (cfgfile.Runner, e return nil, err } - var runners []cfgfile.Runner + runners := make([]cfgfile.Runner, 0, len(metricSets)) for _, metricSet := range metricSets { wrapper, err := NewWrapperForMetricSet(module, metricSet, r.options...) if err != nil { @@ -63,6 +71,10 @@ func (r *Factory) Create(p beat.PipelineConnector, c *conf.C) (cfgfile.Runner, e return nil, err } + if msWithProcs, ok := metricSet.(metricSetWithProcessors); ok { + connector.addProcessors(msWithProcs.Processors()) + } + client, err := connector.Connect() if err != nil { return nil, err diff --git a/metricbeat/mb/module/runner_group.go b/metricbeat/mb/module/runner_group.go index 542926325f6..e020cd87d55 100644 --- a/metricbeat/mb/module/runner_group.go +++ b/metricbeat/mb/module/runner_group.go @@ -57,7 +57,7 @@ func (rg *runnerGroup) Stop() { } func (rg *runnerGroup) String() string { - var entries []string + entries := make([]string, 0, len(rg.runners)) for _, runner := range rg.runners { entries = append(entries, runner.String()) } diff --git a/metricbeat/mb/module/runner_group_test.go b/metricbeat/mb/module/runner_group_test.go index 036396a3103..1d462359968 100644 --- a/metricbeat/mb/module/runner_group_test.go +++ b/metricbeat/mb/module/runner_group_test.go @@ -79,7 +79,7 @@ func TestStartStop(t *testing.T) { startCounter := atomic.NewInt(0) stopCounter := atomic.NewInt(0) - var runners []cfgfile.Runner + runners := make([]cfgfile.Runner, 0, fakeRunnersNum) for i := 0; i < fakeRunnersNum; i++ { runners = append(runners, &fakeRunner{ id: i, @@ -98,7 +98,7 @@ func TestStartStop(t *testing.T) { } func TestDiagnosticsUnsupported(t *testing.T) { - var runners []cfgfile.Runner + runners := make([]cfgfile.Runner, 0, fakeRunnersNum) for i := 0; i < fakeRunnersNum; i++ { runners = append(runners, &fakeRunner{ id: i, @@ -119,7 +119,7 @@ func TestDiagnosticsUnsupported(t *testing.T) { } func TestDiagosticsSupported(t *testing.T) { - var runners []cfgfile.Runner + runners := make([]cfgfile.Runner, 0, fakeRunnersNum) for i := 0; i < fakeRunnersNum; i++ { runners = append(runners, &fakeRunnerDiag{ id: i, @@ -134,7 +134,7 @@ func TestDiagosticsSupported(t *testing.T) { } func TestString(t *testing.T) { - var runners []cfgfile.Runner + runners := make([]cfgfile.Runner, 0, fakeRunnersNum) for i := 0; i < fakeRunnersNum; i++ { runners = append(runners, &fakeRunner{ id: i, diff --git a/metricbeat/mb/module/testing.go b/metricbeat/mb/module/testing.go index 6d903fe62e5..779a630dd96 100644 --- a/metricbeat/mb/module/testing.go +++ b/metricbeat/mb/module/testing.go @@ -36,8 +36,11 @@ func receiveOneEvent(d testing.Driver, events <-chan beat.Event, timeout time.Du go func() { defer close(done) + ticker := time.NewTicker(timeout) + defer ticker.Stop() + select { - case <-time.Tick(timeout): + case <-ticker.C: d.Error("error", errors.New("timeout waiting for an event")) case event, ok := <-events: if !ok { diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index 6df0aaa2364..d41bdf01497 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -199,13 +199,13 @@ func (msw *metricSetWrapper) run(done <-chan struct{}, out chan<- beat.Event) { } switch ms := msw.MetricSet.(type) { - case mb.PushMetricSet: + case mb.PushMetricSet: //nolint:staticcheck // PushMetricSet is deprecated but not removed ms.Run(reporter.V1()) case mb.PushMetricSetV2: ms.Run(reporter.V2()) case mb.PushMetricSetV2WithContext: ms.Run(&channelContext{done}, reporter.V2()) - case mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext: + case mb.ReportingMetricSet, mb.ReportingMetricSetV2, mb.ReportingMetricSetV2Error, mb.ReportingMetricSetV2WithContext: //nolint:staticcheck // ReportingMetricSet is deprecated but not removed msw.startPeriodicFetching(&channelContext{done}, reporter) default: // Earlier startup stages prevent this from happening. @@ -242,7 +242,7 @@ func (msw *metricSetWrapper) startPeriodicFetching(ctx context.Context, reporter // and log a stack track if one occurs. func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) { switch fetcher := msw.MetricSet.(type) { - case mb.ReportingMetricSet: + case mb.ReportingMetricSet: //nolint:staticcheck // ReportingMetricSet is deprecated but not removed reporter.StartFetchTimer() fetcher.Fetch(reporter.V1()) case mb.ReportingMetricSetV2: @@ -292,7 +292,7 @@ func (msw *metricSetWrapper) Test(d testing.Driver) { type reporter interface { StartFetchTimer() - V1() mb.PushReporter + V1() mb.PushReporter //nolint:staticcheck // PushReporter is deprecated but not removed V2() mb.PushReporterV2 } @@ -309,7 +309,7 @@ type eventReporter struct { // startFetchTimer demarcates the start of a new fetch. The elapsed time of a // fetch is computed based on the time of this call. func (r *eventReporter) StartFetchTimer() { r.start = time.Now() } -func (r *eventReporter) V1() mb.PushReporter { +func (r *eventReporter) V1() mb.PushReporter { //nolint:staticcheck // PushReporter is deprecated but not removed return reporterV1{v2: r.V2(), module: r.msw.module.Name()} } func (r *eventReporter) V2() mb.PushReporterV2 { return reporterV2{r} } diff --git a/x-pack/winlogbeat/include/list.go b/x-pack/winlogbeat/include/list.go index af2071e249e..6ee9c51eefb 100644 --- a/x-pack/winlogbeat/include/list.go +++ b/x-pack/winlogbeat/include/list.go @@ -7,7 +7,7 @@ package include import ( - // Import packages that need to register themselves. + // Import packages that perform 'func init()'. _ "github.com/elastic/beats/v7/x-pack/winlogbeat/module/powershell" _ "github.com/elastic/beats/v7/x-pack/winlogbeat/module/security" _ "github.com/elastic/beats/v7/x-pack/winlogbeat/module/sysmon"