From 5328640aa6733fb4a843e923f4caa0703f18bdfe Mon Sep 17 00:00:00 2001 From: Fang He Date: Wed, 25 Mar 2020 02:15:38 -0700 Subject: [PATCH] enrich container id from process id (#15947) * enrich container id from process id (cherry picked from commit 3cb957d7f9e1b6395845f73bfe976d6c258d35e0) --- CHANGELOG.next.asciidoc | 1 + .../add_process_metadata.go | 85 ++++++- .../add_process_metadata_test.go | 237 ++++++++++++++++++ .../processors/add_process_metadata/config.go | 29 ++- .../docs/add_process_metadata.asciidoc | 24 +- .../gosigar_cid_provider.go | 112 +++++++++ 6 files changed, 469 insertions(+), 19 deletions(-) create mode 100644 libbeat/processors/add_process_metadata/gosigar_cid_provider.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c10de88033a..2a6c4ab6d7c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -218,6 +218,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add `add_cloudfoundry_metadata` processor to annotate events with Cloud Foundry application data. {pull}16621[16621] - Add `translate_sid` processor on Windows for converting Windows security identifier (SID) values to names. {issue}7451[7451] {pull}16013[16013] - Add support for kubernetes provider to recognize namespace level defaults {pull}16321[16321] +- Add capability of enrich `container.id` with process id in `add_process_metadata` processor {pull}15947[15947] - Update RPM packages contained in Beat Docker images. {issue}17035[17035] - Add Kerberos support to Kafka input and output. {pull}16781[16781] diff --git a/libbeat/processors/add_process_metadata/add_process_metadata.go b/libbeat/processors/add_process_metadata/add_process_metadata.go index 4b6270c9f6e..66c8767590a 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata.go @@ -30,11 +30,13 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/processors" jsprocessor "github.com/elastic/beats/v7/libbeat/processors/script/javascript/module/processor" + "github.com/elastic/gosigar/cgroup" ) const ( - processorName = "add_process_metadata" - cacheExpiration = time.Second * 30 + processorName = "add_process_metadata" + cacheExpiration = time.Second * 30 + containerIDMapping = "container.id" ) var ( @@ -47,14 +49,17 @@ var ( procCache = newProcessCache(cacheExpiration, gosysinfoProvider{}) + processCgroupPaths = cgroup.ProcessCgroupPaths + instanceID atomic.Uint32 ) type addProcessMetadata struct { - config config - provider processMetadataProvider - log *logp.Logger - mappings common.MapStr + config config + provider processMetadataProvider + cidProvider cidProvider + log *logp.Logger + mappings common.MapStr } type processMetadata struct { @@ -71,6 +76,10 @@ type processMetadataProvider interface { GetProcessMetadata(pid int) (*processMetadata, error) } +type cidProvider interface { + GetCid(pid int) (string, error) +} + func init() { processors.RegisterPlugin(processorName, New) jsprocessor.RegisterPlugin("AddProcessMetadata", New) @@ -93,18 +102,50 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces return nil, errors.Wrapf(err, "fail to unpack the %v configuration", processorName) } - p := addProcessMetadata{ + mappings, err := config.getMappings() + + if err != nil { + return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName) + } + + var p addProcessMetadata + + p = addProcessMetadata{ config: config, provider: provider, log: log, + mappings: mappings, } - if p.mappings, err = config.getMappings(); err != nil { - return nil, errors.Wrapf(err, "error unpacking %v.target_fields", processorName) + // don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled + if ok := containsValue(mappings, "container.id"); ok { + if config.CgroupCacheExpireTime != 0 { + p.log.Debug("Initializing cgroup cache") + evictionListener := func(k common.Key, v common.Value) { + p.log.Debugf("Evicted cached cgroups for PID=%v", k) + } + + cgroupsCache := common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener) + cgroupsCache.StartJanitor(config.CgroupCacheExpireTime) + p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths, cgroupsCache) + } else { + p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, processCgroupPaths, nil) + } + } return &p, nil } +// check if the value exist in mapping +func containsValue(m common.MapStr, v string) bool { + for _, x := range m { + if x == v { + return true + } + } + return false +} + // Run enriches the given event with the host meta data func (p *addProcessMetadata) Run(event *beat.Event) (*beat.Event, error) { for _, pidField := range p.config.MatchPIDs { @@ -156,6 +197,10 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul } meta := metaPtr.fields + if err = p.enrichContainerID(pid, meta); err != nil { + return nil, err + } + result = event.Clone() for dest, sourceIf := range p.mappings { source, castOk := sourceIf.(string) @@ -168,23 +213,41 @@ func (p *addProcessMetadata) enrich(event common.MapStr, pidField string) (resul return nil, errors.Errorf("target field '%s' already exists and overwrite_keys is false", dest) } } + value, err := meta.GetValue(source) if err != nil { // Should never happen return nil, err } + if _, err = result.Put(dest, value); err != nil { return nil, err } } + return result, nil } +// enrichContainerID adds container.id into meta for mapping to pickup +func (p *addProcessMetadata) enrichContainerID(pid int, meta common.MapStr) error { + if p.cidProvider == nil { + return nil + } + cid, err := p.cidProvider.GetCid(pid) + if err != nil { + return err + } + if _, err = meta.Put("container", common.MapStr{"id": cid}); err != nil { + return err + } + return nil +} + // String returns the processor representation formatted as a string func (p *addProcessMetadata) String() string { - return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v]", + return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v, host_path=%v, cgroup_prefixes=%v]", processorName, p.config.MatchPIDs, p.mappings, p.config.IgnoreMissing, - p.config.OverwriteKeys, p.config.RestrictedFields) + p.config.OverwriteKeys, p.config.RestrictedFields, p.config.HostPath, p.config.CgroupPrefixes) } func (p *processMetadata) toMap() common.MapStr { diff --git a/libbeat/processors/add_process_metadata/add_process_metadata_test.go b/libbeat/processors/add_process_metadata/add_process_metadata_test.go index 16d3f38fa71..bb5eff7e193 100644 --- a/libbeat/processors/add_process_metadata/add_process_metadata_test.go +++ b/libbeat/processors/add_process_metadata/add_process_metadata_test.go @@ -51,6 +51,28 @@ func TestAddProcessMetadata(t *testing.T) { }, } + // mock of the cgroup processCgroupPaths + processCgroupPaths = func(_ string, pid int) (map[string]string, error) { + testMap := map[int]map[string]string{ + 1: map[string]string{ + "cpu": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "net_prio": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "blkio": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "perf_event": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "freezer": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "pids": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "hugetlb": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "cpuacct": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "cpuset": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "net_cls": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "devices": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "memory": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "name=systemd": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, + } + return testMap[pid], nil + } + for _, test := range []struct { description string config, event, expected common.MapStr @@ -83,6 +105,9 @@ func TestAddProcessMetadata(t *testing.T) { "ppid": 0, "start_time": startTime, }, + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, }, }, { @@ -161,6 +186,9 @@ func TestAddProcessMetadata(t *testing.T) { "ppid": 0, "start_time": startTime, }, + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, }, }, }, @@ -192,6 +220,9 @@ func TestAddProcessMetadata(t *testing.T) { "LANG": "en_US.UTF-8", }, }, + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, }, }, }, @@ -385,12 +416,132 @@ func TestAddProcessMetadata(t *testing.T) { }, err: ErrNoProcess, }, + { + description: "env field", + config: common.MapStr{ + "match_pids": []string{"system.process.ppid"}, + }, + event: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + }, + expected: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + "process": common.MapStr{ + "name": "systemd", + "title": "/usr/lib/systemd/systemd --switched-root --system --deserialize 22", + "executable": "/usr/lib/systemd/systemd", + "args": []string{"/usr/lib/systemd/systemd", "--switched-root", "--system", "--deserialize", "22"}, + "pid": 1, + "ppid": 0, + "start_time": startTime, + }, + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, + }, + }, + { + description: "env field (IncludeContainer id), process not found", + config: common.MapStr{ + "match_pids": []string{"ppid"}, + }, + event: common.MapStr{ + "ppid": 42, + }, + expected: common.MapStr{ + "ppid": 42, + }, + err: ErrNoProcess, + }, + { + description: "container.id only", + config: common.MapStr{ + "match_pids": []string{"system.process.ppid"}, + "include_fields": []string{"container.id"}, + }, + event: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + }, + expected: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, + }, + }, + { + description: "without cgroup cache", + config: common.MapStr{ + "match_pids": []string{"system.process.ppid"}, + "include_fields": []string{"container.id"}, + "cgroup_cache_expire_time": 0, + }, + event: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + }, + expected: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, + }, + }, + { + description: "custom cache expire time", + config: common.MapStr{ + "match_pids": []string{"system.process.ppid"}, + "include_fields": []string{"container.id"}, + "cgroup_cache_expire_time": 10 * time.Second, + }, + event: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + }, + expected: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": "1", + }, + }, + "container": common.MapStr{ + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, + }, + }, } { t.Run(test.description, func(t *testing.T) { config, err := common.NewConfigFrom(test.config) if err != nil { t.Fatal(err) } + proc, err := newProcessMetadataProcessorWithProvider(config, testProcs) if test.initErr == nil { if err != nil { @@ -421,6 +572,92 @@ func TestAddProcessMetadata(t *testing.T) { } } +func TestUsingCache(t *testing.T) { + logp.TestingSetup(logp.WithSelectors(processorName)) + + selfPID := os.Getpid() + + // mock of the cgroup processCgroupPaths + processCgroupPaths = func(_ string, pid int) (map[string]string, error) { + testMap := map[int]map[string]string{ + selfPID: map[string]string{ + "cpu": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "net_prio": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "blkio": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "perf_event": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "freezer": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "pids": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "hugetlb": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "cpuacct": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "cpuset": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "net_cls": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "devices": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "memory": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + "name=systemd": "/kubepods/besteffort/pod665fb997-575b-11ea-bfce-080027421ddf/b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", + }, + } + return testMap[pid], nil + } + + config, err := common.NewConfigFrom(common.MapStr{ + "match_pids": []string{"system.process.ppid"}, + "include_fields": []string{"container.id"}, + "target": "meta", + }) + + if err != nil { + t.Fatal(err) + } + proc, err := New(config) + if err != nil { + t.Fatal(err) + } + + ev := beat.Event{ + Fields: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": selfPID, + }, + }, + }, + } + + // first run + result, err := proc.Run(&ev) + if err != nil { + t.Fatal(err) + } + t.Log(result.Fields) + containerID, err := result.Fields.GetValue("meta.container.id") + if err != nil { + t.Fatal(err) + } + assert.Equal(t, "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", containerID) + + ev = beat.Event{ + Fields: common.MapStr{ + "system": common.MapStr{ + "process": common.MapStr{ + "ppid": selfPID, + }, + }, + }, + } + + // cached result + result, err = proc.Run(&ev) + if err != nil { + t.Fatal(err) + } + t.Log(result.Fields) + containerID, err = result.Fields.GetValue("meta.container.id") + if err != nil { + t.Fatal(err) + } + assert.Equal(t, "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1", containerID) +} + func TestSelf(t *testing.T) { logp.TestingSetup(logp.WithSelectors(processorName)) config, err := common.NewConfigFrom(common.MapStr{ diff --git a/libbeat/processors/add_process_metadata/config.go b/libbeat/processors/add_process_metadata/config.go index 87d375b0a09..0ed65b1d778 100644 --- a/libbeat/processors/add_process_metadata/config.go +++ b/libbeat/processors/add_process_metadata/config.go @@ -19,6 +19,7 @@ package add_process_metadata import ( "fmt" + "time" "github.com/pkg/errors" @@ -43,6 +44,16 @@ type config struct { // Fields is the list of fields to add to target. Fields []string `config:"include_fields"` + + // HostPath is the path where /proc reside + HostPath string `config:"host_path"` + + // CgroupPrefix is the prefix where the container id is inside cgroup + CgroupPrefixes []string `config:"cgroup_prefixes"` + + // CgroupCacheExpireTime is the length of time before cgroup cache elements expire in seconds, + // set to 0 to disable the cgroup cache + CgroupCacheExpireTime time.Duration `config:"cgroup_cache_expire_time"` } // available fields by default @@ -56,6 +67,9 @@ var defaultFields = common.MapStr{ "ppid": nil, "start_time": nil, }, + "container": common.MapStr{ + "id": nil, + }, } // fields declared in here will only appear when requested explicitly @@ -72,9 +86,13 @@ func init() { func defaultConfig() config { return config{ - IgnoreMissing: true, - OverwriteKeys: false, - RestrictedFields: false, + IgnoreMissing: true, + OverwriteKeys: false, + RestrictedFields: false, + MatchPIDs: []string{"process.pid", "process.ppid", "process.parent.pid", "process.parent.ppid"}, + HostPath: "/", + CgroupPrefixes: []string{"/kubepods", "/docker"}, + CgroupCacheExpireTime: cacheExpiration, } } @@ -90,7 +108,7 @@ func (pf *config) getMappings() (mappings common.MapStr, err error) { } wantedFields := pf.Fields if len(wantedFields) == 0 { - wantedFields = []string{"process"} + wantedFields = []string{"process", "container"} } for _, docSrc := range wantedFields { dstField := fieldPrefix + docSrc @@ -99,9 +117,6 @@ func (pf *config) getMappings() (mappings common.MapStr, err error) { return nil, fmt.Errorf("field '%v' not found", docSrc) } if reqField != nil { - if len(wantedFields) != 1 { - return nil, fmt.Errorf("'%s' field cannot be used in conjunction with other fields", docSrc) - } for subField := range reqField.(common.MapStr) { key := dstField + "." + subField val := docSrc + "." + subField diff --git a/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc b/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc index ef984d6176f..d3e71dca920 100644 --- a/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc +++ b/libbeat/processors/add_process_metadata/docs/add_process_metadata.asciidoc @@ -27,7 +27,10 @@ The fields added to the event look as follows: "pid": 1, "ppid": 0, "start_time": "2018-08-22T08:44:50.684Z", -} +}, +"container": { + "id": "b5285682fba7449c86452b89a800609440ecc88a7ba5f2d38bedfb85409b30b1" +}, ------------------------------------------------------------------------------- Optionally, the process environment can be included, too: @@ -42,6 +45,7 @@ Optionally, the process environment can be included, too: } ... ------------------------------------------------------------------------------- + It has the following settings: `match_pids`:: List of fields to lookup for a PID. The processor will @@ -65,3 +69,21 @@ set to `true`, this condition will be ignored. `restricted_fields`:: (Optional) By default, the `process.env` field is not output, to avoid leaking sensitive data. If `restricted_fields` is `true`, the field will be present in the output. + +`host_path`:: (Optional) By default, the `host_path` field is set to the root +directory of the host `/`. This is the path where `/proc` is mounted. For +different runtime configurations of Kubernetes or Docker, the `host_path` can +be set to overwrite the default. + +`cgroup_prefixes`:: (Optional) By default, the `cgroup_prefixes` field is set +to `/kubepods` and `/docker`. This is the prefix where the container ID is +inside cgroup. For different runtime configurations of Kubernetes or Docker, +the `cgroup_prefixes` can be set to overwrite the defaults. + +`cgroup_cache_expire_time`:: (Optional) By default, the +`cgroup_cache_expire_time` is set to 30 seconds. This is the length of time +before cgroup cache elements expire in seconds. It can be set to 0 to disable +the cgroup cache. In some container runtimes technology like runc, the +container's process is also process in the host kernel, and will be affected by +PID rollover/reuse. The expire time needs to set smaller than the PIDs wrap +around time to avoid wrong container id. diff --git a/libbeat/processors/add_process_metadata/gosigar_cid_provider.go b/libbeat/processors/add_process_metadata/gosigar_cid_provider.go new file mode 100644 index 00000000000..d1e3951475c --- /dev/null +++ b/libbeat/processors/add_process_metadata/gosigar_cid_provider.go @@ -0,0 +1,112 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package add_process_metadata + +import ( + "os" + "path/filepath" + "strings" + + "github.com/pkg/errors" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +const ( + providerName = "gosigar_cid_provider" +) + +type gosigarCidProvider struct { + log *logp.Logger + hostPath string + cgroupPrefixes []string + processCgroupPaths func(string, int) (map[string]string, error) + cgroupsCache *common.Cache +} + +func (p gosigarCidProvider) GetCid(pid int) (result string, err error) { + + cgroups, err := p.getProcessCgroups(pid) + + if err != nil { + p.log.Debugf("failed to get cgroups for pid=%v: %v", pid, err) + } + + cid := p.getCid(cgroups) + + return cid, nil +} + +func newCidProvider(hostPath string, cgroupPrefixes []string, processCgroupPaths func(string, int) (map[string]string, error), cgroupsCache *common.Cache) gosigarCidProvider { + return gosigarCidProvider{ + log: logp.NewLogger(providerName), + hostPath: hostPath, + cgroupPrefixes: cgroupPrefixes, + processCgroupPaths: processCgroupPaths, + cgroupsCache: cgroupsCache, + } +} + +// getProcessCgroups returns a mapping of cgroup subsystem name to path. It +// returns an error if it failed to retrieve the cgroup info. +func (p gosigarCidProvider) getProcessCgroups(pid int) (map[string]string, error) { + + var cgroup map[string]string + var ok bool + + if p.cgroupsCache != nil { + if cgroup, ok = p.cgroupsCache.Get(pid).(map[string]string); ok { + p.log.Debugf("Using cached cgroups for pid=%v", pid) + return cgroup, nil + } + } + + cgroup, err := p.processCgroupPaths(p.hostPath, pid) + switch err.(type) { + case nil, *os.PathError: + // do no thing when err is nil or when os.PathError happens because the process don't exist, + // or not running in linux system + default: + // should never happen + return cgroup, errors.Wrapf(err, "failed to read cgroups for pid=%v", pid) + } + + if p.cgroupsCache != nil { + p.cgroupsCache.Put(pid, cgroup) + } + + return cgroup, nil +} + +// getCid checks all of the processes' paths to see if any +// of them are associated with Kubernetes. Kubernetes uses /kubepods/// when +// naming cgroups and we use this to determine the container ID. If no container +// ID is found then an empty string is returned. +// Example: +// /kubepods/besteffort/pod9b9e44c2-00fd-11ea-95e9-080027421ddf/2bb9fd4de339e5d4f094e78bb87636004acfe53f5668104addc761fe4a93588e +func (p gosigarCidProvider) getCid(cgroups map[string]string) string { + for _, path := range cgroups { + for _, prefix := range p.cgroupPrefixes { + if strings.HasPrefix(path, prefix) { + return filepath.Base(path) + } + } + } + return "" +}