Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.13](backport #38776) [Auditbeat] fim(kprobes): enrich file events by coupling add_process_metadata processor #38916

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add opt-in eBPF backend for file_integrity module. {pull}37223[37223]
- Add process data to file events (Linux only, eBPF backend). {pull}38199[38199]
- Add container id to file events (Linux only, eBPF backend). {pull}38328[38328]
- Add process.entity_id, process.group.name and process.group.id in add_process_metadata processor. Make fim module with kprobes backend to always add an appropriately configured add_process_metadata processor to enrich file events {pull}38776[38776]

*Filebeat*

Expand Down
37 changes: 35 additions & 2 deletions auditbeat/module/file_integrity/eventreader_kprobes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"time"

"github.com/elastic/beats/v7/auditbeat/module/file_integrity/kprobes"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors/add_process_metadata"

"golang.org/x/sys/unix"

"github.com/elastic/elastic-agent-libs/logp"
)

type kProbesReader struct {
Expand All @@ -39,6 +41,30 @@ type kProbesReader struct {
log *logp.Logger

parsers []FileParser

processor beat.Processor
}

func newKProbesReader(config Config, l *logp.Logger, parsers []FileParser) (*kProbesReader, error) {
processor, err := add_process_metadata.NewWithConfig(
add_process_metadata.ConfigOverwriteKeys(true),
add_process_metadata.ConfigMatchPIDs([]string{"process.pid"}),
)
if err != nil {
return nil, err
}

return &kProbesReader{
config: config,
eventC: make(chan Event),
log: l,
parsers: parsers,
processor: processor,
}, nil
}

func (r kProbesReader) Processor() beat.Processor {
return r.processor
}

func (r kProbesReader) Start(done <-chan struct{}) (<-chan Event, error) {
Expand Down Expand Up @@ -152,6 +178,13 @@ func (r kProbesReader) nextEvent(done <-chan struct{}) *Event {
start := time.Now()
e := NewEvent(event.Path, kProbeTypeToAction(event.Op), SourceKProbes,
r.config.MaxFileSizeBytes, r.config.HashTypes, r.parsers)

if e.Process == nil {
e.Process = &Process{}
}

e.Process.PID = event.PID

e.rtt = time.Since(start)

return &e
Expand Down
6 changes: 1 addition & 5 deletions auditbeat/module/file_integrity/eventreader_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ func NewEventReader(c Config, logger *logp.Logger) (EventProducer, error) {
if c.Backend == BackendKprobes {
l := logger.Named("kprobes")
l.Info("selected backend: kprobes")
return &kProbesReader{
config: c,
log: l,
parsers: FileParsers(c),
}, nil
return newKProbesReader(c, l, FileParsers(c))
}

// unimplemented
Expand Down
20 changes: 20 additions & 0 deletions auditbeat/module/file_integrity/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
bolt "go.etcd.io/bbolt"

"github.com/elastic/beats/v7/auditbeat/datastore"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -62,6 +63,11 @@ type EventProducer interface {
Start(done <-chan struct{}) (<-chan Event, error)
}

// eventProducerWithProcessor is an EventProducer that requires a Processor
type eventProducerWithProcessor interface {
Processor() beat.Processor
}

// MetricSet for monitoring file integrity.
type MetricSet struct {
mb.BaseMetricSet
Expand All @@ -78,6 +84,9 @@ type MetricSet struct {

// Used when a hash can't be calculated
nullHashes map[HashType]Digest

// Processors
processors []beat.Processor
}

// New returns a new file.MetricSet.
Expand Down Expand Up @@ -105,6 +114,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
log: logger,
}

// reader supports a processor
if rWithProcessor, ok := r.(eventProducerWithProcessor); ok {
if proc := rWithProcessor.Processor(); proc != nil {
ms.processors = append(ms.processors, proc)
}
}

ms.nullHashes = make(map[HashType]Digest, len(config.HashTypes))
for _, hashType := range ms.config.HashTypes {
// One byte is enough so that the hashes are persisted to the datastore.
Expand All @@ -117,6 +133,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return ms, nil
}

func (ms *MetricSet) Processors() []beat.Processor {
return ms.processors
}

// Run runs the MetricSet. The method will not return control to the caller
// until it is finished (to stop it close the reporter.Done() channel).
func (ms *MetricSet) Run(reporter mb.PushReporterV2) {
Expand Down
69 changes: 60 additions & 9 deletions libbeat/processors/add_process_metadata/add_process_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
"github.com/elastic/go-sysinfo"
)

const (
Expand Down Expand Up @@ -65,16 +66,24 @@
cidProvider cidProvider
log *logp.Logger
mappings mapstr.M
uniqueID []byte
}

type processMetadata struct {
entityID string
name, title, exe, username, userid string
args []string
env map[string]string
startTime time.Time
pid, ppid int
<<<<<<< HEAD

Check failure on line 79 in libbeat/processors/add_process_metadata/add_process_metadata.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected '}', found '<<' (typecheck)
//
fields mapstr.M
=======
groupname, groupid string
capEffective, capPermitted []string
fields mapstr.M
>>>>>>> ca4adcecac ([Auditbeat] fim(kprobes): enrich file events by coupling add_process_metadata processor (#38776))

Check failure on line 86 in libbeat/processors/add_process_metadata/add_process_metadata.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

illegal character U+0023 '#' (typecheck)
}

type processMetadataProvider interface {
Expand All @@ -92,33 +101,48 @@

// New constructs a new add_process_metadata processor.
func New(cfg *conf.C) (beat.Processor, error) {
return newProcessMetadataProcessorWithProvider(cfg, &procCache, false)
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
}

return newProcessMetadataProcessorWithProvider(config, &procCache, false)
}

// NewWithCache construct a new add_process_metadata processor with cache for container IDs.
// Resulting processor implements `Close()` to release the cache resources.
func NewWithCache(cfg *conf.C) (beat.Processor, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
}

return newProcessMetadataProcessorWithProvider(config, &procCache, true)
}

func NewWithConfig(opts ...ConfigOption) (beat.Processor, error) {
cfg := defaultConfig()

for _, o := range opts {
o(&cfg)
}

return newProcessMetadataProcessorWithProvider(cfg, &procCache, true)
}

func newProcessMetadataProcessorWithProvider(cfg *conf.C, provider processMetadataProvider, withCache bool) (proc beat.Processor, err error) {
func newProcessMetadataProcessorWithProvider(config config, provider processMetadataProvider, withCache bool) (proc beat.Processor, err error) {
// Logging (each processor instance has a unique ID).
var (
id = int(instanceID.Inc())
log = logp.NewLogger(processorName).With("instance_id", id)
)

config := defaultConfig()
if err = cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
}

// If neither option is configured, then add a default. A default cgroup_regex
// cannot be added to the struct returned by defaultConfig() because if
// config_regex is set, it would take precedence over any user-configured
// cgroup_prefixes.
hasCgroupPrefixes, _ := cfg.Has("cgroup_prefixes", -1)
hasCgroupRegex, _ := cfg.Has("cgroup_regex", -1)
hasCgroupPrefixes := len(config.CgroupPrefixes) > 0
hasCgroupRegex := config.CgroupRegex != nil
if !hasCgroupPrefixes && !hasCgroupRegex {
config.CgroupRegex = defaultCgroupRegex
}
Expand All @@ -134,6 +158,13 @@
log: log,
mappings: mappings,
}

if host, _ := sysinfo.Host(); host != nil {
if uniqueID := host.Info().UniqueID; uniqueID != "" {
p.uniqueID = []byte(uniqueID)
}
}

// don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled
if ok := containsValue(mappings, "container.id"); ok {
if withCache && config.CgroupCacheExpireTime != 0 {
Expand Down Expand Up @@ -311,6 +342,7 @@

func (p *processMetadata) toMap() mapstr.M {
process := mapstr.M{
"entity_id": p.entityID,
"name": p.name,
"title": p.title,
"executable": p.exe,
Expand All @@ -332,6 +364,25 @@
}
process["owner"] = user
}
<<<<<<< HEAD

Check failure on line 367 in libbeat/processors/add_process_metadata/add_process_metadata.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected statement, found '<<' (typecheck)
=======
if len(p.capEffective) > 0 {
process.Put("thread.capabilities.effective", p.capEffective)
}
if len(p.capPermitted) > 0 {
process.Put("thread.capabilities.permitted", p.capPermitted)
}
if p.groupname != "" || p.groupid != "" {
group := mapstr.M{}
if p.groupname != "" {
group["name"] = p.groupname
}
if p.groupid != "" {
group["id"] = p.groupid
}
process["group"] = group
}
>>>>>>> ca4adcecac ([Auditbeat] fim(kprobes): enrich file events by coupling add_process_metadata processor (#38776))

Check failure on line 385 in libbeat/processors/add_process_metadata/add_process_metadata.go

View workflow job for this annotation

GitHub Actions / lint (darwin)

expected statement, found '>>' (typecheck)

return mapstr.M{
"process": process,
Expand Down
Loading
Loading