Skip to content

Commit

Permalink
[Auditbeat] fim(kprobes): enrich file events by coupling add_process_…
Browse files Browse the repository at this point in the history
…metadata processor (#38776)

* feat(processors/process_metadata): support reporting group id and name

* feat(processors/process_metadata): support reporting process entity_id

* feat(fim/kprobes): allow metricsSets to expose beat processors after initialisation

* doc: update CHANGELOG.next.asciidoc

* fix(linter): SA1015 prevent leaking the ticker

* fix(linter): SA1019 mark metricbeat/mb deprecation warnings that are not removed yet

* fix(linter): check for return err

* fix(linter): prealloc slices

* fix(linter): remove unused field

* fix(linter): G601 prevent implicit memory aliasing in for loop

* doc: update CHANGELOG.next.asciidoc

* fix: update filebaet fields.asciidoc (unrelated to this work)

* doc: remove irrelevant changes from CHANGELOG.next.asciidoc

* feat(processor/metadata): introduce new type based allocation func

* feat(fim/kprobe): instantiate new processor alongside a new kprobes event reader

* fix(fim): remove redundant whitespace

* doc(metricbeat): enrich documentation about Processors attached to a Metricbeat

* fix(fim): gofumpt eventreader_kprobes.go

* fix(add_process_metadata): gofmt add_process_metadata.go gosysinfo_provider.go

* fix(lint): goimports eventreader_kprobes.go

* fix(winlogbeat): generate include list [unrelated to this PR]

(cherry picked from commit ca4adce)

# Conflicts:
#	libbeat/processors/add_process_metadata/add_process_metadata.go
#	libbeat/processors/add_process_metadata/config.go
#	libbeat/processors/add_process_metadata/gosysinfo_provider.go
#	metricbeat/mb/module/configuration.go
  • Loading branch information
pkoutsovasilis authored and mergify[bot] committed Apr 13, 2024
1 parent edcdae8 commit ea54731
Show file tree
Hide file tree
Showing 19 changed files with 312 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add opt-in eBPF backend for file_integrity module. {pull}37223[37223]
- Add process data to file events (Linux only, eBPF backend). {pull}38199[38199]
- Add container id to file events (Linux only, eBPF backend). {pull}38328[38328]
- Add process.entity_id, process.group.name and process.group.id in add_process_metadata processor. Make fim module with kprobes backend to always add an appropriately configured add_process_metadata processor to enrich file events {pull}38776[38776]

*Filebeat*

Expand Down
37 changes: 35 additions & 2 deletions auditbeat/module/file_integrity/eventreader_kprobes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"time"

"github.com/elastic/beats/v7/auditbeat/module/file_integrity/kprobes"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors/add_process_metadata"

"golang.org/x/sys/unix"

"github.com/elastic/elastic-agent-libs/logp"
)

