Skip to content

Commit

Permalink
[Auditbeat] fim(kprobes): enrich file events by coupling add_process_…
Browse files Browse the repository at this point in the history
…metadata processor (#38776)

* feat(processors/process_metadata): support reporting group id and name

* feat(processors/process_metadata): support reporting process entity_id

* feat(fim/kprobes): allow metricsSets to expose beat processors after initialisation

* doc: update CHANGELOG.next.asciidoc

* fix(linter): SA1015 prevent leaking the ticker

* fix(linter): SA1019 mark metricbeat/mb deprecation warnings that are not removed yet

* fix(linter): check for return err

* fix(linter): prealloc slices

* fix(linter): remove unused field

* fix(linter): G601 prevent implicit memory aliasing in for loop

* doc: update CHANGELOG.next.asciidoc

* fix: update filebaet fields.asciidoc (unrelated to this work)

* doc: remove irrelevant changes from CHANGELOG.next.asciidoc

* feat(processor/metadata): introduce new type based allocation func

* feat(fim/kprobe): instantiate new processor alongside a new kprobes event reader

* fix(fim): remove redundant whitespace

* doc(metricbeat): enrich documentation about Processors attached to a Metricbeat

* fix(fim): gofumpt eventreader_kprobes.go

* fix(add_process_metadata): gofmt add_process_metadata.go gosysinfo_provider.go

* fix(lint): goimports eventreader_kprobes.go

* fix(winlogbeat): generate include list [unrelated to this PR]
  • Loading branch information
pkoutsovasilis authored Apr 13, 2024
1 parent ccd7b13 commit ca4adce
Show file tree
Hide file tree
Showing 19 changed files with 264 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add opt-in eBPF backend for file_integrity module. {pull}37223[37223]
- Add process data to file events (Linux only, eBPF backend). {pull}38199[38199]
- Add container id to file events (Linux only, eBPF backend). {pull}38328[38328]
- Add process.entity_id, process.group.name and process.group.id in add_process_metadata processor. Make fim module with kprobes backend to always add an appropriately configured add_process_metadata processor to enrich file events {pull}38776[38776]

*Filebeat*

Expand Down
37 changes: 35 additions & 2 deletions auditbeat/module/file_integrity/eventreader_kprobes.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"time"

"github.com/elastic/beats/v7/auditbeat/module/file_integrity/kprobes"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors/add_process_metadata"

"golang.org/x/sys/unix"

"github.com/elastic/elastic-agent-libs/logp"
)

