From 5414c556942ea5b59e68aabeed8b10cfdfb65e31 Mon Sep 17 00:00:00 2001 From: James Bebbington Date: Sun, 1 Nov 2020 11:44:26 +1100 Subject: [PATCH] Update host metrics receiver to use receiverhelper (#1949) --- receiver/hostmetricsreceiver/config.go | 10 +- receiver/hostmetricsreceiver/config_test.go | 11 +- receiver/hostmetricsreceiver/factory.go | 82 +++++- receiver/hostmetricsreceiver/factory_test.go | 7 +- .../hostmetrics_receiver.go | 250 ------------------ .../hostmetrics_receiver_test.go | 210 ++++++++------- .../hostmetricsreceiver/internal/scraper.go | 40 +-- .../scraper/cpuscraper/cpu_scraper.go | 16 +- .../scraper/cpuscraper/cpu_scraper_test.go | 13 +- .../internal/scraper/cpuscraper/factory.go | 14 +- .../diskscraper/disk_scraper_others.go | 19 +- .../diskscraper/disk_scraper_others_test.go | 13 +- .../scraper/diskscraper/disk_scraper_test.go | 5 +- .../diskscraper/disk_scraper_windows.go | 20 +- .../diskscraper/disk_scraper_windows_test.go | 12 +- .../internal/scraper/diskscraper/factory.go | 15 +- .../scraper/filesystemscraper/factory.go | 11 +- .../filesystemscraper/filesystem_scraper.go | 42 +-- .../filesystem_scraper_test.go | 16 +- .../internal/scraper/loadscraper/factory.go | 15 +- .../scraper/loadscraper/load_scraper.go | 11 +- .../scraper/loadscraper/load_scraper_test.go | 12 +- .../internal/scraper/memoryscraper/factory.go | 10 +- .../scraper/memoryscraper/memory_scraper.go | 21 +- .../memoryscraper/memory_scraper_test.go | 16 +- .../scraper/networkscraper/factory.go | 15 +- .../scraper/networkscraper/network_scraper.go | 27 +- .../networkscraper/network_scraper_test.go | 28 +- .../scraper/obsreportscraper/common.go | 19 -- .../scraper/obsreportscraper/common_test.go | 39 --- .../obsreportresourcescraper.go | 56 ---- .../obsreportresourcescraper_test.go | 76 ------ .../obsreportscraper/obsreportscraper.go | 56 ---- .../obsreportscraper/obsreportscraper_test.go | 76 ------ .../scraper/processesscraper/factory.go | 14 +- .../processesscraper/processes_scraper.go | 11 +- .../processes_scraper_fallback.go | 2 + .../processes_scraper_test.go | 13 +- .../processes_scraper_unix.go | 7 +- .../scraper/processscraper/factory.go | 19 +- .../scraper/processscraper/factory_test.go | 4 +- .../scraper/processscraper/process_scraper.go | 47 ++-- .../processscraper/process_scraper_test.go | 73 +++-- .../internal/scraper/swapscraper/factory.go | 14 +- .../swapscraper/swap_scraper_others.go | 27 +- .../swapscraper/swap_scraper_others_test.go | 23 +- .../scraper/swapscraper/swap_scraper_test.go | 5 +- .../swapscraper/swap_scraper_windows.go | 29 +- .../swapscraper/swap_scraper_windows_test.go | 37 ++- .../hostmetricsreceiver/internal/utils.go | 11 - receiver/receiverhelper/errors.go | 8 +- 51 files changed, 631 insertions(+), 996 deletions(-) delete mode 100644 receiver/hostmetricsreceiver/hostmetrics_receiver.go delete mode 100644 receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/common.go delete mode 100644 receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/common_test.go delete mode 100644 receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportresourcescraper.go delete mode 100644 receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportresourcescraper_test.go delete mode 100644 receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportscraper.go delete mode 100644 receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportscraper_test.go diff --git a/receiver/hostmetricsreceiver/config.go b/receiver/hostmetricsreceiver/config.go index b1cc11f665b..5a271b4341e 100644 --- a/receiver/hostmetricsreceiver/config.go +++ b/receiver/hostmetricsreceiver/config.go @@ -15,16 +15,12 @@ package hostmetricsreceiver import ( - "time" - - "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) // Config defines configuration for HostMetrics receiver. type Config struct { - configmodels.ReceiverSettings `mapstructure:",squash"` - - CollectionInterval time.Duration `mapstructure:"collection_interval"` - Scrapers map[string]internal.Config `mapstructure:"-"` + receiverhelper.ScraperControllerSettings `mapstructure:",squash"` + Scrapers map[string]internal.Config `mapstructure:"-"` } diff --git a/receiver/hostmetricsreceiver/config_test.go b/receiver/hostmetricsreceiver/config_test.go index fe5b4f3aaf6..e2fd74047f4 100644 --- a/receiver/hostmetricsreceiver/config_test.go +++ b/receiver/hostmetricsreceiver/config_test.go @@ -36,6 +36,7 @@ import ( "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/processesscraper" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/processscraper" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/swapscraper" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) func TestLoadConfig(t *testing.T) { @@ -61,11 +62,13 @@ func TestLoadConfig(t *testing.T) { r1 := cfg.Receivers["hostmetrics/customname"].(*Config) expectedConfig := &Config{ - ReceiverSettings: configmodels.ReceiverSettings{ - TypeVal: typeStr, - NameVal: "hostmetrics/customname", + ScraperControllerSettings: receiverhelper.ScraperControllerSettings{ + ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: typeStr, + NameVal: "hostmetrics/customname", + }, + CollectionInterval: 30 * time.Second, }, - CollectionInterval: 30 * time.Second, Scrapers: map[string]internal.Config{ cpuscraper.TypeStr: &cpuscraper.Config{}, diskscraper.TypeStr: &diskscraper.Config{}, diff --git a/receiver/hostmetricsreceiver/factory.go b/receiver/hostmetricsreceiver/factory.go index c68bee1253e..c4ff664416c 100644 --- a/receiver/hostmetricsreceiver/factory.go +++ b/receiver/hostmetricsreceiver/factory.go @@ -18,9 +18,9 @@ import ( "context" "errors" "fmt" - "time" "github.com/spf13/viper" + "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" @@ -88,10 +88,6 @@ func customUnmarshaler(componentViperSection *viper.Viper, intoCfg interface{}) return fmt.Errorf("config type not hostmetrics.Config") } - if cfg.CollectionInterval <= 0 { - return fmt.Errorf("collection_interval must be a positive duration") - } - // dynamically load the individual collector configs based on the key name cfg.Scrapers = map[string]internal.Config{} @@ -140,13 +136,7 @@ func getScraperFactory(key string) (internal.BaseFactory, bool) { // createDefaultConfig creates the default configuration for receiver. func createDefaultConfig() configmodels.Receiver { - return &Config{ - ReceiverSettings: configmodels.ReceiverSettings{ - TypeVal: typeStr, - NameVal: typeStr, - }, - CollectionInterval: time.Minute, - } + return &Config{ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings(typeStr)} } // createMetricsReceiver creates a metrics receiver based on provided config. @@ -158,10 +148,74 @@ func createMetricsReceiver( ) (component.MetricsReceiver, error) { oCfg := cfg.(*Config) - hmr, err := newHostMetricsReceiver(ctx, params.Logger, oCfg, scraperFactories, resourceScraperFactories, consumer) + addScraperOptions, err := createAddScraperOptions(ctx, params.Logger, oCfg, scraperFactories, resourceScraperFactories) if err != nil { return nil, err } - return hmr, nil + return receiverhelper.NewScraperControllerReceiver( + &oCfg.ScraperControllerSettings, + consumer, + addScraperOptions..., + ) +} + +func createAddScraperOptions( + ctx context.Context, + logger *zap.Logger, + config *Config, + factories map[string]internal.ScraperFactory, + resourceFactories map[string]internal.ResourceScraperFactory, +) ([]receiverhelper.ScraperControllerOption, error) { + scraperControllerOptions := make([]receiverhelper.ScraperControllerOption, 0, len(config.Scrapers)) + + for key, cfg := range config.Scrapers { + hostMetricsScraper, ok, err := createHostMetricsScraper(ctx, logger, key, cfg, factories) + if err != nil { + return nil, fmt.Errorf("failed to create scraper for key %q: %w", key, err) + } + + if ok { + scraperControllerOptions = append(scraperControllerOptions, receiverhelper.AddMetricsScraper(hostMetricsScraper)) + continue + } + + resourceMetricsScraper, ok, err := createResourceMetricsScraper(ctx, logger, key, cfg, resourceFactories) + if err != nil { + return nil, fmt.Errorf("failed to create resource scraper for key %q: %w", key, err) + } + + if ok { + scraperControllerOptions = append(scraperControllerOptions, receiverhelper.AddResourceMetricsScraper(resourceMetricsScraper)) + continue + } + + return nil, fmt.Errorf("host metrics scraper factory not found for key: %q", key) + } + + return scraperControllerOptions, nil +} + +func createHostMetricsScraper(ctx context.Context, logger *zap.Logger, key string, cfg internal.Config, factories map[string]internal.ScraperFactory) (scraper receiverhelper.MetricsScraper, ok bool, err error) { + factory := factories[key] + if factory == nil { + ok = false + return + } + + ok = true + scraper, err = factory.CreateMetricsScraper(ctx, logger, cfg) + return +} + +func createResourceMetricsScraper(ctx context.Context, logger *zap.Logger, key string, cfg internal.Config, factories map[string]internal.ResourceScraperFactory) (scraper receiverhelper.ResourceMetricsScraper, ok bool, err error) { + factory := factories[key] + if factory == nil { + ok = false + return + } + + ok = true + scraper, err = factory.CreateResourceMetricsScraper(ctx, logger, cfg) + return } diff --git a/receiver/hostmetricsreceiver/factory_test.go b/receiver/hostmetricsreceiver/factory_test.go index 01c68000bb2..b2cafbc171e 100644 --- a/receiver/hostmetricsreceiver/factory_test.go +++ b/receiver/hostmetricsreceiver/factory_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcheck" "go.opentelemetry.io/collector/config/configerror" + "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" ) @@ -41,12 +42,12 @@ func TestCreateReceiver(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() - tReceiver, err := factory.CreateTracesReceiver(context.Background(), creationParams, cfg, nil) + tReceiver, err := factory.CreateTracesReceiver(context.Background(), creationParams, cfg, &exportertest.SinkTraceExporter{}) assert.Equal(t, err, configerror.ErrDataTypeIsNotSupported) assert.Nil(t, tReceiver) - mReceiver, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, nil) + mReceiver, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, &exportertest.SinkMetricsExporter{}) assert.NoError(t, err) assert.NotNil(t, mReceiver) @@ -58,6 +59,6 @@ func TestCreateReceiver_ScraperKeyConfigError(t *testing.T) { factory := NewFactory() cfg := &Config{Scrapers: map[string]internal.Config{errorKey: &mockConfig{}}} - _, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, nil) + _, err := factory.CreateMetricsReceiver(context.Background(), creationParams, cfg, &exportertest.SinkMetricsExporter{}) assert.EqualError(t, err, fmt.Sprintf("host metrics scraper factory not found for key: %q", errorKey)) } diff --git a/receiver/hostmetricsreceiver/hostmetrics_receiver.go b/receiver/hostmetricsreceiver/hostmetrics_receiver.go deleted file mode 100644 index 3b06e73c067..00000000000 --- a/receiver/hostmetricsreceiver/hostmetrics_receiver.go +++ /dev/null @@ -1,250 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package hostmetricsreceiver - -import ( - "context" - "fmt" - "time" - - "go.opencensus.io/trace" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" -) - -// receiver is the type that scrapes various host metrics. -type receiver struct { - config *Config - - hostMetricScrapers []internal.Scraper - resourceMetricScrapers []internal.ResourceScraper - - consumer consumer.MetricsConsumer - done chan struct{} -} - -// newHostMetricsReceiver creates a host metrics scraper. -func newHostMetricsReceiver( - ctx context.Context, - logger *zap.Logger, - config *Config, - factories map[string]internal.ScraperFactory, - resourceFactories map[string]internal.ResourceScraperFactory, - consumer consumer.MetricsConsumer, -) (*receiver, error) { - - hostMetricScrapers := make([]internal.Scraper, 0) - resourceMetricScrapers := make([]internal.ResourceScraper, 0) - - for key, cfg := range config.Scrapers { - hostMetricsScraper, ok, err := createHostMetricsScraper(ctx, logger, key, cfg, factories) - if err != nil { - return nil, fmt.Errorf("failed to create scraper for key %q: %w", key, err) - } - - if ok { - hostMetricScrapers = append(hostMetricScrapers, hostMetricsScraper) - continue - } - - resourceMetricsScraper, ok, err := createResourceMetricsScraper(ctx, logger, key, cfg, resourceFactories) - if err != nil { - return nil, fmt.Errorf("failed to create resource scraper for key %q: %w", key, err) - } - - if ok { - resourceMetricScrapers = append(resourceMetricScrapers, resourceMetricsScraper) - continue - } - - return nil, fmt.Errorf("host metrics scraper factory not found for key: %q", key) - } - - hmr := &receiver{ - config: config, - hostMetricScrapers: hostMetricScrapers, - resourceMetricScrapers: resourceMetricScrapers, - consumer: consumer, - } - - return hmr, nil -} - -func createHostMetricsScraper(ctx context.Context, logger *zap.Logger, key string, cfg internal.Config, factories map[string]internal.ScraperFactory) (scraper internal.Scraper, ok bool, err error) { - factory := factories[key] - if factory == nil { - ok = false - return - } - - ok = true - scraper, err = factory.CreateMetricsScraper(ctx, logger, cfg) - return -} - -func createResourceMetricsScraper(ctx context.Context, logger *zap.Logger, key string, cfg internal.Config, factories map[string]internal.ResourceScraperFactory) (scraper internal.ResourceScraper, ok bool, err error) { - factory := factories[key] - if factory == nil { - ok = false - return - } - - ok = true - scraper, err = factory.CreateMetricsScraper(ctx, logger, cfg) - return -} - -// Start initializes the underlying scrapers and begins scraping -// host metrics based on the OS platform. -func (hmr *receiver) Start(ctx context.Context, host component.Host) error { - hmr.done = make(chan struct{}) - - go func() { - hmr.initializeScrapers(ctx, host) - hmr.startScrapers() - }() - - return nil -} - -// Shutdown terminates all tickers and stops the underlying scrapers. -func (hmr *receiver) Shutdown(ctx context.Context) error { - close(hmr.done) - return hmr.closeScrapers(ctx) -} - -func (hmr *receiver) initializeScrapers(ctx context.Context, host component.Host) { - for _, scraper := range hmr.allScrapers() { - err := scraper.Initialize(ctx) - if err != nil { - host.ReportFatalError(err) - return - } - } -} - -func (hmr *receiver) startScrapers() { - go func() { - ticker := time.NewTicker(hmr.config.CollectionInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - hmr.scrapeMetrics(context.Background()) - case <-hmr.done: - return - } - } - }() -} - -func (hmr *receiver) scrapeMetrics(ctx context.Context) { - ctx, span := trace.StartSpan(ctx, "hostmetricsreceiver.ScrapeMetrics") - defer span.End() - - var errors []error - metricData := pdata.NewMetrics() - - if err := hmr.scrapeAndAppendHostMetrics(ctx, metricData); err != nil { - errors = append(errors, err) - } - - if err := hmr.scrapeAndAppendResourceMetrics(ctx, metricData); err != nil { - errors = append(errors, err) - } - - if len(errors) > 0 { - span.SetStatus(trace.Status{Code: trace.StatusCodeDataLoss, Message: fmt.Sprintf("Error(s) when scraping metrics: %v", componenterror.CombineErrors(errors))}) - } - - if err := hmr.consumer.ConsumeMetrics(ctx, metricData); err != nil { - span.SetStatus(trace.Status{Code: trace.StatusCodeDataLoss, Message: fmt.Sprintf("Unable to process metrics: %v", err)}) - return - } -} - -func (hmr *receiver) scrapeAndAppendHostMetrics(ctx context.Context, metricData pdata.Metrics) error { - if len(hmr.hostMetricScrapers) == 0 { - return nil - } - - metrics := internal.InitializeMetricSlice(metricData) - - var errors []error - for _, scraper := range hmr.hostMetricScrapers { - scraperMetrics, err := scraper.ScrapeMetrics(ctx) - if err != nil { - errors = append(errors, err) - } - - scraperMetrics.MoveAndAppendTo(metrics) - } - - return componenterror.CombineErrors(errors) -} - -func (hmr *receiver) scrapeAndAppendResourceMetrics(ctx context.Context, metricData pdata.Metrics) error { - if len(hmr.resourceMetricScrapers) == 0 { - return nil - } - - rm := metricData.ResourceMetrics() - - var errors []error - for _, scraper := range hmr.resourceMetricScrapers { - scraperResourceMetrics, err := scraper.ScrapeMetrics(ctx) - if err != nil { - errors = append(errors, err) - } - - scraperResourceMetrics.MoveAndAppendTo(rm) - } - - return componenterror.CombineErrors(errors) -} - -func (hmr *receiver) closeScrapers(ctx context.Context) error { - var errs []error - for _, scraper := range hmr.allScrapers() { - err := scraper.Close(ctx) - if err != nil { - errs = append(errs, err) - } - } - - if len(errs) > 0 { - return componenterror.CombineErrors(errs) - } - - return nil -} - -func (hmr *receiver) allScrapers() []internal.BaseScraper { - allScrapers := make([]internal.BaseScraper, len(hmr.hostMetricScrapers)+len(hmr.resourceMetricScrapers)) - for i, hostMetricScraper := range hmr.hostMetricScrapers { - allScrapers[i] = hostMetricScraper - } - startIdx := len(hmr.hostMetricScrapers) - for i, resourceMetricScraper := range hmr.resourceMetricScrapers { - allScrapers[startIdx+i] = resourceMetricScraper - } - return allScrapers -} diff --git a/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go b/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go index 8786130ef5d..8e389a19476 100644 --- a/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go +++ b/receiver/hostmetricsreceiver/hostmetrics_receiver_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" @@ -39,6 +40,7 @@ import ( "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/processesscraper" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/processscraper" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/swapscraper" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) var standardMetrics = []string{ @@ -93,10 +95,15 @@ var resourceFactories = map[string]internal.ResourceScraperFactory{ } func TestGatherMetrics_EndToEnd(t *testing.T) { + scraperFactories = factories + resourceScraperFactories = resourceFactories + sink := new(consumertest.MetricsSink) config := &Config{ - CollectionInterval: 100 * time.Millisecond, + ScraperControllerSettings: receiverhelper.ScraperControllerSettings{ + CollectionInterval: 100 * time.Millisecond, + }, Scrapers: map[string]internal.Config{ cpuscraper.TypeStr: &cpuscraper.Config{}, diskscraper.TypeStr: &diskscraper.Config{}, @@ -113,7 +120,7 @@ func TestGatherMetrics_EndToEnd(t *testing.T) { config.Scrapers[processscraper.TypeStr] = &processscraper.Config{} } - receiver, err := newHostMetricsReceiver(context.Background(), zap.NewNop(), config, factories, resourceFactories, sink) + receiver, err := NewFactory().CreateMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, config, sink) require.NoError(t, err, "Failed to create metrics receiver: %v", err) @@ -133,48 +140,43 @@ func TestGatherMetrics_EndToEnd(t *testing.T) { return false } - assertIncludesStandardMetrics(t, got[0]) - assertIncludesResourceMetrics(t, got[0]) + assertIncludesExpectedMetrics(t, got[0]) return true }, waitFor, tick, "No metrics were collected after %v", waitFor) } -func assertIncludesStandardMetrics(t *testing.T, got pdata.Metrics) { - // get the first ResourceMetrics object +func assertIncludesExpectedMetrics(t *testing.T, got pdata.Metrics) { + // get the superset of metrics returned by all resource metrics (excluding the first) + returnedMetrics := make(map[string]struct{}) + returnedResourceMetrics := make(map[string]struct{}) rms := got.ResourceMetrics() - require.GreaterOrEqual(t, rms.Len(), 1) - rm := rms.At(0) - assert.True(t, rm.Resource().IsNil() || rm.Resource().Attributes().Len() == 0) + for i := 0; i < rms.Len(); i++ { + rm := rms.At(i) + metrics := getMetricSlice(t, rm) + returnedMetricNames := getReturnedMetricNames(metrics) - metrics := getMetricSlice(t, rm) - returnedMetrics := getReturnedMetricNames(metrics) + if rm.Resource().IsNil() || rm.Resource().Attributes().Len() == 0 { + appendMapInto(returnedMetrics, returnedMetricNames) + } else { + appendMapInto(returnedResourceMetrics, returnedMetricNames) + } + } - // the expected list of metrics returned is os dependent + // verify the expected list of metrics returned (os dependent) expectedMetrics := append(standardMetrics, systemSpecificMetrics[runtime.GOOS]...) assert.Equal(t, len(expectedMetrics), len(returnedMetrics)) for _, expected := range expectedMetrics { assert.Contains(t, returnedMetrics, expected) } -} -func assertIncludesResourceMetrics(t *testing.T, got pdata.Metrics) { + // verify the expected list of resource metrics returned (Linux & Windows only) if runtime.GOOS != "linux" && runtime.GOOS != "windows" { return } - // get the superset of metrics returned by all resource metrics (excluding the first) - returnedMetrics := make(map[string]struct{}) - rms := got.ResourceMetrics() - for i := 1; i < rms.Len(); i++ { - rm := rms.At(i) - assert.Greater(t, rm.Resource().Attributes().Len(), 0) - metrics := getMetricSlice(t, rm) - appendMapInto(returnedMetrics, getReturnedMetricNames(metrics)) - } - - assert.Equal(t, len(resourceMetrics), len(returnedMetrics)) + assert.Equal(t, len(resourceMetrics), len(returnedResourceMetrics)) for _, expected := range resourceMetrics { - assert.Contains(t, returnedMetrics, expected) + assert.Contains(t, returnedResourceMetrics, expected) } } @@ -207,14 +209,15 @@ type mockFactory struct{ mock.Mock } type mockScraper struct{ mock.Mock } func (m *mockFactory) CreateDefaultConfig() internal.Config { return &mockConfig{} } -func (m *mockFactory) CreateMetricsScraper(context.Context, *zap.Logger, internal.Config) (internal.Scraper, error) { +func (m *mockFactory) CreateMetricsScraper(context.Context, *zap.Logger, internal.Config) (receiverhelper.MetricsScraper, error) { args := m.MethodCalled("CreateMetricsScraper") - return args.Get(0).(internal.Scraper), args.Error(1) + return args.Get(0).(receiverhelper.MetricsScraper), args.Error(1) } +func (m *mockScraper) Name() string { return "" } func (m *mockScraper) Initialize(context.Context) error { return nil } func (m *mockScraper) Close(context.Context) error { return nil } -func (m *mockScraper) ScrapeMetrics(context.Context) (pdata.MetricSlice, error) { +func (m *mockScraper) Scrape(context.Context, string) (pdata.MetricSlice, error) { return pdata.NewMetricSlice(), errors.New("err1") } @@ -222,141 +225,164 @@ type mockResourceFactory struct{ mock.Mock } type mockResourceScraper struct{ mock.Mock } func (m *mockResourceFactory) CreateDefaultConfig() internal.Config { return &mockConfig{} } -func (m *mockResourceFactory) CreateMetricsScraper(context.Context, *zap.Logger, internal.Config) (internal.ResourceScraper, error) { - args := m.MethodCalled("CreateMetricsScraper") - return args.Get(0).(internal.ResourceScraper), args.Error(1) +func (m *mockResourceFactory) CreateResourceMetricsScraper(context.Context, *zap.Logger, internal.Config) (receiverhelper.ResourceMetricsScraper, error) { + args := m.MethodCalled("CreateResourceMetricsScraper") + return args.Get(0).(receiverhelper.ResourceMetricsScraper), args.Error(1) } +func (m *mockResourceScraper) Name() string { return "" } func (m *mockResourceScraper) Initialize(context.Context) error { return nil } func (m *mockResourceScraper) Close(context.Context) error { return nil } -func (m *mockResourceScraper) ScrapeMetrics(context.Context) (pdata.ResourceMetricsSlice, error) { +func (m *mockResourceScraper) Scrape(context.Context, string) (pdata.ResourceMetricsSlice, error) { return pdata.NewResourceMetricsSlice(), errors.New("err2") } func TestGatherMetrics_ScraperKeyConfigError(t *testing.T) { - var mockFactories = map[string]internal.ScraperFactory{} - var mockResourceFactories = map[string]internal.ResourceScraperFactory{} + scraperFactories = map[string]internal.ScraperFactory{} + resourceScraperFactories = map[string]internal.ResourceScraperFactory{} sink := new(consumertest.MetricsSink) config := &Config{Scrapers: map[string]internal.Config{"error": &mockConfig{}}} - - _, err := newHostMetricsReceiver(context.Background(), zap.NewNop(), config, mockFactories, mockResourceFactories, sink) + _, err := NewFactory().CreateMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, config, sink) require.Error(t, err) } func TestGatherMetrics_CreateMetricsScraperError(t *testing.T) { mFactory := &mockFactory{} mFactory.On("CreateMetricsScraper").Return(&mockScraper{}, errors.New("err1")) - var mockFactories = map[string]internal.ScraperFactory{mockTypeStr: mFactory} - var mockResourceFactories = map[string]internal.ResourceScraperFactory{} + scraperFactories = map[string]internal.ScraperFactory{mockTypeStr: mFactory} + resourceScraperFactories = map[string]internal.ResourceScraperFactory{} sink := new(consumertest.MetricsSink) config := &Config{Scrapers: map[string]internal.Config{mockTypeStr: &mockConfig{}}} - _, err := newHostMetricsReceiver(context.Background(), zap.NewNop(), config, mockFactories, mockResourceFactories, sink) + _, err := NewFactory().CreateMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, config, sink) require.Error(t, err) } func TestGatherMetrics_CreateMetricsResourceScraperError(t *testing.T) { mResourceFactory := &mockResourceFactory{} - mResourceFactory.On("CreateMetricsScraper").Return(&mockResourceScraper{}, errors.New("err1")) - var mockFactories = map[string]internal.ScraperFactory{} - var mockResourceFactories = map[string]internal.ResourceScraperFactory{mockTypeStr: mResourceFactory} + mResourceFactory.On("CreateResourceMetricsScraper").Return(&mockResourceScraper{}, errors.New("err1")) + scraperFactories = map[string]internal.ScraperFactory{} + resourceScraperFactories = map[string]internal.ResourceScraperFactory{mockResourceTypeStr: mResourceFactory} sink := new(consumertest.MetricsSink) - config := &Config{Scrapers: map[string]internal.Config{mockTypeStr: &mockConfig{}}} - _, err := newHostMetricsReceiver(context.Background(), zap.NewNop(), config, mockFactories, mockResourceFactories, sink) + config := &Config{Scrapers: map[string]internal.Config{mockResourceTypeStr: &mockConfig{}}} + _, err := NewFactory().CreateMetricsReceiver(context.Background(), component.ReceiverCreateParams{}, config, sink) require.Error(t, err) } -func TestGatherMetrics_Error(t *testing.T) { - mFactory := &mockFactory{} - mFactory.On("CreateMetricsScraper").Return(&mockScraper{}, nil) - mResourceFactory := &mockResourceFactory{} - mResourceFactory.On("CreateMetricsScraper").Return(&mockResourceScraper{}, nil) - - var mockFactories = map[string]internal.ScraperFactory{mockTypeStr: mFactory} - var mockResourceFactories = map[string]internal.ResourceScraperFactory{mockResourceTypeStr: mResourceFactory} - - sink := new(consumertest.MetricsSink) +type notifyingSink struct { + receivedMetrics bool + timesCalled int + ch chan int +} - config := &Config{ - Scrapers: map[string]internal.Config{ - mockTypeStr: &mockConfig{}, - mockResourceTypeStr: &mockConfig{}, - }, +func (s *notifyingSink) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { + if md.MetricCount() > 0 { + s.receivedMetrics = true } - receiver, err := newHostMetricsReceiver(context.Background(), zap.NewNop(), config, mockFactories, mockResourceFactories, sink) - require.NoError(t, err) + s.timesCalled++ + s.ch <- s.timesCalled + return nil +} - receiver.initializeScrapers(context.Background(), componenttest.NewNopHost()) - receiver.scrapeMetrics(context.Background()) +func benchmarkScrapeMetrics(b *testing.B, cfg *Config) { + scraperFactories = factories + resourceScraperFactories = resourceFactories - got := sink.AllMetrics() + sink := ¬ifyingSink{ch: make(chan int, 10)} + tickerCh := make(chan time.Time) - // expect to get one empty resource metrics entry - require.Equal(t, 1, len(got)) - rm := got[0].ResourceMetrics() - require.Equal(t, 1, rm.Len()) - ilm := rm.At(0).InstrumentationLibraryMetrics() - require.Equal(t, 1, ilm.Len()) - metrics := ilm.At(0).Metrics() - require.Equal(t, 0, metrics.Len()) -} + options, err := createAddScraperOptions(context.Background(), zap.NewNop(), cfg, scraperFactories, resourceScraperFactories) + require.NoError(b, err) + options = append(options, receiverhelper.WithTickerChannel(tickerCh)) -func benchmarkScrapeMetrics(b *testing.B, cfg *Config) { - sink := new(consumertest.MetricsSink) + receiver, err := receiverhelper.NewScraperControllerReceiver(&cfg.ScraperControllerSettings, sink, options...) + require.NoError(b, err) - receiver, _ := newHostMetricsReceiver(context.Background(), zap.NewNop(), cfg, factories, resourceFactories, sink) - receiver.initializeScrapers(context.Background(), componenttest.NewNopHost()) + require.NoError(b, receiver.Start(context.Background(), componenttest.NewNopHost())) b.ResetTimer() for n := 0; n < b.N; n++ { - receiver.scrapeMetrics(context.Background()) + tickerCh <- time.Now() + <-sink.ch } - if len(sink.AllMetrics()) == 0 { + if !sink.receivedMetrics { b.Fail() } } func Benchmark_ScrapeCpuMetrics(b *testing.B) { - cfg := &Config{Scrapers: map[string]internal.Config{cpuscraper.TypeStr: (&cpuscraper.Factory{}).CreateDefaultConfig()}} + cfg := &Config{ + ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings(""), + Scrapers: map[string]internal.Config{cpuscraper.TypeStr: (&cpuscraper.Factory{}).CreateDefaultConfig()}, + } + benchmarkScrapeMetrics(b, cfg) } func Benchmark_ScrapeDiskMetrics(b *testing.B) { - cfg := &Config{Scrapers: map[string]internal.Config{diskscraper.TypeStr: (&diskscraper.Factory{}).CreateDefaultConfig()}} + cfg := &Config{ + ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings(""), + Scrapers: map[string]internal.Config{diskscraper.TypeStr: (&diskscraper.Factory{}).CreateDefaultConfig()}, + } + benchmarkScrapeMetrics(b, cfg) } func Benchmark_ScrapeFileSystemMetrics(b *testing.B) { - cfg := &Config{Scrapers: map[string]internal.Config{filesystemscraper.TypeStr: (&filesystemscraper.Factory{}).CreateDefaultConfig()}} + cfg := &Config{ + ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings(""), + Scrapers: map[string]internal.Config{filesystemscraper.TypeStr: (&filesystemscraper.Factory{}).CreateDefaultConfig()}, + } + benchmarkScrapeMetrics(b, cfg) } func Benchmark_ScrapeLoadMetrics(b *testing.B) { - cfg := &Config{Scrapers: map[string]internal.Config{loadscraper.TypeStr: (&loadscraper.Factory{}).CreateDefaultConfig()}} + cfg := &Config{ + ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings(""), + Scrapers: map[string]internal.Config{loadscraper.TypeStr: (&loadscraper.Factory{}).CreateDefaultConfig()}, + } + benchmarkScrapeMetrics(b, cfg) } func Benchmark_ScrapeMemoryMetrics(b *testing.B) { - cfg := &Config{Scrapers: map[string]internal.Config{memoryscraper.TypeStr: (&memoryscraper.Factory{}).CreateDefaultConfig()}} + cfg := &Config{ + ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings(""), + Scrapers: map[string]internal.Config{memoryscraper.TypeStr: (&memoryscraper.Factory{}).CreateDefaultConfig()}, + } + benchmarkScrapeMetrics(b, cfg) } func Benchmark_ScrapeNetworkMetrics(b *testing.B) { - cfg := &Config{Scrapers: map[string]internal.Config{networkscraper.TypeStr: (&networkscraper.Factory{}).CreateDefaultConfig()}} + cfg := &Config{ + ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings(""), + Scrapers: map[string]internal.Config{networkscraper.TypeStr: (&networkscraper.Factory{}).CreateDefaultConfig()}, + } + benchmarkScrapeMetrics(b, cfg) } func Benchmark_ScrapeProcessesMetrics(b *testing.B) { - cfg := &Config{Scrapers: map[string]internal.Config{processesscraper.TypeStr: (&processesscraper.Factory{}).CreateDefaultConfig()}} + cfg := &Config{ + ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings(""), + Scrapers: map[string]internal.Config{processesscraper.TypeStr: (&processesscraper.Factory{}).CreateDefaultConfig()}, + } + benchmarkScrapeMetrics(b, cfg) } func Benchmark_ScrapeSwapMetrics(b *testing.B) { - cfg := &Config{Scrapers: map[string]internal.Config{swapscraper.TypeStr: (&swapscraper.Factory{}).CreateDefaultConfig()}} + cfg := &Config{ + ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings(""), + Scrapers: map[string]internal.Config{swapscraper.TypeStr: (&swapscraper.Factory{}).CreateDefaultConfig()}, + } + benchmarkScrapeMetrics(b, cfg) } @@ -365,12 +391,17 @@ func Benchmark_ScrapeProcessMetrics(b *testing.B) { b.Skip("skipping test on non linux/windows") } - cfg := &Config{Scrapers: map[string]internal.Config{processscraper.TypeStr: (&processscraper.Factory{}).CreateDefaultConfig()}} + cfg := &Config{ + ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings(""), + Scrapers: map[string]internal.Config{processscraper.TypeStr: (&processscraper.Factory{}).CreateDefaultConfig()}, + } + benchmarkScrapeMetrics(b, cfg) } func Benchmark_ScrapeSystemMetrics(b *testing.B) { cfg := &Config{ + ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings(""), Scrapers: map[string]internal.Config{ cpuscraper.TypeStr: (&cpuscraper.Factory{}).CreateDefaultConfig(), diskscraper.TypeStr: (&diskscraper.Factory{}).CreateDefaultConfig(), @@ -388,6 +419,7 @@ func Benchmark_ScrapeSystemMetrics(b *testing.B) { func Benchmark_ScrapeSystemAndProcessMetrics(b *testing.B) { cfg := &Config{ + ScraperControllerSettings: receiverhelper.DefaultScraperControllerSettings(""), Scrapers: map[string]internal.Config{ cpuscraper.TypeStr: &cpuscraper.Config{}, diskscraper.TypeStr: &diskscraper.Config{}, diff --git a/receiver/hostmetricsreceiver/internal/scraper.go b/receiver/hostmetricsreceiver/internal/scraper.go index c7cd62b9936..9e2b2eefaf3 100644 --- a/receiver/hostmetricsreceiver/internal/scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper.go @@ -19,63 +19,31 @@ import ( "go.uber.org/zap" - "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) -// BaseScraper gathers metrics from the host machine. -type BaseScraper interface { - // Initialize performs any timely initialization tasks such as - // setting up performance counters for initial collection. - Initialize(ctx context.Context) error - // Close should clean up any unmanaged resources such as - // performance counter handles. - Close(ctx context.Context) error -} - // BaseFactory for creating Scrapers. type BaseFactory interface { // CreateDefaultConfig creates the default configuration for the Scraper. CreateDefaultConfig() Config } -// Scraper gathers metrics from the host machine. -type Scraper interface { - BaseScraper - - // ScrapeMetrics returns relevant scraped metrics. If errors occur - // scraping some metrics, an error should be returned, but any - // metrics that were successfully scraped should still be returned. - ScrapeMetrics(ctx context.Context) (pdata.MetricSlice, error) -} - // ScraperFactory can create a MetricScraper. type ScraperFactory interface { BaseFactory // CreateMetricsScraper creates a scraper based on this config. // If the config is not valid, error will be returned instead. - CreateMetricsScraper(ctx context.Context, logger *zap.Logger, cfg Config) (Scraper, error) -} - -// ResourceScraper gathers metrics from a low-level resource such as -// a process. -type ResourceScraper interface { - BaseScraper - - // ScrapeMetrics returns relevant scraped metrics per resource. - // If errors occur scraping some metrics, an error should be - // returned, but any metrics that were successfully scraped - // should still be returned. - ScrapeMetrics(ctx context.Context) (pdata.ResourceMetricsSlice, error) + CreateMetricsScraper(ctx context.Context, logger *zap.Logger, cfg Config) (receiverhelper.MetricsScraper, error) } // ResourceScraperFactory can create a ResourceScraper. type ResourceScraperFactory interface { BaseFactory - // CreateMetricsScraper creates a resource scraper based on this + // CreateResourceMetricsScraper creates a resource scraper based on this // config. If the config is not valid, error will be returned instead. - CreateMetricsScraper(ctx context.Context, logger *zap.Logger, cfg Config) (ResourceScraper, error) + CreateResourceMetricsScraper(ctx context.Context, logger *zap.Logger, cfg Config) (receiverhelper.ResourceMetricsScraper, error) } // Config is the configuration of a scraper. diff --git a/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper.go b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper.go index 17427382845..70399757626 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper.go @@ -21,11 +21,14 @@ import ( "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/host" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/metadata" ) +const metricsLen = 1 + // scraper for CPU Metrics type scraper struct { config *Config @@ -52,22 +55,17 @@ func (s *scraper) Initialize(_ context.Context) error { return nil } -// Close -func (s *scraper) Close(_ context.Context) error { - return nil -} - -// ScrapeMetrics -func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { +// Scrape +func (s *scraper) Scrape(_ context.Context) (pdata.MetricSlice, error) { metrics := pdata.NewMetricSlice() now := internal.TimeToUnixNano(time.Now()) cpuTimes, err := s.times( /*percpu=*/ true) if err != nil { - return metrics, err + return metrics, consumererror.NewPartialScrapeError(err, metricsLen) } - metrics.Resize(1) + metrics.Resize(metricsLen) initializeCPUTimeMetric(metrics.At(0), s.startTime, now, cpuTimes) return metrics, nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper_test.go index c863381b1b0..b82d92d7ff7 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/cpu_scraper_test.go @@ -24,12 +24,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/metadata" ) -func TestScrapeMetrics(t *testing.T) { +func TestScrape(t *testing.T) { type testCase struct { name string bootTimeFunc func() (uint64, error) @@ -76,11 +77,17 @@ func TestScrapeMetrics(t *testing.T) { return } require.NoError(t, err, "Failed to initialize cpu scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - metrics, err := scraper.ScrapeMetrics(context.Background()) + metrics, err := scraper.Scrape(context.Background()) if test.expectedErr != "" { assert.EqualError(t, err, test.expectedErr) + + isPartial := consumererror.IsPartialScrapeError(err) + assert.True(t, isPartial) + if isPartial { + assert.Equal(t, 1, err.(consumererror.PartialScrapeError).Failed) + } + return } require.NoError(t, err, "Failed to scrape metrics: %v", err) diff --git a/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/factory.go b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/factory.go index eb22361873a..63541f835e9 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/factory.go +++ b/receiver/hostmetricsreceiver/internal/scraper/cpuscraper/factory.go @@ -20,7 +20,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) // This file implements Factory for CPU scraper. @@ -44,7 +44,15 @@ func (f *Factory) CreateMetricsScraper( ctx context.Context, _ *zap.Logger, config internal.Config, -) (internal.Scraper, error) { +) (receiverhelper.MetricsScraper, error) { cfg := config.(*Config) - return obsreportscraper.WrapScraper(newCPUScraper(ctx, cfg), TypeStr), nil + s := newCPUScraper(ctx, cfg) + + ms := receiverhelper.NewMetricsScraper( + TypeStr, + s.Scrape, + receiverhelper.WithInitialize(s.Initialize), + ) + + return ms, nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_others.go b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_others.go index 3d6d4ea87e6..d9d9280d2c5 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_others.go +++ b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_others.go @@ -24,11 +24,17 @@ import ( "github.com/shirou/gopsutil/disk" "github.com/shirou/gopsutil/host" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/processor/filterset" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" ) +const ( + standardMetricsLen = 5 + metricsLen = standardMetricsLen + systemSpecificMetricsLen +) + // scraper for Disk Metrics type scraper struct { config *Config @@ -75,26 +81,21 @@ func (s *scraper) Initialize(_ context.Context) error { return nil } -// Close -func (s *scraper) Close(_ context.Context) error { - return nil -} - -// ScrapeMetrics -func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { +// Scrape +func (s *scraper) Scrape(_ context.Context) (pdata.MetricSlice, error) { metrics := pdata.NewMetricSlice() now := internal.TimeToUnixNano(time.Now()) ioCounters, err := s.ioCounters() if err != nil { - return metrics, err + return metrics, consumererror.NewPartialScrapeError(err, metricsLen) } // filter devices by name ioCounters = s.filterByDevice(ioCounters) if len(ioCounters) > 0 { - metrics.Resize(5 + systemSpecificMetricsLen) + metrics.Resize(metricsLen) initializeDiskIOMetric(metrics.At(0), s.startTime, now, ioCounters) initializeDiskOpsMetric(metrics.At(1), s.startTime, now, ioCounters) initializeDiskIOTimeMetric(metrics.At(2), s.startTime, now, ioCounters) diff --git a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_others_test.go b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_others_test.go index 91bdfdbc7e3..ee59941af7b 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_others_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_others_test.go @@ -24,9 +24,11 @@ import ( "github.com/shirou/gopsutil/disk" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/consumer/consumererror" ) -func TestScrapeMetrics_Others(t *testing.T) { +func TestScrape_Others(t *testing.T) { type testCase struct { name string ioCountersFunc func(names ...string) (map[string]disk.IOCountersStat, error) @@ -52,10 +54,15 @@ func TestScrapeMetrics_Others(t *testing.T) { err = scraper.Initialize(context.Background()) require.NoError(t, err, "Failed to initialize disk scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - _, err = scraper.ScrapeMetrics(context.Background()) + _, err = scraper.Scrape(context.Background()) assert.EqualError(t, err, test.expectedErr) + + isPartial := consumererror.IsPartialScrapeError(err) + assert.True(t, isPartial) + if isPartial { + assert.Equal(t, metricsLen, err.(consumererror.PartialScrapeError).Failed) + } }) } } diff --git a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_test.go index 1cd41e7aeba..0da21e11ac7 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_test.go @@ -28,7 +28,7 @@ import ( "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" ) -func TestScrapeMetrics(t *testing.T) { +func TestScrape(t *testing.T) { type testCase struct { name string config Config @@ -92,9 +92,8 @@ func TestScrapeMetrics(t *testing.T) { return } require.NoError(t, err, "Failed to initialize disk scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - metrics, err := scraper.ScrapeMetrics(context.Background()) + metrics, err := scraper.Scrape(context.Background()) require.NoError(t, err, "Failed to scrape metrics: %v", err) if !test.expectMetrics { diff --git a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows.go b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows.go index bf3e61e28df..08880b0ea4d 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows.go +++ b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows.go @@ -21,6 +21,7 @@ import ( "github.com/shirou/gopsutil/host" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/processor/filterset" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" @@ -28,6 +29,8 @@ import ( ) const ( + metricsLen = 5 + logicalDisk = "LogicalDisk" readsPerSec = "Disk Reads/sec" @@ -92,25 +95,20 @@ func (s *scraper) Initialize(_ context.Context) error { return s.perfCounterScraper.Initialize(logicalDisk) } -// Close -func (s *scraper) Close(_ context.Context) error { - return nil -} - -// ScrapeMetrics -func (s *scraper) ScrapeMetrics(ctx context.Context) (pdata.MetricSlice, error) { +// Scrape +func (s *scraper) Scrape(ctx context.Context) (pdata.MetricSlice, error) { metrics := pdata.NewMetricSlice() now := internal.TimeToUnixNano(time.Now()) counters, err := s.perfCounterScraper.Scrape() if err != nil { - return metrics, err + return metrics, consumererror.NewPartialScrapeError(err, metricsLen) } logicalDiskObject, err := counters.GetObject(logicalDisk) if err != nil { - return metrics, err + return metrics, consumererror.NewPartialScrapeError(err, metricsLen) } // filter devices by name @@ -118,11 +116,11 @@ func (s *scraper) ScrapeMetrics(ctx context.Context) (pdata.MetricSlice, error) logicalDiskCounterValues, err := logicalDiskObject.GetValues(readsPerSec, writesPerSec, readBytesPerSec, writeBytesPerSec, idleTime, avgDiskSecsPerRead, avgDiskSecsPerWrite, queueLength) if err != nil { - return metrics, err + return metrics, consumererror.NewPartialScrapeError(err, metricsLen) } if len(logicalDiskCounterValues) > 0 { - metrics.Resize(5) + metrics.Resize(metricsLen) initializeDiskIOMetric(metrics.At(0), s.startTime, now, logicalDiskCounterValues) initializeDiskOpsMetric(metrics.At(1), s.startTime, now, logicalDiskCounterValues) initializeDiskIOTimeMetric(metrics.At(2), s.startTime, now, logicalDiskCounterValues) diff --git a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows_test.go b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows_test.go index 3e41e3147f3..2aa47ffc1dc 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/disk_scraper_windows_test.go @@ -24,10 +24,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/perfcounters" ) -func TestScrapeMetrics_Error(t *testing.T) { +func TestScrape_Error(t *testing.T) { type testCase struct { name string scrapeErr error @@ -63,10 +64,15 @@ func TestScrapeMetrics_Error(t *testing.T) { err = scraper.Initialize(context.Background()) require.NoError(t, err, "Failed to initialize disk scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - _, err = scraper.ScrapeMetrics(context.Background()) + _, err = scraper.Scrape(context.Background()) assert.EqualError(t, err, test.expectedErr) + + isPartial := consumererror.IsPartialScrapeError(err) + assert.True(t, isPartial) + if isPartial { + assert.Equal(t, metricsLen, err.(consumererror.PartialScrapeError).Failed) + } }) } } diff --git a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/factory.go b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/factory.go index 91738608ecf..3481897e613 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/diskscraper/factory.go +++ b/receiver/hostmetricsreceiver/internal/scraper/diskscraper/factory.go @@ -20,7 +20,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) // This file implements Factory for Disk scraper. @@ -44,11 +44,18 @@ func (f *Factory) CreateMetricsScraper( ctx context.Context, _ *zap.Logger, config internal.Config, -) (internal.Scraper, error) { - scraper, err := newDiskScraper(ctx, config.(*Config)) +) (receiverhelper.MetricsScraper, error) { + cfg := config.(*Config) + s, err := newDiskScraper(ctx, cfg) if err != nil { return nil, err } - return obsreportscraper.WrapScraper(scraper, TypeStr), nil + ms := receiverhelper.NewMetricsScraper( + TypeStr, + s.Scrape, + receiverhelper.WithInitialize(s.Initialize), + ) + + return ms, nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/filesystemscraper/factory.go b/receiver/hostmetricsreceiver/internal/scraper/filesystemscraper/factory.go index 2802fa9571f..78dcd8ce613 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/filesystemscraper/factory.go +++ b/receiver/hostmetricsreceiver/internal/scraper/filesystemscraper/factory.go @@ -20,7 +20,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) // This file implements Factory for FileSystem scraper. @@ -49,11 +49,14 @@ func (f *Factory) CreateMetricsScraper( ctx context.Context, _ *zap.Logger, config internal.Config, -) (internal.Scraper, error) { - scraper, err := newFileSystemScraper(ctx, config.(*Config)) +) (receiverhelper.MetricsScraper, error) { + cfg := config.(*Config) + s, err := newFileSystemScraper(ctx, cfg) if err != nil { return nil, err } - return obsreportscraper.WrapScraper(scraper, TypeStr), nil + ms := receiverhelper.NewMetricsScraper(TypeStr, s.Scrape) + + return ms, nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/filesystemscraper/filesystem_scraper.go b/receiver/hostmetricsreceiver/internal/scraper/filesystemscraper/filesystem_scraper.go index acd6ef5cdd3..6845cd0f38b 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/filesystemscraper/filesystem_scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper/filesystemscraper/filesystem_scraper.go @@ -21,9 +21,15 @@ import ( "github.com/shirou/gopsutil/disk" - "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" + "go.opentelemetry.io/collector/receiver/receiverhelper" +) + +const ( + standardMetricsLen = 1 + metricsLen = standardMetricsLen + systemSpecificMetricsLen ) // scraper for FileSystem Metrics @@ -52,18 +58,8 @@ func newFileSystemScraper(_ context.Context, cfg *Config) (*scraper, error) { return scraper, nil } -// Initialize -func (s *scraper) Initialize(_ context.Context) error { - return nil -} - -// Close -func (s *scraper) Close(_ context.Context) error { - return nil -} - -// ScrapeMetrics -func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { +// Scrape +func (s *scraper) Scrape(_ context.Context) (pdata.MetricSlice, error) { metrics := pdata.NewMetricSlice() now := internal.TimeToUnixNano(time.Now()) @@ -71,7 +67,7 @@ func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { // omit logical (virtual) filesystems (not relevant for windows) partitions, err := s.partitions( /*all=*/ false) if err != nil { - return metrics, err + return metrics, consumererror.NewPartialScrapeError(err, metricsLen) } var errors []error @@ -80,9 +76,9 @@ func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { if !s.fsFilter.includePartition(partition) { continue } - usage, err := s.usage(partition.Mountpoint) - if err != nil { - errors = append(errors, err) + usage, usageErr := s.usage(partition.Mountpoint) + if usageErr != nil { + errors = append(errors, consumererror.NewPartialScrapeError(usageErr, 0)) continue } @@ -90,13 +86,19 @@ func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { } if len(usages) > 0 { - metrics.Resize(1 + systemSpecificMetricsLen) - + metrics.Resize(metricsLen) initializeFileSystemUsageMetric(metrics.At(0), now, usages) appendSystemSpecificMetrics(metrics, 1, now, usages) } - return metrics, componenterror.CombineErrors(errors) + err = receiverhelper.CombineScrapeErrors(errors) + if err != nil && len(usages) == 0 { + partialErr := err.(consumererror.PartialScrapeError) + partialErr.Failed = metricsLen + err = partialErr + } + + return metrics, err } func initializeFileSystemUsageMetric(metric pdata.Metric, now pdata.TimestampUnixNano, deviceUsages []*deviceUsage) { diff --git a/receiver/hostmetricsreceiver/internal/scraper/filesystemscraper/filesystem_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/filesystemscraper/filesystem_scraper_test.go index 1a6133d2587..12d2aee3365 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/filesystemscraper/filesystem_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/filesystemscraper/filesystem_scraper_test.go @@ -24,12 +24,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/processor/filterset" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" ) -func TestScrapeMetrics(t *testing.T) { +func TestScrape(t *testing.T) { type testCase struct { name string config Config @@ -191,13 +192,16 @@ func TestScrapeMetrics(t *testing.T) { scraper.usage = test.usageFunc } - err = scraper.Initialize(context.Background()) - require.NoError(t, err, "Failed to initialize file system scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - - metrics, err := scraper.ScrapeMetrics(context.Background()) + metrics, err := scraper.Scrape(context.Background()) if test.expectedErr != "" { assert.Contains(t, err.Error(), test.expectedErr) + + isPartial := consumererror.IsPartialScrapeError(err) + assert.True(t, isPartial) + if isPartial { + assert.Equal(t, metricsLen, err.(consumererror.PartialScrapeError).Failed) + } + return } require.NoError(t, err, "Failed to scrape metrics: %v", err) diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/factory.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/factory.go index 74e54d38ec0..d8d23d73f58 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/factory.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/factory.go @@ -20,7 +20,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) // This file implements Factory for Load scraper. @@ -44,7 +44,16 @@ func (f *Factory) CreateMetricsScraper( ctx context.Context, logger *zap.Logger, config internal.Config, -) (internal.Scraper, error) { +) (receiverhelper.MetricsScraper, error) { cfg := config.(*Config) - return obsreportscraper.WrapScraper(newLoadScraper(ctx, logger, cfg), TypeStr), nil + s := newLoadScraper(ctx, logger, cfg) + + ms := receiverhelper.NewMetricsScraper( + TypeStr, + s.Scrape, + receiverhelper.WithInitialize(s.Initialize), + receiverhelper.WithClose(s.Close), + ) + + return ms, nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper.go index 4fbff8c87f5..175486ffb0a 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper.go @@ -21,10 +21,13 @@ import ( "github.com/shirou/gopsutil/load" "go.uber.org/zap" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" ) +const metricsLen = 3 + // scraper for Load Metrics type scraper struct { logger *zap.Logger @@ -49,17 +52,17 @@ func (s *scraper) Close(ctx context.Context) error { return stopSampling(ctx) } -// ScrapeMetrics -func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { +// Scrape +func (s *scraper) Scrape(_ context.Context) (pdata.MetricSlice, error) { metrics := pdata.NewMetricSlice() now := internal.TimeToUnixNano(time.Now()) avgLoadValues, err := s.load() if err != nil { - return metrics, err + return metrics, consumererror.NewPartialScrapeError(err, metricsLen) } - metrics.Resize(3) + metrics.Resize(metricsLen) initializeLoadMetric(metrics.At(0), loadAvg1MDescriptor, now, avgLoadValues.Load1) initializeLoadMetric(metrics.At(1), loadAvg5mDescriptor, now, avgLoadValues.Load5) initializeLoadMetric(metrics.At(2), loadAvg15mDescriptor, now, avgLoadValues.Load15) diff --git a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go index 05f9c1eea6c..c2f71644105 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/loadscraper/load_scraper_test.go @@ -24,11 +24,12 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" ) -func TestScrapeMetrics(t *testing.T) { +func TestScrape(t *testing.T) { type testCase struct { name string loadFunc func() (*load.AvgStat, error) @@ -57,9 +58,16 @@ func TestScrapeMetrics(t *testing.T) { require.NoError(t, err, "Failed to initialize load scraper: %v", err) defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - metrics, err := scraper.ScrapeMetrics(context.Background()) + metrics, err := scraper.Scrape(context.Background()) if test.expectedErr != "" { assert.EqualError(t, err, test.expectedErr) + + isPartial := consumererror.IsPartialScrapeError(err) + assert.True(t, isPartial) + if isPartial { + assert.Equal(t, metricsLen, err.(consumererror.PartialScrapeError).Failed) + } + return } require.NoError(t, err, "Failed to scrape metrics: %v", err) diff --git a/receiver/hostmetricsreceiver/internal/scraper/memoryscraper/factory.go b/receiver/hostmetricsreceiver/internal/scraper/memoryscraper/factory.go index 0321b193b07..289704bfeb4 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/memoryscraper/factory.go +++ b/receiver/hostmetricsreceiver/internal/scraper/memoryscraper/factory.go @@ -20,7 +20,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) // This file implements Factory for Memory scraper. @@ -44,7 +44,11 @@ func (f *Factory) CreateMetricsScraper( ctx context.Context, _ *zap.Logger, config internal.Config, -) (internal.Scraper, error) { +) (receiverhelper.MetricsScraper, error) { cfg := config.(*Config) - return obsreportscraper.WrapScraper(newMemoryScraper(ctx, cfg), TypeStr), nil + s := newMemoryScraper(ctx, cfg) + + ms := receiverhelper.NewMetricsScraper(TypeStr, s.Scrape) + + return ms, nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/memoryscraper/memory_scraper.go b/receiver/hostmetricsreceiver/internal/scraper/memoryscraper/memory_scraper.go index 09e84494496..16b49dc725c 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/memoryscraper/memory_scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper/memoryscraper/memory_scraper.go @@ -20,11 +20,14 @@ import ( "github.com/shirou/gopsutil/mem" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/metadata" ) +const metricsLen = 1 + // scraper for Memory Metrics type scraper struct { config *Config @@ -38,27 +41,17 @@ func newMemoryScraper(_ context.Context, cfg *Config) *scraper { return &scraper{config: cfg, virtualMemory: mem.VirtualMemory} } -// Initialize -func (s *scraper) Initialize(_ context.Context) error { - return nil -} - -// Close -func (s *scraper) Close(_ context.Context) error { - return nil -} - -// ScrapeMetrics -func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { +// Scrape +func (s *scraper) Scrape(_ context.Context) (pdata.MetricSlice, error) { metrics := pdata.NewMetricSlice() now := internal.TimeToUnixNano(time.Now()) memInfo, err := s.virtualMemory() if err != nil { - return metrics, err + return metrics, consumererror.NewPartialScrapeError(err, metricsLen) } - metrics.Resize(1) + metrics.Resize(metricsLen) initializeMemoryUsageMetric(metrics.At(0), now, memInfo) return metrics, nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/memoryscraper/memory_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/memoryscraper/memory_scraper_test.go index 564f905cb6d..8b2b1a6a0c7 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/memoryscraper/memory_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/memoryscraper/memory_scraper_test.go @@ -24,12 +24,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/metadata" ) -func TestScrapeMetrics(t *testing.T) { +func TestScrape(t *testing.T) { type testCase struct { name string virtualMemoryFunc func() (*mem.VirtualMemoryStat, error) @@ -54,13 +55,16 @@ func TestScrapeMetrics(t *testing.T) { scraper.virtualMemory = test.virtualMemoryFunc } - err := scraper.Initialize(context.Background()) - require.NoError(t, err, "Failed to initialize memory scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - - metrics, err := scraper.ScrapeMetrics(context.Background()) + metrics, err := scraper.Scrape(context.Background()) if test.expectedErr != "" { assert.EqualError(t, err, test.expectedErr) + + isPartial := consumererror.IsPartialScrapeError(err) + assert.True(t, isPartial) + if isPartial { + assert.Equal(t, metricsLen, err.(consumererror.PartialScrapeError).Failed) + } + return } require.NoError(t, err, "Failed to scrape metrics: %v", err) diff --git a/receiver/hostmetricsreceiver/internal/scraper/networkscraper/factory.go b/receiver/hostmetricsreceiver/internal/scraper/networkscraper/factory.go index c94e407f93c..cad87a4b715 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/networkscraper/factory.go +++ b/receiver/hostmetricsreceiver/internal/scraper/networkscraper/factory.go @@ -20,7 +20,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) // This file implements Factory for Network scraper. @@ -44,11 +44,18 @@ func (f *Factory) CreateMetricsScraper( ctx context.Context, _ *zap.Logger, config internal.Config, -) (internal.Scraper, error) { - scraper, err := newNetworkScraper(ctx, config.(*Config)) +) (receiverhelper.MetricsScraper, error) { + cfg := config.(*Config) + s, err := newNetworkScraper(ctx, cfg) if err != nil { return nil, err } - return obsreportscraper.WrapScraper(scraper, TypeStr), nil + ms := receiverhelper.NewMetricsScraper( + TypeStr, + s.Scrape, + receiverhelper.WithInitialize(s.Initialize), + ) + + return ms, nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/networkscraper/network_scraper.go b/receiver/hostmetricsreceiver/internal/scraper/networkscraper/network_scraper.go index 900016e4fb6..3c06e8417fa 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/networkscraper/network_scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper/networkscraper/network_scraper.go @@ -22,10 +22,16 @@ import ( "github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/net" - "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/processor/filterset" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" + "go.opentelemetry.io/collector/receiver/receiverhelper" +) + +const ( + networkMetricsLen = 4 + connectionsMetricsLen = 1 ) // scraper for Network Metrics @@ -75,13 +81,8 @@ func (s *scraper) Initialize(_ context.Context) error { return nil } -// Close -func (s *scraper) Close(_ context.Context) error { - return nil -} - -// ScrapeMetrics -func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { +// Scrape +func (s *scraper) Scrape(_ context.Context) (pdata.MetricSlice, error) { metrics := pdata.NewMetricSlice() var errors []error @@ -96,7 +97,7 @@ func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { errors = append(errors, err) } - return metrics, componenterror.CombineErrors(errors) + return metrics, receiverhelper.CombineScrapeErrors(errors) } func (s *scraper) scrapeAndAppendNetworkCounterMetrics(metrics pdata.MetricSlice, startTime pdata.TimestampUnixNano) error { @@ -105,7 +106,7 @@ func (s *scraper) scrapeAndAppendNetworkCounterMetrics(metrics pdata.MetricSlice // get total stats only ioCounters, err := s.ioCounters( /*perNetworkInterfaceController=*/ true) if err != nil { - return err + return consumererror.NewPartialScrapeError(err, networkMetricsLen) } // filter network interfaces by name @@ -113,7 +114,7 @@ func (s *scraper) scrapeAndAppendNetworkCounterMetrics(metrics pdata.MetricSlice if len(ioCounters) > 0 { startIdx := metrics.Len() - metrics.Resize(startIdx + 4) + metrics.Resize(startIdx + networkMetricsLen) initializeNetworkPacketsMetric(metrics.At(startIdx+0), networkPacketsDescriptor, startTime, now, ioCounters) initializeNetworkDroppedPacketsMetric(metrics.At(startIdx+1), networkDroppedPacketsDescriptor, startTime, now, ioCounters) initializeNetworkErrorsMetric(metrics.At(startIdx+2), networkErrorsDescriptor, startTime, now, ioCounters) @@ -181,13 +182,13 @@ func (s *scraper) scrapeAndAppendNetworkTCPConnectionsMetric(metrics pdata.Metri connections, err := s.connections("tcp") if err != nil { - return err + return consumererror.NewPartialScrapeError(err, connectionsMetricsLen) } connectionStatusCounts := getTCPConnectionStatusCounts(connections) startIdx := metrics.Len() - metrics.Resize(startIdx + 1) + metrics.Resize(startIdx + connectionsMetricsLen) initializeNetworkTCPConnectionsMetric(metrics.At(startIdx), now, connectionStatusCounts) return nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/networkscraper/network_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/networkscraper/network_scraper_test.go index b2a54d2bb7f..d65ea7dedfd 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/networkscraper/network_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/networkscraper/network_scraper_test.go @@ -23,12 +23,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/processor/filterset" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" ) -func TestScrapeMetrics(t *testing.T) { +func TestScrape(t *testing.T) { type testCase struct { name string config Config @@ -40,6 +41,7 @@ func TestScrapeMetrics(t *testing.T) { newErrRegex string initializationErr string expectedErr string + expectedErrCount int } testCases := []testCase{ @@ -74,14 +76,16 @@ func TestScrapeMetrics(t *testing.T) { initializationErr: "err1", }, { - name: "IOCounters Error", - ioCountersFunc: func(bool) ([]net.IOCountersStat, error) { return nil, errors.New("err2") }, - expectedErr: "err2", + name: "IOCounters Error", + ioCountersFunc: func(bool) ([]net.IOCountersStat, error) { return nil, errors.New("err2") }, + expectedErr: "err2", + expectedErrCount: networkMetricsLen, }, { - name: "Connections Error", - connectionsFunc: func(string) ([]net.ConnectionStat, error) { return nil, errors.New("err3") }, - expectedErr: "err3", + name: "Connections Error", + connectionsFunc: func(string) ([]net.ConnectionStat, error) { return nil, errors.New("err3") }, + expectedErr: "err3", + expectedErrCount: connectionsMetricsLen, }, } @@ -111,11 +115,17 @@ func TestScrapeMetrics(t *testing.T) { return } require.NoError(t, err, "Failed to initialize network scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - metrics, err := scraper.ScrapeMetrics(context.Background()) + metrics, err := scraper.Scrape(context.Background()) if test.expectedErr != "" { assert.EqualError(t, err, test.expectedErr) + + isPartial := consumererror.IsPartialScrapeError(err) + assert.True(t, isPartial) + if isPartial { + assert.Equal(t, test.expectedErrCount, err.(consumererror.PartialScrapeError).Failed) + } + return } require.NoError(t, err, "Failed to scrape metrics: %v", err) diff --git a/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/common.go b/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/common.go deleted file mode 100644 index 31f2a12214f..00000000000 --- a/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/common.go +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package obsreportscraper - -func scrapeMetricsSpanName(typeStr string) string { - return typeStr + "scraper.ScrapeMetrics" -} diff --git a/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/common_test.go b/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/common_test.go deleted file mode 100644 index 63b884788a4..00000000000 --- a/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/common_test.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package obsreportscraper - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestSpanName(t *testing.T) { - scraperTypes := map[string]string{ - "cpu": "cpuscraper.ScrapeMetrics", - "disk": "diskscraper.ScrapeMetrics", - "load": "loadscraper.ScrapeMetrics", - "filesystem": "filesystemscraper.ScrapeMetrics", - "memory": "memoryscraper.ScrapeMetrics", - "network": "networkscraper.ScrapeMetrics", - "processes": "processesscraper.ScrapeMetrics", - "swap": "swapscraper.ScrapeMetrics", - "process": "processscraper.ScrapeMetrics", - } - - for typeStr, spanName := range scraperTypes { - assert.Equal(t, spanName, scrapeMetricsSpanName(typeStr)) - } -} diff --git a/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportresourcescraper.go b/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportresourcescraper.go deleted file mode 100644 index ad0741e0928..00000000000 --- a/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportresourcescraper.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package obsreportscraper - -import ( - "context" - - "go.opencensus.io/trace" - - "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" -) - -type resourceScraper struct { - delegate internal.ResourceScraper - scrapeMetricsSpanName string -} - -// WrapResourceScraper wraps an internal.ResourceScraper and provides observability support. -func WrapResourceScraper(delegate internal.ResourceScraper, typeStr string) internal.ResourceScraper { - return &resourceScraper{delegate: delegate, scrapeMetricsSpanName: scrapeMetricsSpanName(typeStr)} -} - -func (s *resourceScraper) Initialize(ctx context.Context) error { - return s.delegate.Initialize(ctx) -} - -func (s *resourceScraper) Close(ctx context.Context) error { - return s.delegate.Close(ctx) -} - -// ScrapeMetrics -func (s *resourceScraper) ScrapeMetrics(ctx context.Context) (pdata.ResourceMetricsSlice, error) { - // TODO: Add metrics. - ctx, span := trace.StartSpan(ctx, s.scrapeMetricsSpanName) - defer span.End() - - rms, err := s.delegate.ScrapeMetrics(ctx) - - if err != nil { - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) - } - return rms, err -} diff --git a/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportresourcescraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportresourcescraper_test.go deleted file mode 100644 index 51b703b4e5d..00000000000 --- a/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportresourcescraper_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package obsreportscraper - -import ( - "context" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - "go.opencensus.io/trace" - - "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal/data/testdata" -) - -func TestWrapResourceScraper(t *testing.T) { - ts := &testResourceScraper{ - t: t, - err: nil, - } - obss := WrapResourceScraper(ts, "test") - assert.NoError(t, obss.Initialize(context.Background())) - rms, err := obss.ScrapeMetrics(context.Background()) - assert.NoError(t, err) - assert.EqualValues(t, generateResourceMetricsSlice(), rms) - assert.NoError(t, obss.Close(context.Background())) -} - -func TestWrapResourceScraper_Error(t *testing.T) { - ts := &testResourceScraper{ - t: t, - err: fmt.Errorf("my error"), - } - obss := WrapResourceScraper(ts, "test") - assert.Error(t, obss.Initialize(context.Background())) - rms, err := obss.ScrapeMetrics(context.Background()) - assert.Error(t, err) - assert.EqualValues(t, generateResourceMetricsSlice(), rms) - assert.Error(t, obss.Close(context.Background())) -} - -type testResourceScraper struct { - t *testing.T - err error -} - -func (s *testResourceScraper) Initialize(_ context.Context) error { - return s.err -} - -func (s *testResourceScraper) Close(_ context.Context) error { - return s.err -} - -// ScrapeMetrics -func (s *testResourceScraper) ScrapeMetrics(ctx context.Context) (pdata.ResourceMetricsSlice, error) { - assert.NotNil(s.t, trace.FromContext(ctx)) - return generateResourceMetricsSlice(), s.err -} - -func generateResourceMetricsSlice() pdata.ResourceMetricsSlice { - return testdata.GenerateMetricsOneMetric().ResourceMetrics() -} diff --git a/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportscraper.go b/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportscraper.go deleted file mode 100644 index 520b692f1c1..00000000000 --- a/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportscraper.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package obsreportscraper - -import ( - "context" - - "go.opencensus.io/trace" - - "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" -) - -type scraper struct { - delegate internal.Scraper - scrapeMetricsSpanName string -} - -// WrapScraper wraps an internal.Scraper and provides observability support. -func WrapScraper(delegate internal.Scraper, typeStr string) internal.Scraper { - return &scraper{delegate: delegate, scrapeMetricsSpanName: scrapeMetricsSpanName(typeStr)} -} - -func (s *scraper) Initialize(ctx context.Context) error { - return s.delegate.Initialize(ctx) -} - -func (s *scraper) Close(ctx context.Context) error { - return s.delegate.Close(ctx) -} - -// ScrapeMetrics -func (s *scraper) ScrapeMetrics(ctx context.Context) (pdata.MetricSlice, error) { - // TODO: Add metrics. - ctx, span := trace.StartSpan(ctx, s.scrapeMetricsSpanName) - defer span.End() - - ms, err := s.delegate.ScrapeMetrics(ctx) - - if err != nil { - span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()}) - } - return ms, err -} diff --git a/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportscraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportscraper_test.go deleted file mode 100644 index 7ae5b54fb4c..00000000000 --- a/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper/obsreportscraper_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package obsreportscraper - -import ( - "context" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - "go.opencensus.io/trace" - - "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal/data/testdata" -) - -func TestWrapScraper(t *testing.T) { - ts := &testScraper{ - t: t, - err: nil, - } - obss := WrapScraper(ts, "test") - assert.NoError(t, obss.Initialize(context.Background())) - ms, err := obss.ScrapeMetrics(context.Background()) - assert.NoError(t, err) - assert.EqualValues(t, generateMetricsSlice(), ms) - assert.NoError(t, obss.Close(context.Background())) -} - -func TestWrapScraper_Error(t *testing.T) { - ts := &testScraper{ - t: t, - err: fmt.Errorf("my error"), - } - obss := WrapScraper(ts, "test") - assert.Error(t, obss.Initialize(context.Background())) - ms, err := obss.ScrapeMetrics(context.Background()) - assert.Error(t, err) - assert.EqualValues(t, generateMetricsSlice(), ms) - assert.Error(t, obss.Close(context.Background())) -} - -type testScraper struct { - t *testing.T - err error -} - -func (s *testScraper) Initialize(_ context.Context) error { - return s.err -} - -func (s *testScraper) Close(_ context.Context) error { - return s.err -} - -// ScrapeMetrics -func (s *testScraper) ScrapeMetrics(ctx context.Context) (pdata.MetricSlice, error) { - assert.NotNil(s.t, trace.FromContext(ctx)) - return generateMetricsSlice(), s.err -} - -func generateMetricsSlice() pdata.MetricSlice { - return testdata.GenerateMetricsOneMetric().ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics() -} diff --git a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/factory.go b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/factory.go index 964a92dadf6..9bec552012a 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/factory.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/factory.go @@ -20,7 +20,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) // This file implements Factory for Processes scraper. @@ -44,7 +44,15 @@ func (f *Factory) CreateMetricsScraper( ctx context.Context, _ *zap.Logger, config internal.Config, -) (internal.Scraper, error) { +) (receiverhelper.MetricsScraper, error) { cfg := config.(*Config) - return obsreportscraper.WrapScraper(newProcessesScraper(ctx, cfg), TypeStr), nil + s := newProcessesScraper(ctx, cfg) + + ms := receiverhelper.NewMetricsScraper( + TypeStr, + s.Scrape, + receiverhelper.WithInitialize(s.Initialize), + ) + + return ms, nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper.go b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper.go index 8a8424783fc..6a54aba4fee 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper.go @@ -23,6 +23,8 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" ) +const metricsLen = systemSpecificMetricsLen + // scraper for Processes Metrics type scraper struct { config *Config @@ -50,13 +52,8 @@ func (s *scraper) Initialize(_ context.Context) error { return nil } -// Close -func (s *scraper) Close(_ context.Context) error { - return nil -} - -// ScrapeMetrics -func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { +// Scrape +func (s *scraper) Scrape(_ context.Context) (pdata.MetricSlice, error) { metrics := pdata.NewMetricSlice() err := appendSystemSpecificProcessesMetrics(metrics, 0, s.misc) return metrics, err diff --git a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_fallback.go b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_fallback.go index 265aa89fa86..33f07cdc12b 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_fallback.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_fallback.go @@ -18,6 +18,8 @@ package processesscraper import "go.opentelemetry.io/collector/consumer/pdata" +const systemSpecificMetricsLen = 2 + func appendSystemSpecificProcessesMetrics(metrics pdata.MetricSlice, startIndex int, miscFunc getMiscStats) error { return nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_test.go index 4a30e6e96f8..d3fa0cd9736 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" ) @@ -35,7 +36,7 @@ var systemSpecificMetrics = map[string][]pdata.Metric{ "openbsd": {processesRunningDescriptor, processesBlockedDescriptor}, } -func TestScrapeMetrics(t *testing.T) { +func TestScrape(t *testing.T) { type testCase struct { name string miscFunc func() (*load.MiscStat, error) @@ -64,11 +65,17 @@ func TestScrapeMetrics(t *testing.T) { err := scraper.Initialize(context.Background()) require.NoError(t, err, "Failed to initialize processes scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - metrics, err := scraper.ScrapeMetrics(context.Background()) + metrics, err := scraper.Scrape(context.Background()) if len(expectedMetrics) > 0 && test.expectedErr != "" { assert.EqualError(t, err, test.expectedErr) + + isPartial := consumererror.IsPartialScrapeError(err) + assert.True(t, isPartial) + if isPartial { + assert.Equal(t, metricsLen, err.(consumererror.PartialScrapeError).Failed) + } + return } require.NoError(t, err, "Failed to scrape metrics: %v", err) diff --git a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_unix.go b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_unix.go index 4f1013b93c2..82794399630 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_unix.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processesscraper/processes_scraper_unix.go @@ -19,18 +19,21 @@ package processesscraper import ( "time" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" ) +const systemSpecificMetricsLen = 2 + func appendSystemSpecificProcessesMetrics(metrics pdata.MetricSlice, startIndex int, miscFunc getMiscStats) error { now := internal.TimeToUnixNano(time.Now()) misc, err := miscFunc() if err != nil { - return err + return consumererror.NewPartialScrapeError(err, systemSpecificMetricsLen) } - metrics.Resize(startIndex + 2) + metrics.Resize(startIndex + systemSpecificMetricsLen) initializeProcessesMetric(metrics.At(startIndex+0), processesRunningDescriptor, now, int64(misc.ProcsRunning)) initializeProcessesMetric(metrics.At(startIndex+1), processesBlockedDescriptor, now, int64(misc.ProcsBlocked)) return nil diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/factory.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/factory.go index fc6981390b7..fe26bb88052 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/factory.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/factory.go @@ -22,7 +22,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) // This file implements Factory for Process scraper. @@ -41,20 +41,27 @@ func (f *Factory) CreateDefaultConfig() internal.Config { return &Config{} } -// CreateMetricsScraper creates a resource scraper based on provided config. -func (f *Factory) CreateMetricsScraper( +// CreateResourceMetricsScraper creates a resource scraper based on provided config. +func (f *Factory) CreateResourceMetricsScraper( _ context.Context, _ *zap.Logger, config internal.Config, -) (internal.ResourceScraper, error) { +) (receiverhelper.ResourceMetricsScraper, error) { if runtime.GOOS != "linux" && runtime.GOOS != "windows" { return nil, errors.New("process scraper only available on Linux or Windows") } cfg := config.(*Config) - ps, err := newProcessScraper(cfg) + s, err := newProcessScraper(cfg) if err != nil { return nil, err } - return obsreportscraper.WrapResourceScraper(ps, TypeStr), nil + + ms := receiverhelper.NewResourceMetricsScraper( + TypeStr, + s.Scrape, + receiverhelper.WithInitialize(s.Initialize), + ) + + return ms, nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/factory_test.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/factory_test.go index 217202f6130..d16165e2379 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/factory_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/factory_test.go @@ -29,11 +29,11 @@ func TestCreateDefaultConfig(t *testing.T) { assert.IsType(t, &Config{}, cfg) } -func TestCreateMetricsScraper(t *testing.T) { +func TestCreateResourceMetricsScraper(t *testing.T) { factory := &Factory{} cfg := &Config{} - scraper, err := factory.CreateMetricsScraper(context.Background(), zap.NewNop(), cfg) + scraper, err := factory.CreateResourceMetricsScraper(context.Background(), zap.NewNop(), cfg) if runtime.GOOS == "linux" || runtime.GOOS == "windows" { assert.NoError(t, err) diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go index dda292e760b..01441795ac6 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper.go @@ -23,10 +23,19 @@ import ( "github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/process" - "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/processor/filterset" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" + "go.opentelemetry.io/collector/receiver/receiverhelper" +) + +const ( + cpuMetricsLen = 1 + memoryMetricsLen = 2 + diskMetricsLen = 1 + + metricsLen = cpuMetricsLen + memoryMetricsLen + diskMetricsLen ) // scraper for Process Metrics @@ -75,21 +84,21 @@ func (s *scraper) Initialize(_ context.Context) error { return nil } -// Close -func (s *scraper) Close(_ context.Context) error { - return nil -} +// Scrape +func (s *scraper) Scrape(_ context.Context) (pdata.ResourceMetricsSlice, error) { + rms := pdata.NewResourceMetricsSlice() -// ScrapeMetrics -func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.ResourceMetricsSlice, error) { var errs []error metadata, err := s.getProcessMetadata() if err != nil { + if !consumererror.IsPartialScrapeError(err) { + return rms, err + } + errs = append(errs, err) } - rms := pdata.NewResourceMetricsSlice() rms.Resize(len(metadata)) for i, md := range metadata { rm := rms.At(i) @@ -102,19 +111,19 @@ func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.ResourceMetricsSlice, now := internal.TimeToUnixNano(time.Now()) if err = scrapeAndAppendCPUTimeMetric(metrics, s.startTime, now, md.handle); err != nil { - errs = append(errs, fmt.Errorf("error reading cpu times for process %q (pid %v): %w", md.executable.name, md.pid, err)) + errs = append(errs, consumererror.NewPartialScrapeError(fmt.Errorf("error reading cpu times for process %q (pid %v): %w", md.executable.name, md.pid, err), cpuMetricsLen)) } if err = scrapeAndAppendMemoryUsageMetrics(metrics, now, md.handle); err != nil { - errs = append(errs, fmt.Errorf("error reading memory info for process %q (pid %v): %w", md.executable.name, md.pid, err)) + errs = append(errs, consumererror.NewPartialScrapeError(fmt.Errorf("error reading memory info for process %q (pid %v): %w", md.executable.name, md.pid, err), memoryMetricsLen)) } if err = scrapeAndAppendDiskIOMetric(metrics, s.startTime, now, md.handle); err != nil { - errs = append(errs, fmt.Errorf("error reading disk usage for process %q (pid %v): %w", md.executable.name, md.pid, err)) + errs = append(errs, consumererror.NewPartialScrapeError(fmt.Errorf("error reading disk usage for process %q (pid %v): %w", md.executable.name, md.pid, err), diskMetricsLen)) } } - return rms, componenterror.CombineErrors(errs) + return rms, receiverhelper.CombineScrapeErrors(errs) } // getProcessMetadata returns a slice of processMetadata, including handles, @@ -135,7 +144,7 @@ func (s *scraper) getProcessMetadata() ([]*processMetadata, error) { executable, err := getProcessExecutable(handle) if err != nil { - errs = append(errs, fmt.Errorf("error reading process name for pid %v: %w", pid, err)) + errs = append(errs, consumererror.NewPartialScrapeError(fmt.Errorf("error reading process name for pid %v: %w", pid, err), 1)) continue } @@ -147,12 +156,12 @@ func (s *scraper) getProcessMetadata() ([]*processMetadata, error) { command, err := getProcessCommand(handle) if err != nil { - errs = append(errs, fmt.Errorf("error reading command for process %q (pid %v): %w", executable.name, pid, err)) + errs = append(errs, consumererror.NewPartialScrapeError(fmt.Errorf("error reading command for process %q (pid %v): %w", executable.name, pid, err), 0)) } username, err := handle.Username() if err != nil { - errs = append(errs, fmt.Errorf("error reading username for process %q (pid %v): %w", executable.name, pid, err)) + errs = append(errs, consumererror.NewPartialScrapeError(fmt.Errorf("error reading username for process %q (pid %v): %w", executable.name, pid, err), 0)) } md := &processMetadata{ @@ -166,7 +175,7 @@ func (s *scraper) getProcessMetadata() ([]*processMetadata, error) { metadata = append(metadata, md) } - return metadata, componenterror.CombineErrors(errs) + return metadata, receiverhelper.CombineScrapeErrors(errs) } func scrapeAndAppendCPUTimeMetric(metrics pdata.MetricSlice, startTime, now pdata.TimestampUnixNano, handle processHandle) error { @@ -176,7 +185,7 @@ func scrapeAndAppendCPUTimeMetric(metrics pdata.MetricSlice, startTime, now pdat } startIdx := metrics.Len() - metrics.Resize(startIdx + 1) + metrics.Resize(startIdx + cpuMetricsLen) initializeCPUTimeMetric(metrics.At(startIdx), startTime, now, times) return nil } @@ -196,7 +205,7 @@ func scrapeAndAppendMemoryUsageMetrics(metrics pdata.MetricSlice, now pdata.Time } startIdx := metrics.Len() - metrics.Resize(startIdx + 2) + metrics.Resize(startIdx + memoryMetricsLen) initializeMemoryUsageMetric(metrics.At(startIdx+0), physicalMemoryUsageDescriptor, now, int64(mem.RSS)) initializeMemoryUsageMetric(metrics.At(startIdx+1), virtualMemoryUsageDescriptor, now, int64(mem.VMS)) return nil @@ -222,7 +231,7 @@ func scrapeAndAppendDiskIOMetric(metrics pdata.MetricSlice, startTime, now pdata } startIdx := metrics.Len() - metrics.Resize(startIdx + 1) + metrics.Resize(startIdx + diskMetricsLen) initializeDiskIOMetric(metrics.At(startIdx), startTime, now, io) return nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go index 8806d15fa87..1d142d5da8b 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/processscraper/process_scraper_test.go @@ -19,7 +19,6 @@ import ( "errors" "fmt" "runtime" - "strings" "testing" "github.com/shirou/gopsutil/cpu" @@ -28,6 +27,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/internal/processor/filterset" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" @@ -40,7 +40,7 @@ func skipTestOnUnsupportedOS(t *testing.T) { } } -func TestScrapeMetrics(t *testing.T) { +func TestScrape(t *testing.T) { skipTestOnUnsupportedOS(t) const bootTime = 100 @@ -51,21 +51,21 @@ func TestScrapeMetrics(t *testing.T) { require.NoError(t, err, "Failed to create process scraper: %v", err) err = scraper.Initialize(context.Background()) require.NoError(t, err, "Failed to initialize process scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - resourceMetrics, err := scraper.ScrapeMetrics(context.Background()) + resourceMetrics, err := scraper.Scrape(context.Background()) // may receive some partial errors as a result of attempting to: // a) read native system processes on Windows (e.g. Registry process) // b) read info on processes that have just terminated // - // so validate that we have less errors than resources & some valid data is returned + // so validate that we have less processes that were failed to be scraped + // than processes that were successfully scraped & some valid data is + // returned if err != nil { - errs := strings.Split(err.Error(), ";") - - noErrors := len(errs) - noResources := resourceMetrics.Len() - require.Lessf(t, noErrors, noResources, "Failed to scrape metrics - more errors returned than metrics: %v", err) + require.True(t, consumererror.IsPartialScrapeError(err)) + noProcessesScraped := resourceMetrics.Len() + noProcessesErrored := err.(consumererror.PartialScrapeError).Failed + require.Lessf(t, noProcessesErrored, noProcessesScraped, "Failed to scrape metrics - more processes failed to be scraped than were successfully scraped: %v", err) } require.Greater(t, resourceMetrics.Len(), 1) @@ -169,11 +169,11 @@ func TestScrapeMetrics_GetProcessesError(t *testing.T) { err = scraper.Initialize(context.Background()) require.NoError(t, err, "Failed to initialize process scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - metrics, err := scraper.ScrapeMetrics(context.Background()) + metrics, err := scraper.Scrape(context.Background()) assert.EqualError(t, err, "err1") assert.Equal(t, 0, metrics.Len()) + assert.False(t, consumererror.IsPartialScrapeError(err)) } type processHandlesMock struct { @@ -313,7 +313,6 @@ func TestScrapeMetrics_Filtered(t *testing.T) { require.NoError(t, err, "Failed to create process scraper: %v", err) err = scraper.Initialize(context.Background()) require.NoError(t, err, "Failed to initialize process scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() handles := make([]*processHandleMock, 0, len(test.names)) for _, name := range test.names { @@ -327,7 +326,7 @@ func TestScrapeMetrics_Filtered(t *testing.T) { return &processHandlesMock{handles: handles}, nil } - resourceMetrics, err := scraper.ScrapeMetrics(context.Background()) + resourceMetrics, err := scraper.Scrape(context.Background()) require.NoError(t, err) assert.Equal(t, len(test.expectedNames), resourceMetrics.Len()) @@ -418,7 +417,6 @@ func TestScrapeMetrics_ProcessErrors(t *testing.T) { require.NoError(t, err, "Failed to create process scraper: %v", err) err = scraper.Initialize(context.Background()) require.NoError(t, err, "Failed to initialize process scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() username := "username" if test.usernameError != nil { @@ -439,31 +437,48 @@ func TestScrapeMetrics_ProcessErrors(t *testing.T) { return &processHandlesMock{handles: []*processHandleMock{handleMock}}, nil } - resourceMetrics, err := scraper.ScrapeMetrics(context.Background()) - assert.EqualError(t, err, test.expectedError) + resourceMetrics, err := scraper.Scrape(context.Background()) + + md := pdata.NewMetrics() + resourceMetrics.MoveAndAppendTo(md.ResourceMetrics()) + expectedResourceMetricsLen, expectedMetricsLen := getExpectedLengthOfReturnedMetrics(test.nameError, test.exeError, test.timesError, test.memoryInfoError, test.ioCountersError) + assert.Equal(t, expectedResourceMetricsLen, md.ResourceMetrics().Len()) + assert.Equal(t, expectedMetricsLen, md.MetricCount()) - if test.nameError != nil || test.exeError != nil { - assert.Equal(t, 0, resourceMetrics.Len()) - } else { - require.Equal(t, 1, resourceMetrics.Len()) - metrics := getMetricSlice(t, resourceMetrics.At(0)) - expectedLen := getExpectedLengthOfReturnedMetrics(test.timesError, test.memoryInfoError, test.ioCountersError) - assert.Equal(t, expectedLen, metrics.Len()) + assert.EqualError(t, err, test.expectedError) + isPartial := consumererror.IsPartialScrapeError(err) + assert.True(t, isPartial) + if isPartial { + expectedFailures := getExpectedScrapeFailures(test.nameError, test.exeError, test.timesError, test.memoryInfoError, test.ioCountersError) + assert.Equal(t, expectedFailures, err.(consumererror.PartialScrapeError).Failed) } }) } } -func getExpectedLengthOfReturnedMetrics(timeError, memError, diskError error) int { +func getExpectedLengthOfReturnedMetrics(nameError, exeError, timeError, memError, diskError error) (int, int) { + if nameError != nil || exeError != nil { + return 0, 0 + } + expectedLen := 0 if timeError == nil { - expectedLen++ + expectedLen += cpuMetricsLen } if memError == nil { - expectedLen += 2 + expectedLen += memoryMetricsLen } if diskError == nil { - expectedLen++ + expectedLen += diskMetricsLen } - return expectedLen + return 1, expectedLen +} + +func getExpectedScrapeFailures(nameError, exeError, timeError, memError, diskError error) int { + expectedResourceMetricsLen, expectedMetricsLen := getExpectedLengthOfReturnedMetrics(nameError, exeError, timeError, memError, diskError) + if expectedResourceMetricsLen == 0 { + return 1 + } + + return metricsLen - expectedMetricsLen } diff --git a/receiver/hostmetricsreceiver/internal/scraper/swapscraper/factory.go b/receiver/hostmetricsreceiver/internal/scraper/swapscraper/factory.go index fb304e736bc..436575a7686 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/swapscraper/factory.go +++ b/receiver/hostmetricsreceiver/internal/scraper/swapscraper/factory.go @@ -20,7 +20,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" - "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/scraper/obsreportscraper" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) // This file implements Factory for Swap scraper. @@ -44,7 +44,15 @@ func (f *Factory) CreateMetricsScraper( ctx context.Context, _ *zap.Logger, config internal.Config, -) (internal.Scraper, error) { +) (receiverhelper.MetricsScraper, error) { cfg := config.(*Config) - return obsreportscraper.WrapScraper(newSwapScraper(ctx, cfg), TypeStr), nil + s := newSwapScraper(ctx, cfg) + + ms := receiverhelper.NewMetricsScraper( + TypeStr, + s.Scrape, + receiverhelper.WithInitialize(s.Initialize), + ) + + return ms, nil } diff --git a/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_others.go b/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_others.go index 8106df9603d..89b84bbd305 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_others.go +++ b/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_others.go @@ -23,9 +23,15 @@ import ( "github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/mem" - "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" + "go.opentelemetry.io/collector/receiver/receiverhelper" +) + +const ( + swapUsageMetricsLen = 1 + pagingMetricsLen = 2 ) // scraper for Swap Metrics @@ -55,13 +61,8 @@ func (s *scraper) Initialize(_ context.Context) error { return nil } -// Close -func (s *scraper) Close(_ context.Context) error { - return nil -} - -// ScrapeMetrics -func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { +// Scrape +func (s *scraper) Scrape(_ context.Context) (pdata.MetricSlice, error) { metrics := pdata.NewMetricSlice() var errors []error @@ -76,18 +77,18 @@ func (s *scraper) ScrapeMetrics(_ context.Context) (pdata.MetricSlice, error) { errors = append(errors, err) } - return metrics, componenterror.CombineErrors(errors) + return metrics, receiverhelper.CombineScrapeErrors(errors) } func (s *scraper) scrapeAndAppendSwapUsageMetric(metrics pdata.MetricSlice) error { now := internal.TimeToUnixNano(time.Now()) vmem, err := s.virtualMemory() if err != nil { - return err + return consumererror.NewPartialScrapeError(err, swapUsageMetricsLen) } idx := metrics.Len() - metrics.Resize(idx + 1) + metrics.Resize(idx + swapUsageMetricsLen) initializeSwapUsageMetric(metrics.At(idx), now, vmem) return nil } @@ -113,11 +114,11 @@ func (s *scraper) scrapeAndAppendPagingMetrics(metrics pdata.MetricSlice) error now := internal.TimeToUnixNano(time.Now()) swap, err := s.swapMemory() if err != nil { - return err + return consumererror.NewPartialScrapeError(err, pagingMetricsLen) } idx := metrics.Len() - metrics.Resize(idx + 2) + metrics.Resize(idx + pagingMetricsLen) initializePagingMetric(metrics.At(idx+0), s.startTime, now, swap) initializePageFaultsMetric(metrics.At(idx+1), s.startTime, now, swap) return nil diff --git a/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_others_test.go b/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_others_test.go index 99c2420d185..6b0e8930238 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_others_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_others_test.go @@ -24,14 +24,17 @@ import ( "github.com/shirou/gopsutil/mem" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/consumer/consumererror" ) -func TestScrapeMetrics_Errors(t *testing.T) { +func TestScrape_Errors(t *testing.T) { type testCase struct { name string virtualMemoryFunc func() (*mem.VirtualMemoryStat, error) swapMemoryFunc func() (*mem.SwapMemoryStat, error) expectedError string + expectedErrCount int } testCases := []testCase{ @@ -39,17 +42,20 @@ func TestScrapeMetrics_Errors(t *testing.T) { name: "virtualMemoryError", virtualMemoryFunc: func() (*mem.VirtualMemoryStat, error) { return nil, errors.New("err1") }, expectedError: "err1", + expectedErrCount: swapUsageMetricsLen, }, { - name: "swapMemoryError", - swapMemoryFunc: func() (*mem.SwapMemoryStat, error) { return nil, errors.New("err2") }, - expectedError: "err2", + name: "swapMemoryError", + swapMemoryFunc: func() (*mem.SwapMemoryStat, error) { return nil, errors.New("err2") }, + expectedError: "err2", + expectedErrCount: pagingMetricsLen, }, { name: "multipleErrors", virtualMemoryFunc: func() (*mem.VirtualMemoryStat, error) { return nil, errors.New("err1") }, swapMemoryFunc: func() (*mem.SwapMemoryStat, error) { return nil, errors.New("err2") }, expectedError: "[err1; err2]", + expectedErrCount: swapUsageMetricsLen + pagingMetricsLen, }, } @@ -65,10 +71,15 @@ func TestScrapeMetrics_Errors(t *testing.T) { err := scraper.Initialize(context.Background()) require.NoError(t, err, "Failed to initialize swap scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - _, err = scraper.ScrapeMetrics(context.Background()) + _, err = scraper.Scrape(context.Background()) assert.EqualError(t, err, test.expectedError) + + isPartial := consumererror.IsPartialScrapeError(err) + assert.True(t, isPartial) + if isPartial { + assert.Equal(t, test.expectedErrCount, err.(consumererror.PartialScrapeError).Failed) + } }) } } diff --git a/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_test.go b/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_test.go index fcd5330d9f6..b223dd31bd5 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_test.go @@ -27,7 +27,7 @@ import ( "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" ) -func TestScrapeMetrics(t *testing.T) { +func TestScrape(t *testing.T) { type testCase struct { name string bootTimeFunc func() (uint64, error) @@ -64,9 +64,8 @@ func TestScrapeMetrics(t *testing.T) { return } require.NoError(t, err, "Failed to initialize swap scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - metrics, err := scraper.ScrapeMetrics(context.Background()) + metrics, err := scraper.Scrape(context.Background()) require.NoError(t, err) // expect 3 metrics (windows does not currently support page_faults metric) diff --git a/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_windows.go b/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_windows.go index bd320cc971f..f75013ea08f 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_windows.go +++ b/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_windows.go @@ -23,13 +23,17 @@ import ( "github.com/shirou/gopsutil/host" - "go.opentelemetry.io/collector/component/componenterror" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/perfcounters" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) const ( + swapUsageMetricsLen = 1 + pagingMetricsLen = 1 + memory = "Memory" pageReadsPerSec = "Page Reads/sec" @@ -74,13 +78,8 @@ func (s *scraper) Initialize(_ context.Context) error { return s.perfCounterScraper.Initialize(memory) } -// Close -func (s *scraper) Close(context.Context) error { - return nil -} - -// ScrapeMetrics -func (s *scraper) ScrapeMetrics(context.Context) (pdata.MetricSlice, error) { +// Scrape +func (s *scraper) Scrape(context.Context) (pdata.MetricSlice, error) { metrics := pdata.NewMetricSlice() var errors []error @@ -95,18 +94,18 @@ func (s *scraper) ScrapeMetrics(context.Context) (pdata.MetricSlice, error) { errors = append(errors, err) } - return metrics, componenterror.CombineErrors(errors) + return metrics, receiverhelper.CombineScrapeErrors(errors) } func (s *scraper) scrapeAndAppendSwapUsageMetric(metrics pdata.MetricSlice) error { now := internal.TimeToUnixNano(time.Now()) pageFiles, err := s.pageFileStats() if err != nil { - return err + return consumererror.NewPartialScrapeError(err, swapUsageMetricsLen) } idx := metrics.Len() - metrics.Resize(idx + 1) + metrics.Resize(idx + swapUsageMetricsLen) s.initializeSwapUsageMetric(metrics.At(idx), now, pageFiles) return nil } @@ -138,22 +137,22 @@ func (s *scraper) scrapeAndAppendPagingMetric(metrics pdata.MetricSlice) error { counters, err := s.perfCounterScraper.Scrape() if err != nil { - return err + return consumererror.NewPartialScrapeError(err, pagingMetricsLen) } memoryObject, err := counters.GetObject(memory) if err != nil { - return err + return consumererror.NewPartialScrapeError(err, pagingMetricsLen) } memoryCounterValues, err := memoryObject.GetValues(pageReadsPerSec, pageWritesPerSec) if err != nil { - return err + return consumererror.NewPartialScrapeError(err, pagingMetricsLen) } if len(memoryCounterValues) > 0 { idx := metrics.Len() - metrics.Resize(idx + 1) + metrics.Resize(idx + pagingMetricsLen) initializePagingMetric(metrics.At(idx), s.startTime, now, memoryCounterValues[0]) } diff --git a/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_windows_test.go b/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_windows_test.go index 34acdea3573..565d9c56b52 100644 --- a/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_windows_test.go +++ b/receiver/hostmetricsreceiver/internal/scraper/swapscraper/swap_scraper_windows_test.go @@ -24,10 +24,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver/internal/perfcounters" ) -func TestScrapeMetrics_Errors(t *testing.T) { +func TestScrape_Errors(t *testing.T) { type testCase struct { name string pageSize uint64 @@ -36,6 +37,7 @@ func TestScrapeMetrics_Errors(t *testing.T) { getObjectErr error getValuesErr error expectedErr string + expectedErrCount int expectedUsedValue int64 expectedFreeValue int64 } @@ -57,27 +59,32 @@ func TestScrapeMetrics_Errors(t *testing.T) { name: "pageFileError", getPageFileStats: func() ([]*pageFileData, error) { return nil, errors.New("err1") }, expectedErr: "err1", + expectedErrCount: swapUsageMetricsLen, }, { - name: "scrapeError", - scrapeErr: errors.New("err1"), - expectedErr: "err1", + name: "scrapeError", + scrapeErr: errors.New("err1"), + expectedErr: "err1", + expectedErrCount: pagingMetricsLen, }, { - name: "getObjectErr", - getObjectErr: errors.New("err1"), - expectedErr: "err1", + name: "getObjectErr", + getObjectErr: errors.New("err1"), + expectedErr: "err1", + expectedErrCount: pagingMetricsLen, }, { - name: "getValuesErr", - getValuesErr: errors.New("err1"), - expectedErr: "err1", + name: "getValuesErr", + getValuesErr: errors.New("err1"), + expectedErr: "err1", + expectedErrCount: pagingMetricsLen, }, { name: "multipleErrors", getPageFileStats: func() ([]*pageFileData, error) { return nil, errors.New("err1") }, getObjectErr: errors.New("err2"), expectedErr: "[err1; err2]", + expectedErrCount: swapUsageMetricsLen + pagingMetricsLen, }, } @@ -97,11 +104,17 @@ func TestScrapeMetrics_Errors(t *testing.T) { err := scraper.Initialize(context.Background()) require.NoError(t, err, "Failed to initialize swap scraper: %v", err) - defer func() { assert.NoError(t, scraper.Close(context.Background())) }() - metrics, err := scraper.ScrapeMetrics(context.Background()) + metrics, err := scraper.Scrape(context.Background()) if test.expectedErr != "" { assert.EqualError(t, err, test.expectedErr) + + isPartial := consumererror.IsPartialScrapeError(err) + assert.True(t, isPartial) + if isPartial { + assert.Equal(t, test.expectedErrCount, err.(consumererror.PartialScrapeError).Failed) + } + return } diff --git a/receiver/hostmetricsreceiver/internal/utils.go b/receiver/hostmetricsreceiver/internal/utils.go index 7c7b5141c1b..a4d9cd29d1f 100644 --- a/receiver/hostmetricsreceiver/internal/utils.go +++ b/receiver/hostmetricsreceiver/internal/utils.go @@ -20,17 +20,6 @@ import ( "go.opentelemetry.io/collector/consumer/pdata" ) -// Initializes a metric with a metric slice and returns it. -func InitializeMetricSlice(metricData pdata.Metrics) pdata.MetricSlice { - rms := metricData.ResourceMetrics() - rms.Resize(1) - rm := rms.At(0) - ilms := rm.InstrumentationLibraryMetrics() - ilms.Resize(1) - ilm := ilms.At(0) - return ilm.Metrics() -} - func TimeToUnixNano(t time.Time) pdata.TimestampUnixNano { return pdata.TimestampUnixNano(uint64(t.UnixNano())) } diff --git a/receiver/receiverhelper/errors.go b/receiver/receiverhelper/errors.go index 3305dd3ec57..1a364150dce 100644 --- a/receiver/receiverhelper/errors.go +++ b/receiver/receiverhelper/errors.go @@ -48,6 +48,12 @@ func CombineScrapeErrors(errs []error) error { errMsgs = append(errMsgs, err.Error()) } - err := fmt.Errorf("[%s]", strings.Join(errMsgs, "; ")) + var err error + if len(errs) == 1 { + err = errs[0] + } else { + err = fmt.Errorf("[%s]", strings.Join(errMsgs, "; ")) + } + return consumererror.NewPartialScrapeError(err, failedScrapeCount) }