type kProbesReader struct {
Expand All @@ -39,6 +41,30 @@ type kProbesReader struct {
log *logp.Logger

parsers []FileParser

processor beat.Processor
}

func newKProbesReader(config Config, l *logp.Logger, parsers []FileParser) (*kProbesReader, error) {
processor, err := add_process_metadata.NewWithConfig(
add_process_metadata.ConfigOverwriteKeys(true),
add_process_metadata.ConfigMatchPIDs([]string{"process.pid"}),
)
if err != nil {
return nil, err
}

return &kProbesReader{
config: config,
eventC: make(chan Event),
log: l,
parsers: parsers,
processor: processor,
}, nil
}

func (r kProbesReader) Processor() beat.Processor {
return r.processor
}

func (r kProbesReader) Start(done <-chan struct{}) (<-chan Event, error) {
Expand Down Expand Up @@ -152,6 +178,13 @@ func (r kProbesReader) nextEvent(done <-chan struct{}) *Event {
start := time.Now()
e := NewEvent(event.Path, kProbeTypeToAction(event.Op), SourceKProbes,
r.config.MaxFileSizeBytes, r.config.HashTypes, r.parsers)

if e.Process == nil {
e.Process = &Process{}
}

e.Process.PID = event.PID

e.rtt = time.Since(start)

return &e
Expand Down
6 changes: 1 addition & 5 deletions auditbeat/module/file_integrity/eventreader_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ func NewEventReader(c Config, logger *logp.Logger) (EventProducer, error) {
if c.Backend == BackendKprobes {
l := logger.Named("kprobes")
l.Info("selected backend: kprobes")
return &kProbesReader{
config: c,
log: l,
parsers: FileParsers(c),
}, nil
return newKProbesReader(c, l, FileParsers(c))
}

// unimplemented
Expand Down
20 changes: 20 additions & 0 deletions auditbeat/module/file_integrity/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
bolt "go.etcd.io/bbolt"

"github.com/elastic/beats/v7/auditbeat/datastore"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -62,6 +63,11 @@ type EventProducer interface {
Start(done <-chan struct{}) (<-chan Event, error)
}

// eventProducerWithProcessor is an EventProducer that requires a Processor
type eventProducerWithProcessor interface {
Processor() beat.Processor
}

// MetricSet for monitoring file integrity.
type MetricSet struct {
mb.BaseMetricSet
Expand All @@ -78,6 +84,9 @@ type MetricSet struct {

// Used when a hash can't be calculated
nullHashes map[HashType]Digest

// Processors
processors []beat.Processor
}

// New returns a new file.MetricSet.
Expand Down Expand Up @@ -105,6 +114,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
log: logger,
}

// reader supports a processor
if rWithProcessor, ok := r.(eventProducerWithProcessor); ok {
if proc := rWithProcessor.Processor(); proc != nil {
ms.processors = append(ms.processors, proc)
}
}

ms.nullHashes = make(map[HashType]Digest, len(config.HashTypes))
for _, hashType := range ms.config.HashTypes {
// One byte is enough so that the hashes are persisted to the datastore.
Expand All @@ -117,6 +133,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return ms, nil
}

func (ms *MetricSet) Processors() []beat.Processor {
return ms.processors
}

// Run runs the MetricSet. The method will not return control to the caller
// until it is finished (to stop it close the reporter.Done() channel).
func (ms *MetricSet) Run(reporter mb.PushReporterV2) {
Expand Down
69 changes: 60 additions & 9 deletions libbeat/processors/add_process_metadata/add_process_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
"github.com/elastic/go-sysinfo"
)

const (
Expand Down Expand Up @@ -65,16 +66,24 @@ type addProcessMetadata struct {
cidProvider cidProvider
log *logp.Logger
mappings mapstr.M
uniqueID []byte
}

type processMetadata struct {
entityID string
name, title, exe, username, userid string
args []string
env map[string]string
startTime time.Time
pid, ppid int
<<<<<<< HEAD

Check failure on line 79 in libbeat/processors/add_process_metadata/add_process_metadata.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected '}', found '<<' (typecheck)
//
fields mapstr.M
=======
groupname, groupid string
capEffective, capPermitted []string
fields mapstr.M
>>>>>>> ca4adcecac ([Auditbeat] fim(kprobes): enrich file events by coupling add_process_metadata processor (#38776))

Check failure on line 86 in libbeat/processors/add_process_metadata/add_process_metadata.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

illegal character U+0023 '#' (typecheck)
}

type processMetadataProvider interface {
Expand All @@ -92,33 +101,48 @@ func init() {

// New constructs a new add_process_metadata processor.
func New(cfg *conf.C) (beat.Processor, error) {
return newProcessMetadataProcessorWithProvider(cfg, &procCache, false)
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
}

return newProcessMetadataProcessorWithProvider(config, &procCache, false)
}

// NewWithCache construct a new add_process_metadata processor with cache for container IDs.
// Resulting processor implements `Close()` to release the cache resources.
func NewWithCache(cfg *conf.C) (beat.Processor, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
}

return newProcessMetadataProcessorWithProvider(config, &procCache, true)
}

func NewWithConfig(opts ...ConfigOption) (beat.Processor, error) {
cfg := defaultConfig()

for _, o := range opts {
o(&cfg)
}

return newProcessMetadataProcessorWithProvider(cfg, &procCache, true)
}

func newProcessMetadataProcessorWithProvider(cfg *conf.C, provider processMetadataProvider, withCache bool) (proc beat.Processor, err error) {
func newProcessMetadataProcessorWithProvider(config config, provider processMetadataProvider, withCache bool) (proc beat.Processor, err error) {
// Logging (each processor instance has a unique ID).
var (
id = int(instanceID.Inc())
log = logp.NewLogger(processorName).With("instance_id", id)
)

config := defaultConfig()
if err = cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
}

// If neither option is configured, then add a default. A default cgroup_regex
// cannot be added to the struct returned by defaultConfig() because if
// config_regex is set, it would take precedence over any user-configured
// cgroup_prefixes.
hasCgroupPrefixes, _ := cfg.Has("cgroup_prefixes", -1)
hasCgroupRegex, _ := cfg.Has("cgroup_regex", -1)
hasCgroupPrefixes := len(config.CgroupPrefixes) > 0
hasCgroupRegex := config.CgroupRegex != nil
if !hasCgroupPrefixes && !hasCgroupRegex {
config.CgroupRegex = defaultCgroupRegex
}
Expand All @@ -134,6 +158,13 @@ func newProcessMetadataProcessorWithProvider(cfg *conf.C, provider processMetada
log: log,
mappings: mappings,
}

if host, _ := sysinfo.Host(); host != nil {
if uniqueID := host.Info().UniqueID; uniqueID != "" {
p.uniqueID = []byte(uniqueID)
}
}

// don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled
if ok := containsValue(mappings, "container.id"); ok {
if withCache && config.CgroupCacheExpireTime != 0 {
Expand Down Expand Up @@ -311,6 +342,7 @@ func (p *addProcessMetadata) String() string {

func (p *processMetadata) toMap() mapstr.M {
process := mapstr.M{
"entity_id": p.entityID,
"name": p.name,
"title": p.title,
"executable": p.exe,
Expand All @@ -332,6 +364,25 @@ func (p *processMetadata) toMap() mapstr.M {
}
process["owner"] = user
}
<<<<<<< HEAD

Check failure on line 367 in libbeat/processors/add_process_metadata/add_process_metadata.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected statement, found '<<' (typecheck)
=======
if len(p.capEffective) > 0 {
process.Put("thread.capabilities.effective", p.capEffective)
}
if len(p.capPermitted) > 0 {
process.Put("thread.capabilities.permitted", p.capPermitted)
}
if p.groupname != "" || p.groupid != "" {
group := mapstr.M{}
if p.groupname != "" {
group["name"] = p.groupname
}
if p.groupid != "" {
group["id"] = p.groupid
}
process["group"] = group
}
>>>>>>> ca4adcecac ([Auditbeat] fim(kprobes): enrich file events by coupling add_process_metadata processor (#38776))

Check failure on line 385 in libbeat/processors/add_process_metadata/add_process_metadata.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected statement, found '>>' (typecheck)

return mapstr.M{
"process": process,
Expand Down
Loading

0 comments on commit ea54731

Please sign in to comment.