type kProbesReader struct {
Expand All @@ -39,6 +41,30 @@ type kProbesReader struct {
log *logp.Logger

parsers []FileParser

processor beat.Processor
}

func newKProbesReader(config Config, l *logp.Logger, parsers []FileParser) (*kProbesReader, error) {
processor, err := add_process_metadata.NewWithConfig(
add_process_metadata.ConfigOverwriteKeys(true),
add_process_metadata.ConfigMatchPIDs([]string{"process.pid"}),
)
if err != nil {
return nil, err
}

return &kProbesReader{
config: config,
eventC: make(chan Event),
log: l,
parsers: parsers,
processor: processor,
}, nil
}

func (r kProbesReader) Processor() beat.Processor {
return r.processor
}

func (r kProbesReader) Start(done <-chan struct{}) (<-chan Event, error) {
Expand Down Expand Up @@ -152,6 +178,13 @@ func (r kProbesReader) nextEvent(done <-chan struct{}) *Event {
start := time.Now()
e := NewEvent(event.Path, kProbeTypeToAction(event.Op), SourceKProbes,
r.config.MaxFileSizeBytes, r.config.HashTypes, r.parsers)

if e.Process == nil {
e.Process = &Process{}
}

e.Process.PID = event.PID

e.rtt = time.Since(start)

return &e
Expand Down
6 changes: 1 addition & 5 deletions auditbeat/module/file_integrity/eventreader_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,7 @@ func NewEventReader(c Config, logger *logp.Logger) (EventProducer, error) {
if c.Backend == BackendKprobes {
l := logger.Named("kprobes")
l.Info("selected backend: kprobes")
return &kProbesReader{
config: c,
log: l,
parsers: FileParsers(c),
}, nil
return newKProbesReader(c, l, FileParsers(c))
}

// unimplemented
Expand Down
20 changes: 20 additions & 0 deletions auditbeat/module/file_integrity/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/elastic/beats/v7/auditbeat/ab"
"github.com/elastic/beats/v7/auditbeat/datastore"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -63,6 +64,11 @@ type EventProducer interface {
Start(done <-chan struct{}) (<-chan Event, error)
}

// eventProducerWithProcessor is an EventProducer that requires a Processor
type eventProducerWithProcessor interface {
Processor() beat.Processor
}

// MetricSet for monitoring file integrity.
type MetricSet struct {
mb.BaseMetricSet
Expand All @@ -79,6 +85,9 @@ type MetricSet struct {

// Used when a hash can't be calculated
nullHashes map[HashType]Digest

// Processors
processors []beat.Processor
}

// New returns a new file.MetricSet.
Expand Down Expand Up @@ -106,6 +115,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
log: logger,
}

// reader supports a processor
if rWithProcessor, ok := r.(eventProducerWithProcessor); ok {
if proc := rWithProcessor.Processor(); proc != nil {
ms.processors = append(ms.processors, proc)
}
}

ms.nullHashes = make(map[HashType]Digest, len(config.HashTypes))
for _, hashType := range ms.config.HashTypes {
// One byte is enough so that the hashes are persisted to the datastore.
Expand All @@ -118,6 +134,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return ms, nil
}

func (ms *MetricSet) Processors() []beat.Processor {
return ms.processors
}

// Run runs the MetricSet. The method will not return control to the caller
// until it is finished (to stop it close the reporter.Done() channel).
func (ms *MetricSet) Run(reporter mb.PushReporterV2) {
Expand Down
58 changes: 47 additions & 11 deletions libbeat/processors/add_process_metadata/add_process_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup"
"github.com/elastic/elastic-agent-system-metrics/metric/system/resolve"
"github.com/elastic/go-sysinfo"
)

const (
Expand Down Expand Up @@ -65,17 +66,19 @@ type addProcessMetadata struct {
cidProvider cidProvider
log *logp.Logger
mappings mapstr.M
uniqueID []byte
}

type processMetadata struct {
entityID string
name, title, exe, username, userid string
args []string
env map[string]string
startTime time.Time
pid, ppid int
groupname, groupid string
capEffective, capPermitted []string
//
fields mapstr.M
fields mapstr.M
}

type processMetadataProvider interface {
Expand All @@ -93,33 +96,48 @@ func init() {

// New constructs a new add_process_metadata processor.
func New(cfg *conf.C) (beat.Processor, error) {
return newProcessMetadataProcessorWithProvider(cfg, &procCache, false)
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
}

return newProcessMetadataProcessorWithProvider(config, &procCache, false)
}

// NewWithCache construct a new add_process_metadata processor with cache for container IDs.
// Resulting processor implements `Close()` to release the cache resources.
func NewWithCache(cfg *conf.C) (beat.Processor, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
}

return newProcessMetadataProcessorWithProvider(config, &procCache, true)
}

func NewWithConfig(opts ...ConfigOption) (beat.Processor, error) {
cfg := defaultConfig()

for _, o := range opts {
o(&cfg)
}

return newProcessMetadataProcessorWithProvider(cfg, &procCache, true)
}

func newProcessMetadataProcessorWithProvider(cfg *conf.C, provider processMetadataProvider, withCache bool) (proc beat.Processor, err error) {
func newProcessMetadataProcessorWithProvider(config config, provider processMetadataProvider, withCache bool) (proc beat.Processor, err error) {
// Logging (each processor instance has a unique ID).
var (
id = int(instanceID.Inc())
log = logp.NewLogger(processorName).With("instance_id", id)
)

config := defaultConfig()
if err = cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("fail to unpack the %v configuration: %w", processorName, err)
}

// If neither option is configured, then add a default. A default cgroup_regex
// cannot be added to the struct returned by defaultConfig() because if
// config_regex is set, it would take precedence over any user-configured
// cgroup_prefixes.
hasCgroupPrefixes, _ := cfg.Has("cgroup_prefixes", -1)
hasCgroupRegex, _ := cfg.Has("cgroup_regex", -1)
hasCgroupPrefixes := len(config.CgroupPrefixes) > 0
hasCgroupRegex := config.CgroupRegex != nil
if !hasCgroupPrefixes && !hasCgroupRegex {
config.CgroupRegex = defaultCgroupRegex
}
Expand All @@ -135,6 +153,13 @@ func newProcessMetadataProcessorWithProvider(cfg *conf.C, provider processMetada
log: log,
mappings: mappings,
}

if host, _ := sysinfo.Host(); host != nil {
if uniqueID := host.Info().UniqueID; uniqueID != "" {
p.uniqueID = []byte(uniqueID)
}
}

// don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled
if ok := containsValue(mappings, "container.id"); ok {
if withCache && config.CgroupCacheExpireTime != 0 {
Expand Down Expand Up @@ -312,6 +337,7 @@ func (p *addProcessMetadata) String() string {

func (p *processMetadata) toMap() mapstr.M {
process := mapstr.M{
"entity_id": p.entityID,
"name": p.name,
"title": p.title,
"executable": p.exe,
Expand Down Expand Up @@ -339,6 +365,16 @@ func (p *processMetadata) toMap() mapstr.M {
if len(p.capPermitted) > 0 {
process.Put("thread.capabilities.permitted", p.capPermitted)
}
if p.groupname != "" || p.groupid != "" {
group := mapstr.M{}
if p.groupname != "" {
group["name"] = p.groupname
}
if p.groupid != "" {
group["id"] = p.groupid
}
process["group"] = group
}

return mapstr.M{
"process": process,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ func TestAddProcessMetadata(t *testing.T) {
startTime := time.Now()
testProcs := testProvider{
1: {
name: "systemd",
title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
exe: "/usr/lib/systemd/systemd",
args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
name: "systemd",
entityID: "XCOVE56SVVEOKBNX",
title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
exe: "/usr/lib/systemd/systemd",
args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
env: map[string]string{
"HOME": "/",
"TERM": "linux",
Expand All @@ -67,10 +68,11 @@ func TestAddProcessMetadata(t *testing.T) {
capPermitted: capMock,
},
3: {
name: "systemd",
title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
exe: "/usr/lib/systemd/systemd",
args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
name: "systemd",
entityID: "XCOVE56SVVEOKBNX",
title: "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
exe: "/usr/lib/systemd/systemd",
args: []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
env: map[string]string{
"HOME": "/",
"TERM": "linux",
Expand Down Expand Up @@ -159,6 +161,7 @@ func TestAddProcessMetadata(t *testing.T) {
},
"process": mapstr.M{
"name": "systemd",
"entity_id": "XCOVE56SVVEOKBNX",
"title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
"executable": "/usr/lib/systemd/systemd",
"args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
Expand Down Expand Up @@ -250,6 +253,7 @@ func TestAddProcessMetadata(t *testing.T) {
"parent": mapstr.M{
"process": mapstr.M{
"name": "systemd",
"entity_id": "XCOVE56SVVEOKBNX",
"title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
"executable": "/usr/lib/systemd/systemd",
"args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
Expand Down Expand Up @@ -290,6 +294,7 @@ func TestAddProcessMetadata(t *testing.T) {
"parent": mapstr.M{
"process": mapstr.M{
"name": "systemd",
"entity_id": "XCOVE56SVVEOKBNX",
"title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
"executable": "/usr/lib/systemd/systemd",
"args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
Expand Down Expand Up @@ -337,6 +342,7 @@ func TestAddProcessMetadata(t *testing.T) {
"parent": mapstr.M{
"process": mapstr.M{
"name": "systemd",
"entity_id": "XCOVE56SVVEOKBNX",
"title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
"executable": "/usr/lib/systemd/systemd",
"args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
Expand Down Expand Up @@ -541,6 +547,7 @@ func TestAddProcessMetadata(t *testing.T) {
},
"process": mapstr.M{
"name": "systemd",
"entity_id": "XCOVE56SVVEOKBNX",
"title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
"executable": "/usr/lib/systemd/systemd",
"args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
Expand Down Expand Up @@ -672,6 +679,7 @@ func TestAddProcessMetadata(t *testing.T) {
},
"process": mapstr.M{
"name": "systemd",
"entity_id": "XCOVE56SVVEOKBNX",
"title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22",
"executable": "/usr/lib/systemd/systemd",
"args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"},
Expand Down Expand Up @@ -768,7 +776,7 @@ func TestAddProcessMetadata(t *testing.T) {
config: mapstr.M{
"cgroup_regex": "",
},
initErr: errors.New("fail to unpack the add_process_metadata configuration: cgroup_regexp must contain exactly one capturing group for the container ID accessing config"),
initErr: errors.New("cgroup_regexp must contain exactly one capturing group for the container ID accessing config"),
},
{
description: "cgroup_prefixes configured",
Expand All @@ -789,17 +797,23 @@ func TestAddProcessMetadata(t *testing.T) {
},
} {
t.Run(test.description, func(t *testing.T) {
config, err := conf.NewConfigFrom(test.config)
if err != nil {
t.Fatal(err)
configC, err := conf.NewConfigFrom(test.config)
assert.NoError(t, err)

config := defaultConfig()
if err := configC.Unpack(&config); err != nil {
if test.initErr == nil {
t.Fatal(err)
}
assert.EqualError(t, err, test.initErr.Error())
return
}

proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, true)
if test.initErr == nil {
if err != nil {
if err != nil {
if test.initErr == nil {
t.Fatal(err)
}
} else {
assert.EqualError(t, err, test.initErr.Error())
return
}
Expand Down Expand Up @@ -830,7 +844,11 @@ func TestAddProcessMetadata(t *testing.T) {
"include_fields": []string{"process.name"},
}

config, err := conf.NewConfigFrom(c)
configC, err := conf.NewConfigFrom(c)
assert.NoError(t, err)

config := defaultConfig()
err = configC.Unpack(&config)
assert.NoError(t, err)

proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, true)
Expand Down
Loading

0 comments on commit ca4adce

Please sign in to comment.