From 2b7feaafa8c998386e0e2b5ef137f9baac46d9a8 Mon Sep 17 00:00:00 2001 From: braydonk Date: Tue, 30 Jan 2024 15:51:15 +0000 Subject: [PATCH 01/10] Add system.processes.* to processscraper This commit deprecates the processesscraper in favour of adding the functionality to the processscraper. --- receiver/hostmetricsreceiver/README.md | 25 +- .../hostmetrics_receiver_test.go | 119 ++++- .../scraper/processesscraper/factory.go | 17 +- .../processesscraper/processes_scraper.go | 26 +- .../processes_scraper_test.go | 20 +- .../processes_scraper_unix.go | 6 +- .../processscraper/aggregate_process.go | 18 + .../aggregate_process_fallback.go | 14 + .../processscraper/aggregate_process_unix.go | 89 ++++ .../scraper/processscraper/documentation.md | 22 + .../scraper/processscraper/factory.go | 9 + .../internal/metadata/generated_config.go | 12 +- .../metadata/generated_config_test.go | 4 + .../internal/metadata/generated_metrics.go | 190 +++++++ .../metadata/generated_metrics_test.go | 37 ++ .../internal/metadata/testdata/config.yaml | 8 + .../scraper/processscraper/metadata.yaml | 26 +- .../scraper/processscraper/process.go | 1 + .../scraper/processscraper/process_scraper.go | 42 +- .../processscraper/process_scraper_test.go | 467 +++++++++++++++++- 20 files changed, 1100 insertions(+), 52 deletions(-) create mode 100644 receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process.go create mode 100644 receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go create mode 100644 receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_unix.go diff --git a/receiver/hostmetricsreceiver/README.md b/receiver/hostmetricsreceiver/README.md index 739a38d2e768..f9b9f975f275 100644 --- a/receiver/hostmetricsreceiver/README.md +++ b/receiver/hostmetricsreceiver/README.md @@ -47,8 +47,8 @@ The available scrapers are: | [memory] | All | Memory utilization metrics | | [network] | All | Network interface I/O metrics 
& TCP connection metrics | | [paging] | All | Paging/Swap space utilization and I/O metrics | -| [processes] | Linux, Mac | Process count metrics | -| [process] | Linux, Windows, Mac | Per process CPU, Memory, and Disk I/O metrics | +| [processes] | Linux, Mac | DEPRECATED: Use `process` scraper | +| [process] | Linux, Windows, Mac | Per process CPU, Memory, and Disk I/O metrics, full system process counts | [cpu]: ./internal/scraper/cpuscraper/documentation.md [disk]: ./internal/scraper/diskscraper/documentation.md @@ -160,14 +160,14 @@ service: Host metrics are collected from the Linux system directories on the filesystem. You likely want to collect metrics about the host system and not the container. -This is achievable by following these steps: +This is achievable by following these steps: #### 1. Bind mount the host filesystem -The simplest configuration is to mount the entire host filesystem when running +The simplest configuration is to mount the entire host filesystem when running the container. e.g. `docker run -v /:/hostfs ...`. -You can also choose which parts of the host filesystem to mount, if you know +You can also choose which parts of the host filesystem to mount, if you know exactly what you'll need. e.g. `docker run -v /proc:/hostfs/proc`. #### 2. Configure `root_path` @@ -191,3 +191,18 @@ Currently, the hostmetrics receiver does not set any Resource attributes on the export OTEL_RESOURCE_ATTRIBUTES="service.name=,service.namespace=,service.instance.id=" ``` +## Processes scraper deprecation + +The `processes` scraper has been deprecated in favor of the `process` scraper. The `processes` scraper will be removed in a future release. 
To enable the same functionality, remove the `processes` scraper and enable the `system.processes.*` metrics in the `process` scraper: + +```yaml +receivers: + hostmetrics: + scrapers: + process: + metrics: + system.processes.created: + enabled: true + system.processes.count: + enabled: true +``` \ No newline at end of file diff --git a/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go b/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go index fb51a59d95ca..0b84beee2a27 100644 --- a/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go +++ b/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go @@ -210,6 +210,92 @@ func appendMapInto(m1 map[string]struct{}, m2 map[string]struct{}) { } } +func Test_ProcessesScraperDeprecationCompatibility(t *testing.T) { + if runtime.GOOS != "linux" { + t.Skip("Skipping test on non-Linux platform") + } + processesConfig := &Config{ + ScraperControllerSettings: scraperhelper.ScraperControllerSettings{ + CollectionInterval: 100 * time.Millisecond, + }, + Scrapers: map[string]internal.Config{ + processscraper.TypeStr: scraperFactories[processscraper.TypeStr].CreateDefaultConfig(), + processesscraper.TypeStr: scraperFactories[processesscraper.TypeStr].CreateDefaultConfig(), + }, + } + processConfig := &Config{ + ScraperControllerSettings: scraperhelper.ScraperControllerSettings{ + CollectionInterval: 100 * time.Millisecond, + }, + Scrapers: map[string]internal.Config{ + processscraper.TypeStr: (&processscraper.Factory{}).CreateDefaultConfigWithSystemProcessesEnabled(), + }, + } + + processesSink := runScrapeForConfig(t, processesConfig) + processSink := runScrapeForConfig(t, processConfig) + assertProcessMetricShape(t, processesSink) + assertProcessMetricShape(t, processSink) +} + +func runScrapeForConfig(t *testing.T, cfg *Config) *consumertest.MetricsSink { + t.Helper() + + scraperFactories = factories + sink := new(consumertest.MetricsSink) + + receiver, err := 
NewFactory().CreateMetricsReceiver(context.Background(), creationSet, cfg, sink) + require.NoError(t, err, "Failed to create metrics receiver: %v", err) + + ctx, cancelFn := context.WithCancel(context.Background()) + err = receiver.Start(ctx, componenttest.NewNopHost()) + require.NoError(t, err, "Failed to start metrics receiver: %v", err) + defer func() { assert.NoError(t, receiver.Shutdown(context.Background())) }() + + // canceling the context provided to Start should not cancel any async processes initiated by the receiver + cancelFn() + + const tick = 50 * time.Millisecond + const waitFor = 10 * time.Second + require.Eventuallyf(t, func() bool { + return len(sink.AllMetrics()) > 0 + }, waitFor, tick, "No metrics were collected after %v", waitFor) + + return sink +} + +func assertProcessMetricShape(t *testing.T, sink *consumertest.MetricsSink) { + // Whether the metrics came from using the deprecated scraper or just the process scraper + // with the system.processes.* metrics enabled, the result should be the same. + // There will be process resources at the beginning, and at the end should be an empty + // resource with the `system.processes.*` metrics present. + + metrics := sink.AllMetrics()[0] + + // Check that all resources up until the final one are process resources. + for i := 0; i < metrics.ResourceMetrics().Len()-1; i++ { + _, ok := metrics.ResourceMetrics().At(i).Resource().Attributes().Get("process.pid") + assert.True(t, ok) + } + + // Check that the final resource has the system.processes.* metrics. 
+ finalResourceMetrics := metrics.ResourceMetrics().At(metrics.ResourceMetrics().Len() - 1).ScopeMetrics().At(0).Metrics() + found := map[string]bool{ + "system.processes.count": false, + "system.processes.created": false, + } + for i := 0; i < finalResourceMetrics.Len(); i++ { + metric := finalResourceMetrics.At(i) + if metric.Name() == "system.processes.count" { + found["system.processes.count"] = true + } else if metric.Name() == "system.processes.created" { + found["system.processes.created"] = true + } + } + assert.True(t, found["system.processes.count"]) + assert.True(t, found["system.processes.created"]) +} + const mockTypeStr = "mock" type mockConfig struct{} @@ -404,7 +490,7 @@ func Benchmark_ScrapeSystemMetrics(b *testing.B) { } func Benchmark_ScrapeSystemAndProcessMetrics(b *testing.B) { - if runtime.GOOS != "linux" && runtime.GOOS != "windows" { + if runtime.GOOS != "linux" { b.Skip("skipping test on non linux/windows") } @@ -424,3 +510,34 @@ func Benchmark_ScrapeSystemAndProcessMetrics(b *testing.B) { benchmarkScrapeMetrics(b, cfg) } + +func Benchmark_ScrapeProcessMetricsWithSystemProcessMetrics(b *testing.B) { + if runtime.GOOS != "linux" { + b.Skip("skipping test on non linux") + } + + cfg := &Config{ + ScraperControllerSettings: scraperhelper.NewDefaultScraperControllerSettings(""), + Scrapers: map[string]internal.Config{ + processscraper.TypeStr: (&processscraper.Factory{}).CreateDefaultConfigWithSystemProcessesEnabled(), + }, + } + + benchmarkScrapeMetrics(b, cfg) +} + +func Benchmark_ScrapeProcessMetricsWithDeprecatedProcessesScraper(b *testing.B) { + if runtime.GOOS != "linux" { + b.Skip("skipping test on non linux") + } + + cfg := &Config{ + ScraperControllerSettings: scraperhelper.NewDefaultScraperControllerSettings(""), + Scrapers: map[string]internal.Config{ + processscraper.TypeStr: (&processscraper.Factory{}).CreateDefaultConfig(), + processesscraper.TypeStr: (&processesscraper.Factory{}).CreateDefaultConfig(), + }, + } + + 
benchmarkScrapeMetrics(b, cfg) +} diff --git a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/factory.go b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/factory.go index 925e38baf0c8..fce24f3d2691 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/factory.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/factory.go @@ -37,12 +37,23 @@ func (f *Factory) CreateMetricsScraper( settings receiver.CreateSettings, config internal.Config, ) (scraperhelper.Scraper, error) { + settings.Logger.Warn(`processes scraping is deprecated, system.processes.created and system.processes.count metrics have been moved to the process scraper. + To enable them, apply the following config: + + scrapers: + process: + metrics: + system.processes.created: + enabled: true + system.processes.count: + enabled: true +`) cfg := config.(*Config) - s := newProcessesScraper(ctx, settings, cfg) + s := NewProcessesScraper(ctx, settings, cfg) return scraperhelper.NewScraper( TypeStr, - s.scrape, - scraperhelper.WithStart(s.start), + s.Scrape, + scraperhelper.WithStart(s.Start), ) } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper.go b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper.go index 5fa450273da2..ad9789f65b41 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper.go @@ -31,20 +31,20 @@ var metricsLength = func() int { return n }() -// scraper for Processes Metrics -type scraper struct { +// Scraper for Processes Metrics +type Scraper struct { settings receiver.CreateSettings config *Config mb *metadata.MetricsBuilder // for mocking gopsutil - getMiscStats func(context.Context) (*load.MiscStat, error) - getProcesses func() ([]proc, error) + GetMiscStats func(context.Context) (*load.MiscStat, error) + GetProcesses 
func() ([]Proc, error) bootTime func(context.Context) (uint64, error) } // for mocking out gopsutil process.Process -type proc interface { +type Proc interface { Status() ([]string, error) } @@ -53,16 +53,16 @@ type processesMetadata struct { processesCreated *int64 // ignored if enableProcessesCreated is false } -// newProcessesScraper creates a set of Processes related metrics -func newProcessesScraper(_ context.Context, settings receiver.CreateSettings, cfg *Config) *scraper { - return &scraper{ +// NewProcessesScraper creates a set of Processes related metrics +func NewProcessesScraper(_ context.Context, settings receiver.CreateSettings, cfg *Config) *Scraper { + return &Scraper{ settings: settings, config: cfg, - getMiscStats: load.MiscWithContext, - getProcesses: func() ([]proc, error) { + GetMiscStats: load.MiscWithContext, + GetProcesses: func() ([]Proc, error) { ctx := context.WithValue(context.Background(), common.EnvKey, cfg.EnvMap) ps, err := process.ProcessesWithContext(ctx) - ret := make([]proc, len(ps)) + ret := make([]Proc, len(ps)) for i := range ps { ret[i] = ps[i] } @@ -72,7 +72,7 @@ func newProcessesScraper(_ context.Context, settings receiver.CreateSettings, cf } } -func (s *scraper) start(ctx context.Context, _ component.Host) error { +func (s *Scraper) Start(ctx context.Context, _ component.Host) error { ctx = context.WithValue(ctx, common.EnvKey, s.config.EnvMap) bootTime, err := s.bootTime(ctx) if err != nil { @@ -83,7 +83,7 @@ func (s *scraper) start(ctx context.Context, _ component.Host) error { return nil } -func (s *scraper) scrape(_ context.Context) (pmetric.Metrics, error) { +func (s *Scraper) Scrape(_ context.Context) (pmetric.Metrics, error) { now := pcommon.NewTimestampFromTime(time.Now()) md := pmetric.NewMetrics() diff --git a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_test.go index 
cf37e89b147f..c11eea39343f 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_test.go @@ -33,7 +33,7 @@ func TestScrape(t *testing.T) { type testCase struct { name string getMiscStats func(context.Context) (*load.MiscStat, error) - getProcesses func() ([]proc, error) + getProcesses func() ([]Proc, error) expectedErr string validate func(*testing.T, pmetric.MetricSlice) } @@ -44,7 +44,7 @@ func TestScrape(t *testing.T) { }, { name: "FakeData", getMiscStats: func(ctx context.Context) (*load.MiscStat, error) { return &fakeData, nil }, - getProcesses: func() ([]proc, error) { return fakeProcessesData, nil }, + getProcesses: func() ([]Proc, error) { return fakeProcessesData, nil }, validate: validateFakeData, }, { name: "ErrorFromMiscStat", @@ -52,11 +52,11 @@ func TestScrape(t *testing.T) { expectedErr: "err1", }, { name: "ErrorFromProcesses", - getProcesses: func() ([]proc, error) { return nil, errors.New("err2") }, + getProcesses: func() ([]Proc, error) { return nil, errors.New("err2") }, expectedErr: "err2", }, { name: "ErrorFromProcessShouldBeIgnored", - getProcesses: func() ([]proc, error) { return []proc{errProcess{}}, nil }, + getProcesses: func() ([]Proc, error) { return []Proc{errProcess{}}, nil }, }, { name: "Validate Start Time", validate: validateStartTime, @@ -64,21 +64,21 @@ func TestScrape(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - scraper := newProcessesScraper(context.Background(), receivertest.NewNopCreateSettings(), &Config{ + scraper := NewProcessesScraper(context.Background(), receivertest.NewNopCreateSettings(), &Config{ MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), }) - err := scraper.start(context.Background(), componenttest.NewNopHost()) + err := scraper.Start(context.Background(), componenttest.NewNopHost()) assert.NoError(t, err, "Failed to 
initialize processes scraper: %v", err) // Override scraper methods if we are mocking out for this test case if test.getMiscStats != nil { - scraper.getMiscStats = test.getMiscStats + scraper.GetMiscStats = test.getMiscStats } if test.getProcesses != nil { - scraper.getProcesses = test.getProcesses + scraper.GetProcesses = test.getProcesses } - md, err := scraper.scrape(context.Background()) + md, err := scraper.Scrape(context.Background()) expectedMetricCount := 0 if expectProcessesCountMetric { @@ -166,7 +166,7 @@ var fakeData = load.MiscStat{ ProcsTotal: 30, } -var fakeProcessesData = []proc{ +var fakeProcessesData = []Proc{ fakeProcess(process.Wait), fakeProcess(process.Blocked), fakeProcess(process.Blocked), fakeProcess(process.Running), fakeProcess(process.Running), fakeProcess(process.Running), diff --git a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_unix.go b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_unix.go index f257fc8408c9..7135749f7ceb 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_unix.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_unix.go @@ -19,9 +19,9 @@ import ( const enableProcessesCount = true const enableProcessesCreated = runtime.GOOS == "openbsd" || runtime.GOOS == "linux" -func (s *scraper) getProcessesMetadata() (processesMetadata, error) { +func (s *Scraper) getProcessesMetadata() (processesMetadata, error) { ctx := context.WithValue(context.Background(), common.EnvKey, s.config.EnvMap) - processes, err := s.getProcesses() + processes, err := s.GetProcesses() if err != nil { return processesMetadata{}, err } @@ -46,7 +46,7 @@ func (s *scraper) getProcessesMetadata() (processesMetadata, error) { // Processes are actively changing as we run this code, so this reason // the above loop will tend to underestimate process counts. 
// getMiscStats is a single read/syscall so it should be more accurate. - miscStat, err := s.getMiscStats(ctx) + miscStat, err := s.GetMiscStats(ctx) if err != nil { return processesMetadata{}, err } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process.go new file mode 100644 index 000000000000..86007f73230d --- /dev/null +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process.go @@ -0,0 +1,18 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package processscraper + +import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata" + +type aggregateProcessMetadata struct { + countByStatus map[metadata.AttributeStatus]int64 + processesCreated int64 +} + +// Check if any aggregate process metrics are enabled. If neither are enabled, +// we can save on some syscalls by skipping out on calculating them. 
+func (s *scraper) collectAggregateProcessMetrics() bool { + return s.config.MetricsBuilderConfig.Metrics.SystemProcessesCount.Enabled || + s.config.MetricsBuilderConfig.Metrics.SystemProcessesCreated.Enabled +} diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go new file mode 100644 index 000000000000..888cfacbb2f4 --- /dev/null +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build !linux && !darwin && !freebsd && !openbsd +// +build !linux,!darwin,!freebsd,!openbsd + +package processscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper" + +// getAggregateProcessMetadata is the fallback used on platforms other than linux, +// darwin, freebsd and openbsd (see the build constraint above). It reads nothing +// and returns empty metadata with a nil error. +func (s *scraper) getAggregateProcessMetadata(_ processHandles) (aggregateProcessMetadata, error) { + return aggregateProcessMetadata{}, nil +} diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_unix.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_unix.go new file mode 100644 index 000000000000..87ee5c0c49d7 --- /dev/null +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_unix.go @@ -0,0 +1,89 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build linux || darwin || freebsd || openbsd +// +build linux darwin freebsd openbsd + +package processscraper + +import ( + "context" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata" + "github.com/shirou/gopsutil/v3/common" + "github.com/shirou/gopsutil/v3/process" +) + +func (s *scraper) getAggregateProcessMetadata(handles 
processHandles) (aggregateProcessMetadata, error) { + if !s.collectAggregateProcessMetrics() { + return aggregateProcessMetadata{}, nil + } + + ctx := context.WithValue(context.Background(), common.EnvKey, s.config.EnvMap) + + countByStatus := map[metadata.AttributeStatus]int64{} + for i := 0; i < handles.Len(); i++ { + handle := handles.At(i) + status, err := handle.Status() + if err != nil { + // We expect an error in the case that a process has + // been terminated as we run this code. + continue + } + state, ok := toAttributeStatus(status) + if !ok { + countByStatus[metadata.AttributeStatusUnknown]++ + continue + } + countByStatus[state]++ + } + + // Processes are actively changing as we run this code, so this reason + // the above loop will tend to underestimate process counts. + // load.MiscWithContext is a single read/syscall so it should be more accurate. + miscStat, err := s.getMiscStats(ctx) + if err != nil { + return aggregateProcessMetadata{}, err + } + + procsCreated := int64(miscStat.ProcsCreated) + + countByStatus[metadata.AttributeStatusBlocked] = int64(miscStat.ProcsBlocked) + countByStatus[metadata.AttributeStatusRunning] = int64(miscStat.ProcsRunning) + + totalKnown := int64(0) + for _, count := range countByStatus { + totalKnown += count + } + if int64(miscStat.ProcsTotal) > totalKnown { + countByStatus[metadata.AttributeStatusUnknown] = int64(miscStat.ProcsTotal) - totalKnown + } + + return aggregateProcessMetadata{ + countByStatus: countByStatus, + processesCreated: procsCreated, + }, nil +} + +func toAttributeStatus(status []string) (metadata.AttributeStatus, bool) { + if len(status) == 0 || len(status[0]) == 0 { + return metadata.AttributeStatus(0), false + } + state, ok := charToState[status[0]] + return state, ok +} + +var charToState = map[string]metadata.AttributeStatus{ + process.Blocked: metadata.AttributeStatusBlocked, + process.Daemon: metadata.AttributeStatusDaemon, + process.Detached: metadata.AttributeStatusDetached, + 
process.Idle: metadata.AttributeStatusIdle, + process.Lock: metadata.AttributeStatusLocked, + process.Orphan: metadata.AttributeStatusOrphan, + process.Running: metadata.AttributeStatusRunning, + process.Sleep: metadata.AttributeStatusSleeping, + process.Stop: metadata.AttributeStatusStopped, + process.System: metadata.AttributeStatusSystem, + process.Wait: metadata.AttributeStatusPaging, + process.Zombie: metadata.AttributeStatusZombies, +} diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/documentation.md b/receiver/hostmetricsreceiver/internal/scraper/processscraper/documentation.md index b8fbcbe66ff6..07f52a7475a2 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/documentation.md +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/documentation.md @@ -174,6 +174,28 @@ Process threads count. | ---- | ----------- | ---------- | ----------------------- | --------- | | {threads} | Sum | Int | Cumulative | false | +### system.processes.count + +Total number of processes in each state. + +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {processes} | Sum | Int | Cumulative | false | + +#### Attributes + +| Name | Description | Values | +| ---- | ----------- | ------ | +| status | Breakdown status of the processes. | Str: ``blocked``, ``daemon``, ``detached``, ``idle``, ``locked``, ``orphan``, ``paging``, ``running``, ``sleeping``, ``stopped``, ``system``, ``unknown``, ``zombies`` | + +### system.processes.created + +Total number of created processes. 
+ +| Unit | Metric Type | Value Type | Aggregation Temporality | Monotonic | +| ---- | ----------- | ---------- | ----------------------- | --------- | +| {processes} | Sum | Int | Cumulative | true | + ## Resource Attributes | Name | Description | Values | Enabled | diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/factory.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/factory.go index a831f05b398e..676f7d615976 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/factory.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/factory.go @@ -33,6 +33,15 @@ func (f *Factory) CreateDefaultConfig() internal.Config { } } +func (f *Factory) CreateDefaultConfigWithSystemProcessesEnabled() internal.Config { + mb := metadata.DefaultMetricsBuilderConfig() + mb.Metrics.SystemProcessesCount.Enabled = true + mb.Metrics.SystemProcessesCreated.Enabled = true + return &Config{ + MetricsBuilderConfig: mb, + } +} + // CreateMetricsScraper creates a resource scraper based on provided config. 
func (f *Factory) CreateMetricsScraper( _ context.Context, diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_config.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_config.go index 002379248634..e9a7ea20dc6a 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_config.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_config.go @@ -15,7 +15,7 @@ func (ms *MetricConfig) Unmarshal(parser *confmap.Conf) error { if parser == nil { return nil } - err := parser.Unmarshal(ms) + err := parser.Unmarshal(ms, confmap.WithErrorUnused()) if err != nil { return err } @@ -38,6 +38,8 @@ type MetricsConfig struct { ProcessPagingFaults MetricConfig `mapstructure:"process.paging.faults"` ProcessSignalsPending MetricConfig `mapstructure:"process.signals_pending"` ProcessThreads MetricConfig `mapstructure:"process.threads"` + SystemProcessesCount MetricConfig `mapstructure:"system.processes.count"` + SystemProcessesCreated MetricConfig `mapstructure:"system.processes.created"` } func DefaultMetricsConfig() MetricsConfig { @@ -81,6 +83,12 @@ func DefaultMetricsConfig() MetricsConfig { ProcessThreads: MetricConfig{ Enabled: false, }, + SystemProcessesCount: MetricConfig{ + Enabled: false, + }, + SystemProcessesCreated: MetricConfig{ + Enabled: false, + }, } } @@ -95,7 +103,7 @@ func (rac *ResourceAttributeConfig) Unmarshal(parser *confmap.Conf) error { if parser == nil { return nil } - err := parser.Unmarshal(rac) + err := parser.Unmarshal(rac, confmap.WithErrorUnused()) if err != nil { return err } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_config_test.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_config_test.go index 007ffa2ec44a..ec7f32eca8c7 100644 --- 
a/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_config_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_config_test.go @@ -39,6 +39,8 @@ func TestMetricsBuilderConfig(t *testing.T) { ProcessPagingFaults: MetricConfig{Enabled: true}, ProcessSignalsPending: MetricConfig{Enabled: true}, ProcessThreads: MetricConfig{Enabled: true}, + SystemProcessesCount: MetricConfig{Enabled: true}, + SystemProcessesCreated: MetricConfig{Enabled: true}, }, ResourceAttributes: ResourceAttributesConfig{ ProcessCommand: ResourceAttributeConfig{Enabled: true}, @@ -68,6 +70,8 @@ func TestMetricsBuilderConfig(t *testing.T) { ProcessPagingFaults: MetricConfig{Enabled: false}, ProcessSignalsPending: MetricConfig{Enabled: false}, ProcessThreads: MetricConfig{Enabled: false}, + SystemProcessesCount: MetricConfig{Enabled: false}, + SystemProcessesCreated: MetricConfig{Enabled: false}, }, ResourceAttributes: ResourceAttributesConfig{ ProcessCommand: ResourceAttributeConfig{Enabled: false}, diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_metrics.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_metrics.go index 8c84375ce48d..89bd5175a632 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_metrics.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_metrics.go @@ -120,6 +120,76 @@ var MapAttributeState = map[string]AttributeState{ "wait": AttributeStateWait, } +// AttributeStatus specifies the a value status attribute. 
+type AttributeStatus int + +const ( + _ AttributeStatus = iota + AttributeStatusBlocked + AttributeStatusDaemon + AttributeStatusDetached + AttributeStatusIdle + AttributeStatusLocked + AttributeStatusOrphan + AttributeStatusPaging + AttributeStatusRunning + AttributeStatusSleeping + AttributeStatusStopped + AttributeStatusSystem + AttributeStatusUnknown + AttributeStatusZombies +) + +// String returns the string representation of the AttributeStatus. +func (av AttributeStatus) String() string { + switch av { + case AttributeStatusBlocked: + return "blocked" + case AttributeStatusDaemon: + return "daemon" + case AttributeStatusDetached: + return "detached" + case AttributeStatusIdle: + return "idle" + case AttributeStatusLocked: + return "locked" + case AttributeStatusOrphan: + return "orphan" + case AttributeStatusPaging: + return "paging" + case AttributeStatusRunning: + return "running" + case AttributeStatusSleeping: + return "sleeping" + case AttributeStatusStopped: + return "stopped" + case AttributeStatusSystem: + return "system" + case AttributeStatusUnknown: + return "unknown" + case AttributeStatusZombies: + return "zombies" + } + return "" +} + +// MapAttributeStatus is a helper map of string to AttributeStatus attribute value. +var MapAttributeStatus = map[string]AttributeStatus{ + "blocked": AttributeStatusBlocked, + "daemon": AttributeStatusDaemon, + "detached": AttributeStatusDetached, + "idle": AttributeStatusIdle, + "locked": AttributeStatusLocked, + "orphan": AttributeStatusOrphan, + "paging": AttributeStatusPaging, + "running": AttributeStatusRunning, + "sleeping": AttributeStatusSleeping, + "stopped": AttributeStatusStopped, + "system": AttributeStatusSystem, + "unknown": AttributeStatusUnknown, + "zombies": AttributeStatusZombies, +} + type metricProcessContextSwitches struct { data pmetric.Metric // data buffer for generated metric. config MetricConfig // metric config provided by user. 
@@ -791,6 +861,110 @@ func newMetricProcessThreads(cfg MetricConfig) metricProcessThreads { return m } +type metricSystemProcessesCount struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills system.processes.count metric with initial data. +func (m *metricSystemProcessesCount) init() { + m.data.SetName("system.processes.count") + m.data.SetDescription("Total number of processes in each state.") + m.data.SetUnit("{processes}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(false) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + m.data.Sum().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricSystemProcessesCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, statusAttributeValue string) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) + dp.Attributes().PutStr("status", statusAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricSystemProcessesCount) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. 
+func (m *metricSystemProcessesCount) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricSystemProcessesCount(cfg MetricConfig) metricSystemProcessesCount { + m := metricSystemProcessesCount{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +type metricSystemProcessesCreated struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills system.processes.created metric with initial data. +func (m *metricSystemProcessesCreated) init() { + m.data.SetName("system.processes.created") + m.data.SetDescription("Total number of created processes.") + m.data.SetUnit("{processes}") + m.data.SetEmptySum() + m.data.Sum().SetIsMonotonic(true) + m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) +} + +func (m *metricSystemProcessesCreated) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) { + if !m.config.Enabled { + return + } + dp := m.data.Sum().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricSystemProcessesCreated) updateCapacity() { + if m.data.Sum().DataPoints().Len() > m.capacity { + m.capacity = m.data.Sum().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. 
+func (m *metricSystemProcessesCreated) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricSystemProcessesCreated(cfg MetricConfig) metricSystemProcessesCreated { + m := metricSystemProcessesCreated{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + // MetricsBuilder provides an interface for scrapers to report metrics while taking care of all the transformations // required to produce metric representation defined in metadata and user config. type MetricsBuilder struct { @@ -812,6 +986,8 @@ type MetricsBuilder struct { metricProcessPagingFaults metricProcessPagingFaults metricProcessSignalsPending metricProcessSignalsPending metricProcessThreads metricProcessThreads + metricSystemProcessesCount metricSystemProcessesCount + metricSystemProcessesCreated metricSystemProcessesCreated } // metricBuilderOption applies changes to default metrics builder. 
@@ -843,6 +1019,8 @@ func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.CreateSetting metricProcessPagingFaults: newMetricProcessPagingFaults(mbc.Metrics.ProcessPagingFaults), metricProcessSignalsPending: newMetricProcessSignalsPending(mbc.Metrics.ProcessSignalsPending), metricProcessThreads: newMetricProcessThreads(mbc.Metrics.ProcessThreads), + metricSystemProcessesCount: newMetricSystemProcessesCount(mbc.Metrics.SystemProcessesCount), + metricSystemProcessesCreated: newMetricSystemProcessesCreated(mbc.Metrics.SystemProcessesCreated), } for _, op := range options { op(mb) @@ -918,6 +1096,8 @@ func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) { mb.metricProcessPagingFaults.emit(ils.Metrics()) mb.metricProcessSignalsPending.emit(ils.Metrics()) mb.metricProcessThreads.emit(ils.Metrics()) + mb.metricSystemProcessesCount.emit(ils.Metrics()) + mb.metricSystemProcessesCreated.emit(ils.Metrics()) for _, op := range rmo { op(rm) @@ -1003,6 +1183,16 @@ func (mb *MetricsBuilder) RecordProcessThreadsDataPoint(ts pcommon.Timestamp, va mb.metricProcessThreads.recordDataPoint(mb.startTime, ts, val) } +// RecordSystemProcessesCountDataPoint adds a data point to system.processes.count metric. +func (mb *MetricsBuilder) RecordSystemProcessesCountDataPoint(ts pcommon.Timestamp, val int64, statusAttributeValue AttributeStatus) { + mb.metricSystemProcessesCount.recordDataPoint(mb.startTime, ts, val, statusAttributeValue.String()) +} + +// RecordSystemProcessesCreatedDataPoint adds a data point to system.processes.created metric. +func (mb *MetricsBuilder) RecordSystemProcessesCreatedDataPoint(ts pcommon.Timestamp, val int64) { + mb.metricSystemProcessesCreated.recordDataPoint(mb.startTime, ts, val) +} + // Reset resets metrics builder to its initial state. It should be used when external metrics source is restarted, // and metrics builder should update its startTime and reset it's internal state accordingly. 
func (mb *MetricsBuilder) Reset(options ...metricBuilderOption) { diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_metrics_test.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_metrics_test.go index 0a469f044a5a..ea5754cca316 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_metrics_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_metrics_test.go @@ -98,6 +98,12 @@ func TestMetricsBuilder(t *testing.T) { allMetricsCount++ mb.RecordProcessThreadsDataPoint(ts, 1) + allMetricsCount++ + mb.RecordSystemProcessesCountDataPoint(ts, 1, AttributeStatusBlocked) + + allMetricsCount++ + mb.RecordSystemProcessesCreatedDataPoint(ts, 1) + rb := mb.NewResourceBuilder() rb.SetProcessCommand("process.command-val") rb.SetProcessCommandLine("process.command_line-val") @@ -324,6 +330,37 @@ func TestMetricsBuilder(t *testing.T) { assert.Equal(t, ts, dp.Timestamp()) assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) assert.Equal(t, int64(1), dp.IntValue()) + case "system.processes.count": + assert.False(t, validatedMetrics["system.processes.count"], "Found a duplicate in the metrics slice: system.processes.count") + validatedMetrics["system.processes.count"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "Total number of processes in each state.", ms.At(i).Description()) + assert.Equal(t, "{processes}", ms.At(i).Unit()) + assert.Equal(t, false, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + 
assert.Equal(t, int64(1), dp.IntValue()) + attrVal, ok := dp.Attributes().Get("status") + assert.True(t, ok) + assert.EqualValues(t, "blocked", attrVal.Str()) + case "system.processes.created": + assert.False(t, validatedMetrics["system.processes.created"], "Found a duplicate in the metrics slice: system.processes.created") + validatedMetrics["system.processes.created"] = true + assert.Equal(t, pmetric.MetricTypeSum, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Sum().DataPoints().Len()) + assert.Equal(t, "Total number of created processes.", ms.At(i).Description()) + assert.Equal(t, "{processes}", ms.At(i).Unit()) + assert.Equal(t, true, ms.At(i).Sum().IsMonotonic()) + assert.Equal(t, pmetric.AggregationTemporalityCumulative, ms.At(i).Sum().AggregationTemporality()) + dp := ms.At(i).Sum().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) } } }) diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/testdata/config.yaml b/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/testdata/config.yaml index 235b54919125..058c1046b1fc 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/testdata/config.yaml +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/testdata/config.yaml @@ -27,6 +27,10 @@ all_set: enabled: true process.threads: enabled: true + system.processes.count: + enabled: true + system.processes.created: + enabled: true resource_attributes: process.command: enabled: true @@ -70,6 +74,10 @@ none_set: enabled: false process.threads: enabled: false + system.processes.count: + enabled: false + system.processes.created: + enabled: false resource_attributes: process.command: enabled: false diff --git 
a/receiver/hostmetricsreceiver/internal/scraper/processscraper/metadata.yaml b/receiver/hostmetricsreceiver/internal/scraper/processscraper/metadata.yaml index 51fa89be2c3d..9a45287bb736 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/metadata.yaml +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/metadata.yaml @@ -70,6 +70,11 @@ attributes: type: string enum: [involuntary, voluntary] + status: + description: Breakdown status of the processes. + type: string + enum: [blocked, daemon, detached, idle, locked, orphan, paging, running, sleeping, stopped, system, unknown, zombies] + metrics: process.cpu.time: enabled: true @@ -176,7 +181,7 @@ metrics: value_type: int aggregation_temporality: cumulative monotonic: false - + process.context_switches: enabled: false description: Number of times the process has been context switched. @@ -197,3 +202,22 @@ metrics: aggregation_temporality: cumulative monotonic: true attributes: [direction] + + system.processes.created: + enabled: false + description: Total number of created processes. + unit: "{processes}" + sum: + value_type: int + aggregation_temporality: cumulative + monotonic: true + + system.processes.count: + enabled: false + description: Total number of processes in each state. 
+ unit: "{processes}" + sum: + value_type: int + aggregation_temporality: cumulative + monotonic: false + attributes: [status] diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process.go index be5231652e3e..708fb1638b4e 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process.go @@ -89,6 +89,7 @@ type processHandle interface { PageFaultsWithContext(context.Context) (*process.PageFaultsStat, error) NumCtxSwitchesWithContext(context.Context) (*process.NumCtxSwitchesStat, error) NumFDsWithContext(context.Context) (int32, error) + Status() ([]string, error) // If gatherUsed is true, the currently used value will be gathered and added to the resulting RlimitStat. RlimitUsageWithContext(ctx context.Context, gatherUsed bool) ([]process.RlimitStat, error) } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go index f922031f663b..a661adfdbbb1 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go @@ -11,6 +11,7 @@ import ( "time" "github.com/shirou/gopsutil/v3/common" + "github.com/shirou/gopsutil/v3/load" "github.com/shirou/gopsutil/v3/process" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" @@ -52,6 +53,7 @@ type scraper struct { // for mocking getProcessCreateTime func(p processHandle, ctx context.Context) (int64, error) getProcessHandles func(context.Context) (processHandles, error) + getMiscStats func(context.Context) (*load.MiscStat, error) handleCountManager handlecount.Manager } @@ -63,6 +65,7 @@ func newProcessScraper(settings receiver.CreateSettings, cfg *Config) (*scraper, config: 
cfg, getProcessCreateTime: processHandle.CreateTimeWithContext, getProcessHandles: getProcessHandlesInternal, + getMiscStats: load.MiscWithContext, scrapeProcessDelay: cfg.ScrapeProcessDelay, ucals: make(map[int32]*ucal.CPUUtilizationCalculator), handleCountManager: handlecount.NewManager(), @@ -95,7 +98,11 @@ func (s *scraper) start(context.Context, component.Host) error { func (s *scraper) scrape(ctx context.Context) (pmetric.Metrics, error) { var errs scrapererror.ScrapeErrors - data, err := s.getProcessMetadata() + phs, err := s.getProcessHandles(context.WithValue(ctx, common.EnvKey, s.config.EnvMap)) + if err != nil { + return pmetric.NewMetrics(), err + } + data, err := s.getProcessMetadata(phs) if err != nil { var partialErr scrapererror.PartialScrapeError if !errors.As(err, &partialErr) { @@ -104,6 +111,10 @@ func (s *scraper) scrape(ctx context.Context) (pmetric.Metrics, error) { errs.AddPartial(partialErr.Failed, partialErr) } + aggregateData, err := s.getAggregateProcessMetadata(phs) + if err != nil { + errs.AddPartial(1, fmt.Errorf("error reading aggregate process metadata: %w", err)) + } presentPIDs := make(map[int32]struct{}, len(data)) ctx = context.WithValue(ctx, common.EnvKey, s.config.EnvMap) @@ -164,6 +175,12 @@ func (s *scraper) scrape(ctx context.Context) (pmetric.Metrics, error) { } } + // Record system.processes metric data points. Recording at the end since they + // are to be at the top level and not part of a resource. + now := pcommon.NewTimestampFromTime(time.Now()) + s.scrapeAndAppendSystemProcessesCount(ctx, now, aggregateData) + s.scrapeAndAppendSystemProcessesCreated(ctx, now, aggregateData) + return s.mb.Emit(), errs.Combine() } @@ -171,14 +188,9 @@ func (s *scraper) scrape(ctx context.Context) (pmetric.Metrics, error) { // for all currently running processes. If errors occur obtaining information // for some processes, an error will be returned, but any processes that were // successfully obtained will still be returned. 
-func (s *scraper) getProcessMetadata() ([]*processMetadata, error) { - ctx := context.WithValue(context.Background(), common.EnvKey, s.config.EnvMap) - handles, err := s.getProcessHandles(ctx) - if err != nil { - return nil, err - } - +func (s *scraper) getProcessMetadata(handles processHandles) ([]*processMetadata, error) { var errs scrapererror.ScrapeErrors + ctx := context.WithValue(context.Background(), common.EnvKey, s.config.EnvMap) if err := s.refreshHandleCounts(); err != nil { errs.Add(err) @@ -436,3 +448,17 @@ func (s *scraper) scrapeAndAppendSignalsPendingMetric(ctx context.Context, now p return nil } + +func (s *scraper) scrapeAndAppendSystemProcessesCreated(ctx context.Context, now pcommon.Timestamp, data aggregateProcessMetadata) { + if s.config.MetricsBuilderConfig.Metrics.SystemProcessesCreated.Enabled { + s.mb.RecordSystemProcessesCreatedDataPoint(now, data.processesCreated) + } +} + +func (s *scraper) scrapeAndAppendSystemProcessesCount(ctx context.Context, now pcommon.Timestamp, data aggregateProcessMetadata) { + if s.config.MetricsBuilderConfig.Metrics.SystemProcessesCount.Enabled { + for status, count := range data.countByStatus { + s.mb.RecordSystemProcessesCountDataPoint(now, count, status) + } + } +} diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go index 279bd9a6cdd6..1ab18b120005 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go @@ -7,11 +7,14 @@ import ( "context" "errors" "fmt" + "math/rand" + "reflect" "runtime" "testing" "time" "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/load" "github.com/shirou/gopsutil/v3/process" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -25,6 +28,7 @@ import ( 
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processesscraper" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata" ) @@ -34,6 +38,12 @@ func skipTestOnUnsupportedOS(t *testing.T) { } } +func skipTestOnUnsupportedOSAggregateMetrics(t *testing.T) { + if runtime.GOOS != "linux" && runtime.GOOS != "darwin" { + t.Skipf("skipping test on %v", runtime.GOOS) + } +} + func enableLinuxOnlyMetrics(ms *metadata.MetricsConfig) { if runtime.GOOS != "linux" { return @@ -366,7 +376,7 @@ func TestScrapeMetrics_GetProcessesError(t *testing.T) { } type processHandlesMock struct { - handles []*processHandleMock + handles []processHandle } func (p *processHandlesMock) Pid(int) int32 { @@ -475,6 +485,11 @@ func (p *processHandleMock) RlimitUsageWithContext(ctx context.Context, b bool) return args.Get(0).([]process.RlimitStat), args.Error(1) } +func (p *processHandleMock) Status() ([]string, error) { + args := p.MethodCalled("Status") + return args.Get(0).([]string), args.Error(1) +} + func initDefaultsHandleMock(t mock.TestingT, handleMock *processHandleMock) { if !handleMock.IsMethodCallable(t, "UsernameWithContext", mock.Anything) { handleMock.On("UsernameWithContext", mock.Anything).Return("username", nil) @@ -527,6 +542,9 @@ func initDefaultsHandleMock(t mock.TestingT, handleMock *processHandleMock) { if !handleMock.IsMethodCallable(t, "ExeWithContext", mock.Anything) { handleMock.On("ExeWithContext", mock.Anything).Return("processname", nil) } + if !handleMock.IsMethodCallable(t, "Status", mock.Anything) { + handleMock.On("Status", mock.Anything).Return([]string{}, nil) + } } func TestScrapeMetrics_Filtered(t *testing.T) { @@ -631,7 +649,7 @@ func 
TestScrapeMetrics_Filtered(t *testing.T) { err = scraper.start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err, "Failed to initialize process scraper: %v", err) - handles := make([]*processHandleMock, 0, len(test.names)) + handles := make([]processHandle, 0, len(test.names)) for i, name := range test.names { handleMock := &processHandleMock{} handleMock.On("NameWithContext", mock.Anything).Return(name, nil) @@ -689,6 +707,7 @@ func TestScrapeMetrics_ProcessErrors(t *testing.T) { numCtxSwitchesError error numFDsError error rlimitError error + statusError error expectedError string } @@ -865,6 +884,7 @@ func TestScrapeMetrics_ProcessErrors(t *testing.T) { handleMock.On("PageFaultsWithContext", mock.Anything).Return(&process.PageFaultsStat{}, test.pageFaultsError) handleMock.On("NumCtxSwitchesWithContext", mock.Anything).Return(&process.NumCtxSwitchesStat{}, test.numCtxSwitchesError) handleMock.On("NumFDsWithContext", mock.Anything).Return(int32(0), test.numFDsError) + handleMock.On("Status", mock.Anything).Return([]string{}, test.statusError) handleMock.On("RlimitUsageWithContext", mock.Anything, mock.Anything).Return([]process.RlimitStat{ { Resource: process.RLIMIT_SIGPENDING, @@ -873,7 +893,7 @@ func TestScrapeMetrics_ProcessErrors(t *testing.T) { }, test.rlimitError) scraper.getProcessHandles = func(context.Context) (processHandles, error) { - return &processHandlesMock{handles: []*processHandleMock{handleMock}}, nil + return &processHandlesMock{handles: []processHandle{handleMock}}, nil } md, err := scraper.scrape(context.Background()) @@ -1103,7 +1123,7 @@ func TestScrapeMetrics_MuteErrorFlags(t *testing.T) { } scraper.getProcessHandles = func(context.Context) (processHandles, error) { - return &processHandlesMock{handles: []*processHandleMock{handleMock}}, nil + return &processHandlesMock{handles: []processHandle{handleMock}}, nil } md, err := scraper.scrape(context.Background()) @@ -1136,6 +1156,7 @@ func newErroringHandleMock() 
*processHandleMock { handleMock.On("NumThreadsWithContext", mock.Anything).Return(int32(0), &ProcessReadError{}) handleMock.On("NumCtxSwitchesWithContext", mock.Anything).Return(&process.NumCtxSwitchesStat{}, &ProcessReadError{}) handleMock.On("NumFDsWithContext", mock.Anything).Return(int32(0), &ProcessReadError{}) + handleMock.On("Status", mock.Anything).Return([]string{}, &ProcessReadError{}) return handleMock } @@ -1165,7 +1186,7 @@ func TestScrapeMetrics_DontCheckDisabledMetrics(t *testing.T) { handleMock.On("PpidWithContext", mock.Anything).Return(int32(2), nil) scraper.getProcessHandles = func(context.Context) (processHandles, error) { - return &processHandlesMock{handles: []*processHandleMock{handleMock}}, nil + return &processHandlesMock{handles: []processHandle{handleMock}}, nil } md, err := scraper.scrape(context.Background()) @@ -1234,7 +1255,7 @@ func TestScrapeMetrics_CpuUtilizationWhenCpuTimesIsDisabled(t *testing.T) { initDefaultsHandleMock(t, handleMock) scraper.getProcessHandles = func(context.Context) (processHandles, error) { - return &processHandlesMock{handles: []*processHandleMock{handleMock}}, nil + return &processHandlesMock{handles: []processHandle{handleMock}}, nil } // scrape the first time @@ -1254,5 +1275,439 @@ func TestScrapeMetrics_CpuUtilizationWhenCpuTimesIsDisabled(t *testing.T) { } }) } +} + +func Test_ProcessScrapeAggregateProcessMetrics(t *testing.T) { + t.Parallel() + skipTestOnUnsupportedOSAggregateMetrics(t) + + handleWithStatus := func(status ...string) *processHandleMock { + handleMock := &processHandleMock{} + handleMock.On("Status", mock.Anything).Return(status, nil) + initDefaultsHandleMock(t, handleMock) + return handleMock + } + handleWithStatusErr := func(err error) *processHandleMock { + handleMock := &processHandleMock{} + handleMock.On("Status", mock.Anything).Return([]string{}, err) + initDefaultsHandleMock(t, handleMock) + return handleMock + } + + testCases := []struct { + name string + 
enableSystemProcessesCreated bool + enableSystemProcessesCount bool + handles []processHandle + handlesWithErrors []processHandle + expectedStatusCounts map[metadata.AttributeStatus]int64 + procsRunning int + procsBlocked int + }{ + { + name: "both enabled", + enableSystemProcessesCreated: true, + enableSystemProcessesCount: true, + handles: []processHandle{ + handleWithStatus(process.Blocked), + handleWithStatus(process.Running), + handleWithStatus(process.Running), + handleWithStatus(process.Running), + handleWithStatus(process.Stop), + }, + procsRunning: 3, + procsBlocked: 1, + expectedStatusCounts: map[metadata.AttributeStatus]int64{ + metadata.AttributeStatusBlocked: 1, + metadata.AttributeStatusRunning: 3, + metadata.AttributeStatusStopped: 1, + }, + }, + { + name: "multiple statuses for one process", + enableSystemProcessesCreated: true, + enableSystemProcessesCount: true, + handles: []processHandle{ + handleWithStatus(process.Daemon, process.Running), + handleWithStatus(process.Stop), + }, + procsRunning: 1, + expectedStatusCounts: map[metadata.AttributeStatus]int64{ + metadata.AttributeStatusDaemon: 1, + metadata.AttributeStatusRunning: 1, + metadata.AttributeStatusStopped: 1, + }, + }, + { + name: "both disabled", + enableSystemProcessesCreated: false, + enableSystemProcessesCount: false, + }, + { + name: "only system.processes.created enabled", + enableSystemProcessesCreated: true, + enableSystemProcessesCount: false, + handles: []processHandle{ + handleWithStatus(process.Running), + handleWithStatus(process.Running), + handleWithStatus(process.Running), + handleWithStatus(process.Running), + handleWithStatus(process.Running), + }, + }, + { + name: "only system.processes.count enabled", + enableSystemProcessesCreated: false, + enableSystemProcessesCount: true, + handles: []processHandle{ + handleWithStatus(process.Daemon), + handleWithStatus(process.Idle), + handleWithStatus(process.Orphan), + }, + expectedStatusCounts: 
map[metadata.AttributeStatus]int64{ + metadata.AttributeStatusDaemon: 1, + metadata.AttributeStatusIdle: 1, + metadata.AttributeStatusOrphan: 1, + }, + }, + { + name: "handles with errors", + enableSystemProcessesCreated: false, + enableSystemProcessesCount: true, + handles: []processHandle{ + handleWithStatus(process.Daemon), + handleWithStatus(process.Daemon), + }, + handlesWithErrors: []processHandle{ + handleWithStatusErr(errors.New("test")), + }, + procsRunning: 2, + expectedStatusCounts: map[metadata.AttributeStatus]int64{ + metadata.AttributeStatusDaemon: 2, + }, + }, + } + + for _, testCase := range testCases { + testCase := testCase + t.Run(testCase.name, func(t *testing.T) { + t.Parallel() + + allHandles := testCase.handles + if testCase.handlesWithErrors != nil { + allHandles = append(allHandles, testCase.handlesWithErrors...) + } + + metricsBuilderConfig := metadata.DefaultMetricsBuilderConfig() + + metricsBuilderConfig.Metrics.SystemProcessesCount.Enabled = testCase.enableSystemProcessesCount + metricsBuilderConfig.Metrics.SystemProcessesCreated.Enabled = testCase.enableSystemProcessesCreated + + config := &Config{MetricsBuilderConfig: metricsBuilderConfig} + + scraper, err := newProcessScraper(receivertest.NewNopCreateSettings(), config) + require.NoError(t, err, "Failed to create process scraper: %v", err) + scraper.getProcessHandles = func(context.Context) (processHandles, error) { + return &processHandlesMock{ + handles: allHandles, + }, nil + } + scraper.getMiscStats = func(context.Context) (*load.MiscStat, error) { + return &load.MiscStat{ + ProcsCreated: len(testCase.handles), + ProcsRunning: testCase.procsRunning, + ProcsBlocked: testCase.procsBlocked, + }, nil + } + err = scraper.start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err, "Failed to initialize process scraper: %v", err) + + metrics, err := scraper.scrape(context.Background()) + require.NoError(t, err, "Failed to scrape process metrics: %v", err) + + 
hasSystemProcessesMetric := testCase.enableSystemProcessesCreated || testCase.enableSystemProcessesCount + + // The structure for the scrape payload is a resource for each handle plus a top-level resource. + expectedResources := len(allHandles) + if hasSystemProcessesMetric { + expectedResources++ + } + require.Equal(t, expectedResources, metrics.ResourceMetrics().Len()) + + if hasSystemProcessesMetric { + // The system metrics are recorded last, so they will be in the final resource in the payload after + // each process resource. + systemProcessesMetrics := metrics.ResourceMetrics().At(len(allHandles)).ScopeMetrics().At(0).Metrics() + for i := 0; i < systemProcessesMetrics.Len(); i++ { + metric := systemProcessesMetrics.At(i) + + if metric.Name() == "system.processes.created" { + require.True(t, testCase.enableSystemProcessesCreated, `expected "system.processes.created" metric not to be present`) + assert.True(t, testCase.enableSystemProcessesCreated) + assert.Equal(t, 1, metric.Sum().DataPoints().Len()) + assert.Equal(t, int64(len(testCase.handles)), metric.Sum().DataPoints().At(0).IntValue()) + } + + if metric.Name() == "system.processes.count" { + require.True(t, testCase.enableSystemProcessesCount, `expected "system.processes.count" metric not to be present`) + assert.True(t, testCase.enableSystemProcessesCount) + assertSystemProcessesCountAttributes(t, metric, testCase.expectedStatusCounts) + } + } + } + }) + } +} + +func assertSystemProcessesCountAttributes(t *testing.T, processesCountMetric pmetric.Metric, expectedValues map[metadata.AttributeStatus]int64) { + foundAttributes := map[metadata.AttributeStatus]bool{} + for status := range expectedValues { + foundAttributes[status] = false + } + for i := 0; i < processesCountMetric.Sum().DataPoints().Len(); i++ { + metric := processesCountMetric.Sum().DataPoints().At(i) + statusAttr, ok := metric.Attributes().Get("status") + assert.True(t, ok, `Expected attribute "status" not found`) + status, ok := 
metadata.MapAttributeStatus[statusAttr.Str()] + assert.True(t, ok, `Found invalid attribute "%s"`, statusAttr.Str()) + expectedValue, ok := expectedValues[status] + if !ok { + // "blocked" and "running" will always be present, so if we're not expecting it, we can + // just ignore it for this test. + if status == metadata.AttributeStatusBlocked || status == metadata.AttributeStatusRunning { + continue + } + assert.Failf(t, "Found unexpected status attribute", `status "%s"`, statusAttr.Str()) + } + foundAttributes[status] = true + assert.Equal( + t, + expectedValue, + metric.IntValue(), + `expected status "%s" to have value %d, got %d`, + status, + expectedValue, + metric.IntValue(), + ) + } + for status, found := range foundAttributes { + assert.True(t, found, `Expected status attribute "%s" not found`, status) + } +} + +func Test_SystemProcessesCompatibilityTest(t *testing.T) { + t.Parallel() + skipTestOnUnsupportedOSAggregateMetrics(t) + + handleWithStatus := func(status ...string) *processHandleMock { + handleMock := &processHandleMock{} + handleMock.On("Status", mock.Anything).Return(status, nil) + initDefaultsHandleMock(t, handleMock) + return handleMock + } + + testCase := []struct { + name string + handles []processHandle + }{ + { + name: "single process", + handles: []processHandle{ + handleWithStatus(process.Running), + }, + }, + { + name: "multiple processes", + handles: []processHandle{ + handleWithStatus(process.Running), + handleWithStatus(process.Running), + handleWithStatus(process.Running), + handleWithStatus(process.Running), + }, + }, + { + name: "every status", + handles: (func() []processHandle { + handles := make([]processHandle, 0, len(charToState)) + for char := range charToState { + handles = append(handles, handleWithStatus(char)) + } + return handles + })(), + }, + } + for _, testCase := range testCase { + testCase := testCase + t.Run(testCase.name, func(t *testing.T) { + t.Parallel() + + assertProcessAndProcessesScrapesEqual(t, testCase.handles) + 
}) + } +} +func Fuzz_SystemProcessesCompatibilityTest(f *testing.F) { + validStates := mapKeys(charToState) + f.Add(1) + f.Add(100) + f.Add(1000) + f.Fuzz(func(t *testing.T, fuzzIn int) { + numProcs := fuzzIn + if numProcs < 0 { + numProcs = 0 + } + if numProcs > 50000 { + numProcs = 50000 + } + handles := make([]processHandle, numProcs) + for i := 0; i < numProcs; i++ { + handleMock := &processHandleMock{} + initDefaultsHandleMock(t, handleMock) + randomState := validStates[rand.Intn(len(validStates))] + handleMock.On("Status", mock.Anything).Return([]string{randomState}, nil) + handles[i] = handleMock + } + assertProcessAndProcessesScrapesEqual(t, handles) + }) +} + +type systemProcessResults struct { + created int64 + countByState map[string]int64 +} + +func systemProcessResultsFromProcessesScrape(scrapeMetrics pmetric.Metrics) systemProcessResults { + result := systemProcessResults{} + metrics := scrapeMetrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + for i := 0; i < metrics.Len(); i++ { + metric := metrics.At(i) + if metric.Name() == "system.processes.created" { + result.created = metric.Sum().DataPoints().At(0).IntValue() + } + if metric.Name() == "system.processes.count" { + result.countByState = map[string]int64{} + points := metric.Sum().DataPoints() + for k := 0; k < points.Len(); k++ { + dp := points.At(k) + status, ok := dp.Attributes().Get("status") + if !ok { + continue + } + result.countByState[status.Str()] = dp.IntValue() + } + } + } + return result +} + +func systemProcessResultsFromProcessScrape(scrapeMetrics pmetric.Metrics) systemProcessResults { + result := systemProcessResults{} + // The system metrics are recorded last in the process scraper, so they will be in the final resource + // in the payload after each process resource. 
+ resourceCount := scrapeMetrics.ResourceMetrics().Len() + metrics := scrapeMetrics.ResourceMetrics().At(resourceCount - 1).ScopeMetrics().At(0).Metrics() + for i := 0; i < metrics.Len(); i++ { + metric := metrics.At(i) + if metric.Name() == "system.processes.created" { + result.created = metric.Sum().DataPoints().At(0).IntValue() + } + if metric.Name() == "system.processes.count" { + result.countByState = map[string]int64{} + points := metric.Sum().DataPoints() + for k := 0; k < points.Len(); k++ { + dp := points.At(k) + status, ok := dp.Attributes().Get("status") + if !ok { + continue + } + result.countByState[status.Str()] = dp.IntValue() + } + } + } + return result +} + +func assertProcessAndProcessesScrapesEqual(t *testing.T, handles []processHandle) { + t.Helper() + + ctx := context.Background() + + processesScraperCfg := (&processesscraper.Factory{}).CreateDefaultConfig().(*processesscraper.Config) + processesScraper := processesscraper.NewProcessesScraper( + context.Background(), + receivertest.NewNopCreateSettings(), + processesScraperCfg, + ) + processScraper, err := newProcessScraper( + receivertest.NewNopCreateSettings(), + &Config{ + MetricsBuilderConfig: (func() metadata.MetricsBuilderConfig { + mbConfig := metadata.DefaultMetricsBuilderConfig() + mbConfig.Metrics.SystemProcessesCount.Enabled = true + mbConfig.Metrics.SystemProcessesCreated.Enabled = true + return mbConfig + })(), + }, + ) + require.Nil(t, err) + + err = processesScraper.Start(ctx, componenttest.NewNopHost()) + require.Nil(t, err) + err = processScraper.start(ctx, componenttest.NewNopHost()) + require.Nil(t, err) + + processesScraper.GetProcesses = func() ([]processesscraper.Proc, error) { + procHandles := make([]processesscraper.Proc, len(handles)) + for i, handle := range handles { + procHandles[i] = handle.(processesscraper.Proc) + } + return procHandles, nil + } + processesScraper.GetMiscStats = miscStatFunc(handles) + processScraper.getProcessHandles = func(_ context.Context) 
(processHandles, error) { + return &processHandlesMock{handles: handles}, nil + } + processScraper.getMiscStats = miscStatFunc(handles) + + processesMetrics, err := processesScraper.Scrape(ctx) + require.Nil(t, err) + processMetrics, err := processScraper.scrape(ctx) + require.Nil(t, err) + + processesScrapeResults := systemProcessResultsFromProcessesScrape(processesMetrics) + processScrapeResults := systemProcessResultsFromProcessScrape(processMetrics) + + assert.Equal(t, processesScrapeResults.created, processScrapeResults.created) + assert.True(t, reflect.DeepEqual(processesScrapeResults.countByState, processScrapeResults.countByState)) +} + +func miscStatFunc(handles []processHandle) func(context.Context) (*load.MiscStat, error) { + return func(context.Context) (*load.MiscStat, error) { + stat := &load.MiscStat{ + ProcsCreated: len(handles), + } + for _, handle := range handles { + status, err := handle.Status() + if err != nil { + continue + } + for _, s := range status { + if s == process.Running { + stat.ProcsRunning++ + } + if s == process.Blocked { + stat.ProcsBlocked++ + } + } + } + return stat, nil + } +} + +func mapKeys[K comparable, V any](m map[K]V) []K { + keys := make([]K, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + return keys } From 4249401e0fbc67ec3b72bc141a824cfc7a26f2e6 Mon Sep 17 00:00:00 2001 From: braydonk Date: Tue, 30 Jan 2024 20:40:41 +0000 Subject: [PATCH 02/10] Add changelog entry --- .chloggen/deprecate_processes_scraper.yaml | 27 +++++++++++++++++++ .../processscraper/process_scraper_test.go | 1 - 2 files changed, 27 insertions(+), 1 deletion(-) create mode 100755 .chloggen/deprecate_processes_scraper.yaml diff --git a/.chloggen/deprecate_processes_scraper.yaml b/.chloggen/deprecate_processes_scraper.yaml new file mode 100755 index 000000000000..bb3f843e3bde --- /dev/null +++ b/.chloggen/deprecate_processes_scraper.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. 
+ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: hostmetricsreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecating the processesscraper, and adding the same functionality directly into the processscraper. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30895] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go index 1ab18b120005..d146322afa95 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go @@ -1540,7 +1540,6 @@ func Test_SystemProcessesCompatibilityTest(t *testing.T) { testCase := testCase t.Run(testCase.name, func(t *testing.T) { t.Parallel() - assertProcessAndProcessesScrapesEqual(t, testCase.handles) }) } From 8c087fc3c85fc4d71a2432a930e33ffc2625c12e Mon Sep 17 00:00:00 2001 From: braydonk Date: Thu, 1 Feb 2024 13:43:23 +0000 Subject: [PATCH 03/10] Repurpose PR to not be a direct deprecation Since this receiver is in beta, I should not have directly been making a deprecation in this PR. Instead this PR will be focused on adding the functionality to the `processscraper` and provides a warning suggestion to the `processesscraper` instead of directly declaring it deprecated. --- ....yaml => system_processes_metrics_to_process_scraper.yaml} | 4 ++-- .../internal/scraper/processesscraper/factory.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename .chloggen/{deprecate_processes_scraper.yaml => system_processes_metrics_to_process_scraper.yaml} (88%) diff --git a/.chloggen/deprecate_processes_scraper.yaml b/.chloggen/system_processes_metrics_to_process_scraper.yaml similarity index 88% rename from .chloggen/deprecate_processes_scraper.yaml rename to .chloggen/system_processes_metrics_to_process_scraper.yaml index bb3f843e3bde..b0552b4d34e9 100755 --- a/.chloggen/deprecate_processes_scraper.yaml +++ b/.chloggen/system_processes_metrics_to_process_scraper.yaml @@ -1,13 +1,13 @@ # Use this changelog template to create an entry for release notes. 
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: deprecation +change_type: enhancement # The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) component: hostmetricsreceiver # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Deprecating the processesscraper, and adding the same functionality directly into the processscraper. +note: Adds the system.processes.* metrics to the process scraper. Also adds a deprecation warning to the processesscraper. # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [30895] diff --git a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/factory.go b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/factory.go index fce24f3d2691..d9f03da99171 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/factory.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/factory.go @@ -37,7 +37,7 @@ func (f *Factory) CreateMetricsScraper( settings receiver.CreateSettings, config internal.Config, ) (scraperhelper.Scraper, error) { - settings.Logger.Warn(`processes scraping is deprecated, system.processes.created and system.processes.count metrics have been moved to the process scraper. + settings.Logger.Warn(`processes scraping will soon be deprecated, system.processes.created and system.processes.count metrics have been moved to the process scraper. 
To enable them, apply the following config: scrapers: From 942eb69c84f7e15e11e006c1240e295e062e8e6b Mon Sep 17 00:00:00 2001 From: braydonk Date: Thu, 1 Feb 2024 13:49:26 +0000 Subject: [PATCH 04/10] run make goporto --- .../internal/scraper/processscraper/aggregate_process.go | 2 +- .../scraper/processscraper/aggregate_process_fallback.go | 2 +- .../internal/scraper/processscraper/aggregate_process_unix.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process.go index 86007f73230d..9c8af6b2d519 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package processscraper +package processscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper" import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata" diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go index 888cfacbb2f4..8f4766cb845d 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go @@ -4,7 +4,7 @@ //go:build !linux && !darwin && !freebsd && !openbsd // +build !linux,!darwin,!freebsd,!openbsd -package processscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processesscraper" +package 
processscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper" const enableProcessesCount = false const enableProcessesCreated = false diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_unix.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_unix.go index 87ee5c0c49d7..ce3d4f09b9a5 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_unix.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_unix.go @@ -4,7 +4,7 @@ //go:build linux || darwin || freebsd || openbsd // +build linux darwin freebsd openbsd -package processscraper +package processscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper" import ( "context" From f5f69e7497039d6c9405c722a58a8b33d44be345 Mon Sep 17 00:00:00 2001 From: braydonk Date: Thu, 1 Feb 2024 16:54:19 +0000 Subject: [PATCH 05/10] Remove unnecessary enabled metric guard The guard in scrapeAndAppendSystemProcessesCreated metric was already covered by the record data point function. 
--- .../internal/scraper/processscraper/process_scraper.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go index a661adfdbbb1..dfc8b060fd22 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go @@ -450,9 +450,7 @@ func (s *scraper) scrapeAndAppendSignalsPendingMetric(ctx context.Context, now p } func (s *scraper) scrapeAndAppendSystemProcessesCreated(ctx context.Context, now pcommon.Timestamp, data aggregateProcessMetadata) { - if s.config.MetricsBuilderConfig.Metrics.SystemProcessesCreated.Enabled { - s.mb.RecordSystemProcessesCreatedDataPoint(now, data.processesCreated) - } + s.mb.RecordSystemProcessesCreatedDataPoint(now, data.processesCreated) } func (s *scraper) scrapeAndAppendSystemProcessesCount(ctx context.Context, now pcommon.Timestamp, data aggregateProcessMetadata) { From f2139d28c7f5c236ee95fd1ba5ce1f4faecfa960 Mon Sep 17 00:00:00 2001 From: braydonk Date: Thu, 1 Feb 2024 19:15:15 +0000 Subject: [PATCH 06/10] Fix CI Issues * Add system.processes.* metrics to allowed metrics in duplicate metrics validation test * Change ctx arg to _ in certain functions * Change type name to Scraper in processesscraper fallback --- cmd/mdatagen/validate_test.go | 2 ++ .../scraper/processesscraper/processes_scraper_fallback.go | 2 +- .../processscraper/internal/metadata/generated_config.go | 4 ++-- .../internal/scraper/processscraper/process_scraper.go | 4 ++-- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/cmd/mdatagen/validate_test.go b/cmd/mdatagen/validate_test.go index 12871a4ab8c5..9af1d4f65336 100644 --- a/cmd/mdatagen/validate_test.go +++ b/cmd/mdatagen/validate_test.go @@ -111,6 +111,8 @@ func TestValidateMetricDuplicates(t *testing.T) { 
"container.cpu.utilization": {"docker_stats", "kubeletstats"}, "container.memory.rss": {"docker_stats", "kubeletstats"}, "container.uptime": {"docker_stats", "kubeletstats"}, + "system.processes.created": {"hostmetricsreceiver/process", "hostmetricsreceiver/processes"}, + "system.processes.count": {"hostmetricsreceiver/process", "hostmetricsreceiver/processes"}, } allMetrics := map[string][]string{} err := filepath.Walk("../../receiver", func(path string, info fs.FileInfo, err error) error { diff --git a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_fallback.go b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_fallback.go index 467b1433f320..664930c9e422 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_fallback.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_fallback.go @@ -8,6 +8,6 @@ package processesscraper // import "github.com/open-telemetry/opentelemetry-coll const enableProcessesCount = false const enableProcessesCreated = false -func (s *scraper) getProcessesMetadata() (processesMetadata, error) { +func (s *Scraper) getProcessesMetadata() (processesMetadata, error) { return processesMetadata{}, nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_config.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_config.go index e9a7ea20dc6a..eaa9d889308e 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_config.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata/generated_config.go @@ -15,7 +15,7 @@ func (ms *MetricConfig) Unmarshal(parser *confmap.Conf) error { if parser == nil { return nil } - err := parser.Unmarshal(ms, confmap.WithErrorUnused()) + err := parser.Unmarshal(ms) if err != nil { return err } @@ -103,7 +103,7 @@ func 
(rac *ResourceAttributeConfig) Unmarshal(parser *confmap.Conf) error { if parser == nil { return nil } - err := parser.Unmarshal(rac, confmap.WithErrorUnused()) + err := parser.Unmarshal(rac) if err != nil { return err } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go index dfc8b060fd22..8061cebe0e26 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go @@ -449,11 +449,11 @@ func (s *scraper) scrapeAndAppendSignalsPendingMetric(ctx context.Context, now p return nil } -func (s *scraper) scrapeAndAppendSystemProcessesCreated(ctx context.Context, now pcommon.Timestamp, data aggregateProcessMetadata) { +func (s *scraper) scrapeAndAppendSystemProcessesCreated(_ context.Context, now pcommon.Timestamp, data aggregateProcessMetadata) { s.mb.RecordSystemProcessesCreatedDataPoint(now, data.processesCreated) } -func (s *scraper) scrapeAndAppendSystemProcessesCount(ctx context.Context, now pcommon.Timestamp, data aggregateProcessMetadata) { +func (s *scraper) scrapeAndAppendSystemProcessesCount(_ context.Context, now pcommon.Timestamp, data aggregateProcessMetadata) { if s.config.MetricsBuilderConfig.Metrics.SystemProcessesCount.Enabled { for status, count := range data.countByStatus { s.mb.RecordSystemProcessesCountDataPoint(now, count, status) From 04c038ea46cd8b2bd9f0216721cc53570c5ccda5 Mon Sep 17 00:00:00 2001 From: braydonk Date: Thu, 1 Feb 2024 19:46:30 +0000 Subject: [PATCH 07/10] Run GCI and fix cross-compile Fixed a misname in the fallback build for aggregate process data, run gci on the aggregate_process files. 
--- .../scraper/processscraper/aggregate_process_fallback.go | 4 ++-- .../internal/scraper/processscraper/aggregate_process_unix.go | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go index 8f4766cb845d..0ce2c8e39d8b 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go @@ -9,6 +9,6 @@ package processscraper // import "github.com/open-telemetry/opentelemetry-collec const enableProcessesCount = false const enableProcessesCreated = false -func (s *scraper) getProcessesMetadata() (processesMetadata, error) { - return processesMetadata{}, nil +func (s *scraper) getProcessesMetadata() (aggregateProcessMetadata, error) { + return aggregateProcessMetadata{}, nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_unix.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_unix.go index ce3d4f09b9a5..19fa5314dc03 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_unix.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_unix.go @@ -9,9 +9,10 @@ package processscraper // import "github.com/open-telemetry/opentelemetry-collec import ( "context" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata" "github.com/shirou/gopsutil/v3/common" "github.com/shirou/gopsutil/v3/process" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata" ) func (s *scraper) getAggregateProcessMetadata(handles processHandles) 
(aggregateProcessMetadata, error) { From db90961fdd3d2cfc226197592e446eece0b5e1d5 Mon Sep 17 00:00:00 2001 From: braydonk Date: Thu, 1 Feb 2024 19:55:17 +0000 Subject: [PATCH 08/10] Fix another spelling error in aggregate process fallback --- .../scraper/processscraper/aggregate_process_fallback.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go index 0ce2c8e39d8b..c1cdc7ae07ec 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go @@ -9,6 +9,6 @@ package processscraper // import "github.com/open-telemetry/opentelemetry-collec const enableProcessesCount = false const enableProcessesCreated = false -func (s *scraper) getProcessesMetadata() (aggregateProcessMetadata, error) { +func (s *scraper) getAggregateProcessesMetadata() (aggregateProcessMetadata, error) { return aggregateProcessMetadata{}, nil } From c1584c00b31053e4a753b4f4dba2b4383fbc2ebe Mon Sep 17 00:00:00 2001 From: braydonk Date: Sat, 3 Feb 2024 15:24:13 +0000 Subject: [PATCH 09/10] I will learn to spell eventually --- .../scraper/processscraper/aggregate_process_fallback.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go index c1cdc7ae07ec..3b325c4d4f7b 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go @@ -9,6 +9,6 @@ package processscraper // import "github.com/open-telemetry/opentelemetry-collec const 
enableProcessesCount = false const enableProcessesCreated = false -func (s *scraper) getAggregateProcessesMetadata() (aggregateProcessMetadata, error) { +func (s *scraper) getAggregateProcessMetadata() (aggregateProcessMetadata, error) { return aggregateProcessMetadata{}, nil } From 83a3651ad7af121f9bdd114bc46fac7b64ee58d9 Mon Sep 17 00:00:00 2001 From: braydonk Date: Sat, 10 Feb 2024 15:41:35 +0000 Subject: [PATCH 10/10] fix windows build --- .../scraper/processscraper/aggregate_process_fallback.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go index 3b325c4d4f7b..dce6d7fe6ad7 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/aggregate_process_fallback.go @@ -9,6 +9,6 @@ package processscraper // import "github.com/open-telemetry/opentelemetry-collec const enableProcessesCount = false const enableProcessesCreated = false -func (s *scraper) getAggregateProcessMetadata() (aggregateProcessMetadata, error) { +func (s *scraper) getAggregateProcessMetadata(_ processHandles) (aggregateProcessMetadata, error) { return aggregateProcessMetadata{}, nil }