diff --git a/receiver/internal/factory.go b/receiver/internal/factory.go deleted file mode 100644 index b62a0c6ff6f..00000000000 --- a/receiver/internal/factory.go +++ /dev/null @@ -1,212 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/receiver/internal" - -import ( - "context" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/consumerprofiles" - "go.opentelemetry.io/collector/pipeline" -) - -// Factory is a factory interface for receivers. -// -// This interface cannot be directly implemented. Implementations must -// use the NewReceiverFactory to implement it. -type Factory interface { - component.Factory - - // CreateTracesReceiver creates a TracesReceiver based on this config. - // If the receiver type does not support traces, - // this function returns the error [pipeline.ErrSignalNotSupported]. - // Implementers can assume `nextConsumer` is never nil. - CreateTracesReceiver(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Traces) (Traces, error) - - // TracesReceiverStability gets the stability level of the TracesReceiver. - TracesReceiverStability() component.StabilityLevel - - // CreateMetricsReceiver creates a MetricsReceiver based on this config. - // If the receiver type does not support metrics, - // this function returns the error [pipeline.ErrSignalNotSupported]. - // Implementers can assume `nextConsumer` is never nil. - CreateMetricsReceiver(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Metrics) (Metrics, error) - - // MetricsReceiverStability gets the stability level of the MetricsReceiver. - MetricsReceiverStability() component.StabilityLevel - - // CreateLogsReceiver creates a LogsReceiver based on this config. - // If the receiver type does not support logs, - // this function returns the error [pipeline.ErrSignalNotSupported]. - // Implementers can assume `nextConsumer` is never nil. - CreateLogsReceiver(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Logs) (Logs, error) - - // LogsReceiverStability gets the stability level of the LogsReceiver. - LogsReceiverStability() component.StabilityLevel - - // CreateProfilesReceiver creates a ProfilesReceiver based on this config. - // If the receiver type does not support tracing or if the config is not valid - // an error will be returned instead. `nextConsumer` is never nil. - CreateProfilesReceiver(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumerprofiles.Profiles) (Profiles, error) - - // ProfilesReceiverStability gets the stability level of the ProfilesReceiver. - ProfilesReceiverStability() component.StabilityLevel - - unexportedFactoryFunc() -} - -// FactoryOption apply changes to ReceiverOptions. -type FactoryOption interface { - // applyOption applies the option. - applyOption(o *factory) -} - -// factoryOptionFunc is an ReceiverFactoryOption created through a function. -type factoryOptionFunc func(*factory) - -func (f factoryOptionFunc) applyOption(o *factory) { - f(o) -} - -// CreateTracesFunc is the equivalent of Factory.CreateTraces. -type CreateTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Traces, error) - -// CreateTracesReceiver implements Factory.CreateTracesReceiver(). -func (f CreateTracesFunc) CreateTracesReceiver( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Traces) (Traces, error) { - if f == nil { - return nil, pipeline.ErrSignalNotSupported - } - return f(ctx, set, cfg, nextConsumer) -} - -// CreateMetricsFunc is the equivalent of Factory.CreateMetrics. -type CreateMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Metrics, error) - -// CreateMetricsReceiver implements Factory.CreateMetricsReceiver(). -func (f CreateMetricsFunc) CreateMetricsReceiver( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Metrics, -) (Metrics, error) { - if f == nil { - return nil, pipeline.ErrSignalNotSupported - } - return f(ctx, set, cfg, nextConsumer) -} - -// CreateLogsFunc is the equivalent of ReceiverFactory.CreateLogsReceiver(). -type CreateLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Logs, error) - -// CreateLogsReceiver implements Factory.CreateLogsReceiver(). -func (f CreateLogsFunc) CreateLogsReceiver( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Logs, -) (Logs, error) { - if f == nil { - return nil, pipeline.ErrSignalNotSupported - } - return f(ctx, set, cfg, nextConsumer) -} - -// CreateProfilesFunc is the equivalent of Factory.CreateProfiles. -type CreateProfilesFunc func(context.Context, Settings, component.Config, consumerprofiles.Profiles) (Profiles, error) - -// CreateProfilesReceiver implements Factory.CreateProfilesReceiver(). -func (f CreateProfilesFunc) CreateProfilesReceiver( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumerprofiles.Profiles) (Profiles, error) { - if f == nil { - return nil, pipeline.ErrSignalNotSupported - } - return f(ctx, set, cfg, nextConsumer) -} - -type factory struct { - cfgType component.Type - component.CreateDefaultConfigFunc - CreateTracesFunc - tracesStabilityLevel component.StabilityLevel - CreateMetricsFunc - metricsStabilityLevel component.StabilityLevel - CreateLogsFunc - logsStabilityLevel component.StabilityLevel - CreateProfilesFunc - profilesStabilityLevel component.StabilityLevel -} - -func (f *factory) Type() component.Type { - return f.cfgType -} - -func (f *factory) unexportedFactoryFunc() {} - -func (f *factory) TracesReceiverStability() component.StabilityLevel { - return f.tracesStabilityLevel -} - -func (f *factory) MetricsReceiverStability() component.StabilityLevel { - return f.metricsStabilityLevel -} - -func (f *factory) LogsReceiverStability() component.StabilityLevel { - return f.logsStabilityLevel -} - -func (f *factory) ProfilesReceiverStability() component.StabilityLevel { - return f.profilesStabilityLevel -} - -// WithTraces overrides the default "error not supported" implementation for CreateTracesReceiver and the default "undefined" stability level. -func WithTraces(createTracesReceiver CreateTracesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.tracesStabilityLevel = sl - o.CreateTracesFunc = createTracesReceiver - }) -} - -// WithMetrics overrides the default "error not supported" implementation for CreateMetricsReceiver and the default "undefined" stability level. -func WithMetrics(createMetricsReceiver CreateMetricsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.metricsStabilityLevel = sl - o.CreateMetricsFunc = createMetricsReceiver - }) -} - -// WithLogs overrides the default "error not supported" implementation for CreateLogsReceiver and the default "undefined" stability level. -func WithLogs(createLogsReceiver CreateLogsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.logsStabilityLevel = sl - o.CreateLogsFunc = createLogsReceiver - }) -} - -// WithProfiles overrides the default "error not supported" implementation for CreateProfilesReceiver and the default "undefined" stability level. -func WithProfiles(createProfilesReceiver CreateProfilesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.profilesStabilityLevel = sl - o.CreateProfilesFunc = createProfilesReceiver - }) -} - -// NewFactory returns a Factory. -func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { - f := &factory{ - cfgType: cfgType, - CreateDefaultConfigFunc: createDefaultConfig, - } - for _, opt := range options { - opt.applyOption(f) - } - return f -} diff --git a/receiver/internal/logs.go b/receiver/internal/logs.go deleted file mode 100644 index e72bb1ad84a..00000000000 --- a/receiver/internal/logs.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/receiver/internal" - -import "go.opentelemetry.io/collector/component" - -// Logs receiver receives logs. -// Its purpose is to translate data from any format to the collector's internal logs data format. -// LogsReceiver feeds a consumer.Logs with data. -// -// For example, it could be a receiver that reads syslogs and convert them into plog.Logs. -type Logs interface { - component.Component -} diff --git a/receiver/internal/metrics.go b/receiver/internal/metrics.go deleted file mode 100644 index 3f5dbd25b0e..00000000000 --- a/receiver/internal/metrics.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/receiver/internal" - -import "go.opentelemetry.io/collector/component" - -// Metrics receiver receives metrics. -// Its purpose is to translate data from any format to the collector's internal metrics format. -// MetricsReceiver feeds a consumer.Metrics with data. -// -// For example, it could be Prometheus data source which translates Prometheus metrics into pmetric.Metrics. -type Metrics interface { - component.Component -} diff --git a/receiver/internal/profiles.go b/receiver/internal/profiles.go deleted file mode 100644 index 04c784247f5..00000000000 --- a/receiver/internal/profiles.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/receiver/internal" - -import "go.opentelemetry.io/collector/component" - -// Profiless receiver receives profiles. -// Its purpose is to translate data from any format to the collector's internal profile format. -// ProfilesReceiver feeds a consumer.Profiles with data. -// -// For example, it could be a pprof data source which translates pprof profiles into pprofile.Profiles. -type Profiles interface { - component.Component -} diff --git a/receiver/internal/receiver.go b/receiver/internal/receiver.go deleted file mode 100644 index bbfeaffae35..00000000000 --- a/receiver/internal/receiver.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/receiver/internal" - -import "go.opentelemetry.io/collector/component" - -// Settings configures Receiver creators. -type Settings struct { - // ID returns the ID of the component that will be created. - ID component.ID - - component.TelemetrySettings - - // BuildInfo can be used by components for informational purposes. - BuildInfo component.BuildInfo -} diff --git a/receiver/internal/traces.go b/receiver/internal/traces.go deleted file mode 100644 index 8b1a1b37eda..00000000000 --- a/receiver/internal/traces.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package internal // import "go.opentelemetry.io/collector/receiver/internal" - -import "go.opentelemetry.io/collector/component" - -// Traces receiver receives traces. -// Its purpose is to translate data from any format to the collector's internal trace format. -// TracesReceiver feeds a consumer.Traces with data. -// -// For example, it could be Zipkin data source which translates Zipkin spans into ptrace.Traces. -type Traces interface { - component.Component -} diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 12fc5eb1d20..aebdffafad0 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -27,12 +27,12 @@ const ( // NewFactory creates a new OTLP receiver factory. func NewFactory() receiver.Factory { - return receiver.NewFactory( + return receiverprofiles.NewFactory( metadata.Type, createDefaultConfig, - receiver.WithTraces(createTraces, metadata.TracesStability), - receiver.WithMetrics(createMetrics, metadata.MetricsStability), - receiver.WithLogs(createLog, metadata.LogsStability), + receiverprofiles.WithTraces(createTraces, metadata.TracesStability), + receiverprofiles.WithMetrics(createMetrics, metadata.MetricsStability), + receiverprofiles.WithLogs(createLog, metadata.LogsStability), receiverprofiles.WithProfiles(createProfiles, metadata.ProfilesStability), ) } diff --git a/receiver/otlpreceiver/factory_test.go b/receiver/otlpreceiver/factory_test.go index 301c9e09de9..10afd012da6 100644 --- a/receiver/otlpreceiver/factory_test.go +++ b/receiver/otlpreceiver/factory_test.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumerprofiles" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testutil" + "go.opentelemetry.io/collector/receiver/receiverprofiles" "go.opentelemetry.io/collector/receiver/receivertest" ) @@ -47,7 +48,7 @@ func TestCreateSameReceiver(t *testing.T) { assert.NotNil(t, lReceiver) require.NoError(t, err) - pReceiver, err := factory.CreateProfilesReceiver(context.Background(), creationSet, cfg, consumertest.NewNop()) + pReceiver, err := factory.(receiverprofiles.Factory).CreateProfilesReceiver(context.Background(), creationSet, cfg, consumertest.NewNop()) assert.NotNil(t, pReceiver) require.NoError(t, err) @@ -415,7 +416,7 @@ func TestCreateProfilesReceiver(t *testing.T) { creationSet := receivertest.NewNopSettings() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tr, err := factory.CreateProfilesReceiver(ctx, creationSet, tt.cfg, tt.sink) + tr, err := factory.(receiverprofiles.Factory).CreateProfilesReceiver(ctx, creationSet, tt.cfg, tt.sink) if tt.wantErr { assert.Error(t, err) return diff --git a/receiver/receiver.go b/receiver/receiver.go index 0f3b5a886fb..010a87735d3 100644 --- a/receiver/receiver.go +++ b/receiver/receiver.go @@ -4,10 +4,12 @@ package receiver // import "go.opentelemetry.io/collector/receiver" import ( + "context" "fmt" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/receiver/internal" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pipeline" ) // Traces receiver receives traces. @@ -15,61 +17,199 @@ import ( // TracesReceiver feeds a consumer.Traces with data. // // For example, it could be Zipkin data source which translates Zipkin spans into ptrace.Traces. -type Traces = internal.Traces +type Traces interface { + component.Component +} // Metrics receiver receives metrics. // Its purpose is to translate data from any format to the collector's internal metrics format. // MetricsReceiver feeds a consumer.Metrics with data. // // For example, it could be Prometheus data source which translates Prometheus metrics into pmetric.Metrics. -type Metrics = internal.Metrics +type Metrics interface { + component.Component +} // Logs receiver receives logs. // Its purpose is to translate data from any format to the collector's internal logs data format. // LogsReceiver feeds a consumer.Logs with data. // // For example, it could be a receiver that reads syslogs and convert them into plog.Logs. -type Logs = internal.Logs +type Logs interface { + component.Component +} // Settings configures Receiver creators. -type Settings = internal.Settings +type Settings struct { + // ID returns the ID of the component that will be created. + ID component.ID + + component.TelemetrySettings + + // BuildInfo can be used by components for informational purposes. + BuildInfo component.BuildInfo +} -// Factory is factory interface for receivers. +// Factory is a factory interface for receivers. // // This interface cannot be directly implemented. Implementations must // use the NewReceiverFactory to implement it. -type Factory = internal.Factory +type Factory interface { + component.Factory + + // CreateTracesReceiver creates a TracesReceiver based on this config. + // If the receiver type does not support traces, + // this function returns the error [pipeline.ErrSignalNotSupported]. + // Implementers can assume `nextConsumer` is never nil. + CreateTracesReceiver(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Traces) (Traces, error) + + // TracesReceiverStability gets the stability level of the TracesReceiver. + TracesReceiverStability() component.StabilityLevel + + // CreateMetricsReceiver creates a MetricsReceiver based on this config. + // If the receiver type does not support metrics, + // this function returns the error [pipeline.ErrSignalNotSupported]. + // Implementers can assume `nextConsumer` is never nil. + CreateMetricsReceiver(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Metrics) (Metrics, error) + + // MetricsReceiverStability gets the stability level of the MetricsReceiver. + MetricsReceiverStability() component.StabilityLevel + + // CreateLogsReceiver creates a LogsReceiver based on this config. + // If the receiver type does not support logs, + // this function returns the error [pipeline.ErrSignalNotSupported]. + // Implementers can assume `nextConsumer` is never nil. + CreateLogsReceiver(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Logs) (Logs, error) + + // LogsReceiverStability gets the stability level of the LogsReceiver. + LogsReceiverStability() component.StabilityLevel + + unexportedFactoryFunc() +} // FactoryOption apply changes to ReceiverOptions. -type FactoryOption = internal.FactoryOption +type FactoryOption interface { + // applyOption applies the option. + applyOption(o *factory) +} + +// factoryOptionFunc is an ReceiverFactoryOption created through a function. +type factoryOptionFunc func(*factory) + +func (f factoryOptionFunc) applyOption(o *factory) { + f(o) +} // CreateTracesFunc is the equivalent of Factory.CreateTraces. -type CreateTracesFunc = internal.CreateTracesFunc +type CreateTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Traces, error) + +// CreateTracesReceiver implements Factory.CreateTracesReceiver(). +func (f CreateTracesFunc) CreateTracesReceiver( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Traces) (Traces, error) { + if f == nil { + return nil, pipeline.ErrSignalNotSupported + } + return f(ctx, set, cfg, nextConsumer) +} // CreateMetricsFunc is the equivalent of Factory.CreateMetrics. -type CreateMetricsFunc = internal.CreateMetricsFunc +type CreateMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Metrics, error) + +// CreateMetricsReceiver implements Factory.CreateMetricsReceiver(). +func (f CreateMetricsFunc) CreateMetricsReceiver( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Metrics, +) (Metrics, error) { + if f == nil { + return nil, pipeline.ErrSignalNotSupported + } + return f(ctx, set, cfg, nextConsumer) +} // CreateLogsFunc is the equivalent of ReceiverFactory.CreateLogsReceiver(). -type CreateLogsFunc = internal.CreateLogsFunc +type CreateLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Logs, error) + +// CreateLogsReceiver implements Factory.CreateLogsReceiver(). +func (f CreateLogsFunc) CreateLogsReceiver( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Logs, +) (Logs, error) { + if f == nil { + return nil, pipeline.ErrSignalNotSupported + } + return f(ctx, set, cfg, nextConsumer) +} + +type factory struct { + cfgType component.Type + component.CreateDefaultConfigFunc + CreateTracesFunc + tracesStabilityLevel component.StabilityLevel + CreateMetricsFunc + metricsStabilityLevel component.StabilityLevel + CreateLogsFunc + logsStabilityLevel component.StabilityLevel +} + +func (f *factory) Type() component.Type { + return f.cfgType +} + +func (f *factory) unexportedFactoryFunc() {} + +func (f *factory) TracesReceiverStability() component.StabilityLevel { + return f.tracesStabilityLevel +} + +func (f *factory) MetricsReceiverStability() component.StabilityLevel { + return f.metricsStabilityLevel +} + +func (f *factory) LogsReceiverStability() component.StabilityLevel { + return f.logsStabilityLevel +} // WithTraces overrides the default "error not supported" implementation for CreateTracesReceiver and the default "undefined" stability level. func WithTraces(createTracesReceiver CreateTracesFunc, sl component.StabilityLevel) FactoryOption { - return internal.WithTraces(createTracesReceiver, sl) + return factoryOptionFunc(func(o *factory) { + o.tracesStabilityLevel = sl + o.CreateTracesFunc = createTracesReceiver + }) } // WithMetrics overrides the default "error not supported" implementation for CreateMetricsReceiver and the default "undefined" stability level. func WithMetrics(createMetricsReceiver CreateMetricsFunc, sl component.StabilityLevel) FactoryOption { - return internal.WithMetrics(createMetricsReceiver, sl) + return factoryOptionFunc(func(o *factory) { + o.metricsStabilityLevel = sl + o.CreateMetricsFunc = createMetricsReceiver + }) } // WithLogs overrides the default "error not supported" implementation for CreateLogsReceiver and the default "undefined" stability level. func WithLogs(createLogsReceiver CreateLogsFunc, sl component.StabilityLevel) FactoryOption { - return internal.WithLogs(createLogsReceiver, sl) + return factoryOptionFunc(func(o *factory) { + o.logsStabilityLevel = sl + o.CreateLogsFunc = createLogsReceiver + }) } // NewFactory returns a Factory. func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { - return internal.NewFactory(cfgType, createDefaultConfig, options...) + f := &factory{ + cfgType: cfgType, + CreateDefaultConfigFunc: createDefaultConfig, + } + for _, opt := range options { + opt.applyOption(f) + } + return f } // MakeFactoryMap takes a list of receiver factories and returns a map with factory type as keys. diff --git a/receiver/receiverprofiles/go.mod b/receiver/receiverprofiles/go.mod index c2764e59606..e7692665d2f 100644 --- a/receiver/receiverprofiles/go.mod +++ b/receiver/receiverprofiles/go.mod @@ -7,6 +7,7 @@ require ( go.opentelemetry.io/collector/component v0.110.0 go.opentelemetry.io/collector/consumer/consumerprofiles v0.110.0 go.opentelemetry.io/collector/consumer/consumertest v0.110.0 + go.opentelemetry.io/collector/pipeline v0.110.0 go.opentelemetry.io/collector/receiver v0.110.0 ) @@ -22,7 +23,6 @@ require ( go.opentelemetry.io/collector/internal/globalsignal v0.110.0 // indirect go.opentelemetry.io/collector/pdata v1.16.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.110.0 // indirect - go.opentelemetry.io/collector/pipeline v0.110.0 // indirect go.opentelemetry.io/otel v1.30.0 // indirect go.opentelemetry.io/otel/metric v1.30.0 // indirect go.opentelemetry.io/otel/trace v1.30.0 // indirect diff --git a/receiver/receiverprofiles/profiles.go b/receiver/receiverprofiles/profiles.go index a98668a04dc..d3c560bb398 100644 --- a/receiver/receiverprofiles/profiles.go +++ b/receiver/receiverprofiles/profiles.go @@ -4,9 +4,12 @@ package receiverprofiles // import "go.opentelemetry.io/collector/receiver/receiverprofiles" import ( + "context" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/consumerprofiles" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/internal" ) // Profiles receiver receives profiles. @@ -14,12 +17,113 @@ import ( // ProfilessReceiver feeds a consumerprofiles.Profiles with data. // // For example, it could be a pprof data source which translates pprof profiles into pprofile.Profiles. -type Profiles = internal.Profiles +type Profiles interface { + component.Component +} + +// Factory is a factory interface for receivers. +// +// This interface cannot be directly implemented. Implementations must +// use the NewReceiverFactory to implement it. +type Factory interface { + receiver.Factory + + // CreateProfilesReceiver creates a ProfilesReceiver based on this config. + // If the receiver type does not support tracing or if the config is not valid + // an error will be returned instead. `nextConsumer` is never nil. + CreateProfilesReceiver(ctx context.Context, set receiver.Settings, cfg component.Config, nextConsumer consumerprofiles.Profiles) (Profiles, error) + + // ProfilesReceiverStability gets the stability level of the ProfilesReceiver. + ProfilesReceiverStability() component.StabilityLevel +} // CreateProfilesFunc is the equivalent of Factory.CreateProfiles. -type CreateProfilesFunc = internal.CreateProfilesFunc +type CreateProfilesFunc func(context.Context, receiver.Settings, component.Config, consumerprofiles.Profiles) (Profiles, error) + +// CreateProfilesReceiver implements Factory.CreateProfilesReceiver(). +func (f CreateProfilesFunc) CreateProfilesReceiver( + ctx context.Context, + set receiver.Settings, + cfg component.Config, + nextConsumer consumerprofiles.Profiles) (Profiles, error) { + if f == nil { + return nil, pipeline.ErrSignalNotSupported + } + return f(ctx, set, cfg, nextConsumer) +} + +// FactoryOption apply changes to ReceiverOptions. +type FactoryOption interface { + // applyOption applies the option. + applyOption(o *factoryOpts) +} + +// factoryOptionFunc is an ReceiverFactoryOption created through a function. +type factoryOptionFunc func(*factoryOpts) + +func (f factoryOptionFunc) applyOption(o *factoryOpts) { + f(o) +} + +type factory struct { + receiver.Factory + CreateProfilesFunc + profilesStabilityLevel component.StabilityLevel +} + +func (f *factory) ProfilesReceiverStability() component.StabilityLevel { + return f.profilesStabilityLevel +} + +type factoryOpts struct { + cfgType component.Type + component.CreateDefaultConfigFunc + opts []receiver.FactoryOption + CreateProfilesFunc + profilesStabilityLevel component.StabilityLevel +} + +// WithTraces overrides the default "error not supported" implementation for CreateTracesReceiver and the default "undefined" stability level. +func WithTraces(createTracesReceiver receiver.CreateTracesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, receiver.WithTraces(createTracesReceiver, sl)) + }) +} + +// WithMetrics overrides the default "error not supported" implementation for CreateMetricsReceiver and the default "undefined" stability level. +func WithMetrics(createMetricsReceiver receiver.CreateMetricsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, receiver.WithMetrics(createMetricsReceiver, sl)) + }) +} + +// WithLogs overrides the default "error not supported" implementation for CreateLogsReceiver and the default "undefined" stability level. +func WithLogs(createLogsReceiver receiver.CreateLogsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.opts = append(o.opts, receiver.WithLogs(createLogsReceiver, sl)) + }) +} // WithProfiles overrides the default "error not supported" implementation for CreateProfilesReceiver and the default "undefined" stability level. -func WithProfiles(createProfilesReceiver CreateProfilesFunc, sl component.StabilityLevel) receiver.FactoryOption { - return internal.WithProfiles(createProfilesReceiver, sl) +func WithProfiles(createProfilesReceiver CreateProfilesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factoryOpts) { + o.profilesStabilityLevel = sl + o.CreateProfilesFunc = createProfilesReceiver + }) +} + +// NewFactory returns a Factory. +func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { + opts := factoryOpts{ + cfgType: cfgType, + CreateDefaultConfigFunc: createDefaultConfig, + } + for _, opt := range options { + opt.applyOption(&opts) + } + return &factory{ + Factory: receiver.NewFactory(opts.cfgType, opts.CreateDefaultConfig, opts.opts...), + CreateProfilesFunc: opts.CreateProfilesFunc, + profilesStabilityLevel: opts.profilesStabilityLevel, + } } diff --git a/receiver/receiverprofiles/receiver_test.go b/receiver/receiverprofiles/receiver_test.go index 2ba69c6ac8b..eb094576756 100644 --- a/receiver/receiverprofiles/receiver_test.go +++ b/receiver/receiverprofiles/receiver_test.go @@ -18,7 +18,7 @@ import ( func TestNewFactoryWithProfiles(t *testing.T) { var testType = component.MustNewType("test") defaultCfg := struct{}{} - factory := receiver.NewFactory( + factory := NewFactory( testType, func() component.Config { return &defaultCfg }, WithProfiles(createProfiles, component.StabilityLevelAlpha), diff --git a/receiver/receivertest/nop_receiver.go b/receiver/receivertest/nop_receiver.go index e9a265377d0..c2225285250 100644 --- a/receiver/receivertest/nop_receiver.go +++ b/receiver/receivertest/nop_receiver.go @@ -31,12 +31,12 @@ func NewNopSettings() receiver.Settings { // NewNopFactory returns a receiver.Factory that constructs nop receivers supporting all data types. func NewNopFactory() receiver.Factory { - return receiver.NewFactory( + return receiverprofiles.NewFactory( defaultComponentType, func() component.Config { return &nopConfig{} }, - receiver.WithTraces(createTraces, component.StabilityLevelStable), - receiver.WithMetrics(createMetrics, component.StabilityLevelStable), - receiver.WithLogs(createLogs, component.StabilityLevelStable), + receiverprofiles.WithTraces(createTraces, component.StabilityLevelStable), + receiverprofiles.WithMetrics(createMetrics, component.StabilityLevelStable), + receiverprofiles.WithLogs(createLogs, component.StabilityLevelStable), receiverprofiles.WithProfiles(createProfiles, component.StabilityLevelAlpha), ) } diff --git a/receiver/receivertest/nop_receiver_test.go b/receiver/receivertest/nop_receiver_test.go index 35e7f0b1b18..51943f92d06 100644 --- a/receiver/receivertest/nop_receiver_test.go +++ b/receiver/receivertest/nop_receiver_test.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/receiver/receiverprofiles" ) var nopType = component.MustNewType("nop") @@ -39,7 +40,7 @@ func TestNewNopFactory(t *testing.T) { assert.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, logs.Shutdown(context.Background())) - profiles, err := factory.CreateProfilesReceiver(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + profiles, err := factory.(receiverprofiles.Factory).CreateProfilesReceiver(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NoError(t, profiles.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, profiles.Shutdown(context.Background())) diff --git a/service/internal/builders/receiver.go b/service/internal/builders/receiver.go index 6f0caf7045c..5831993d4bb 100644 --- a/service/internal/builders/receiver.go +++ b/service/internal/builders/receiver.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerprofiles" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverprofiles" "go.opentelemetry.io/collector/receiver/receivertest" @@ -95,11 +96,16 @@ func (b *ReceiverBuilder) CreateProfiles(ctx context.Context, set receiver.Setti return nil, fmt.Errorf("receiver %q is not configured", set.ID) } - f, existsFactory := b.factories[set.ID.Type()] + recvFact, existsFactory := b.factories[set.ID.Type()] if !existsFactory { return nil, fmt.Errorf("receiver factory not available for: %q", set.ID) } + f, ok := recvFact.(receiverprofiles.Factory) + if !ok { + return nil, pipeline.ErrSignalNotSupported + } + logStabilityLevel(set.Logger, f.ProfilesReceiverStability()) return f.CreateProfilesReceiver(ctx, set, cfg, next) } diff --git a/service/internal/builders/receiver_test.go b/service/internal/builders/receiver_test.go index 488892a1310..2bce4185187 100644 --- a/service/internal/builders/receiver_test.go +++ b/service/internal/builders/receiver_test.go @@ -24,12 +24,12 @@ func TestReceiverBuilder(t *testing.T) { defaultCfg := struct{}{} factories, err := receiver.MakeFactoryMap([]receiver.Factory{ receiver.NewFactory(component.MustNewType("err"), nil), - receiver.NewFactory( + receiverprofiles.NewFactory( component.MustNewType("all"), func() component.Config { return &defaultCfg }, - receiver.WithTraces(createReceiverTraces, component.StabilityLevelDevelopment), - receiver.WithMetrics(createReceiverMetrics, component.StabilityLevelAlpha), - receiver.WithLogs(createReceiverLogs, component.StabilityLevelDeprecated), + receiverprofiles.WithTraces(createReceiverTraces, component.StabilityLevelDevelopment), + receiverprofiles.WithMetrics(createReceiverMetrics, component.StabilityLevelAlpha), + receiverprofiles.WithLogs(createReceiverLogs, component.StabilityLevelDeprecated), receiverprofiles.WithProfiles(createReceiverProfiles, component.StabilityLevelAlpha), ), }...) @@ -136,12 +136,12 @@ func TestReceiverBuilder(t *testing.T) { func TestReceiverBuilderMissingConfig(t *testing.T) { defaultCfg := struct{}{} factories, err := receiver.MakeFactoryMap([]receiver.Factory{ - receiver.NewFactory( + receiverprofiles.NewFactory( component.MustNewType("all"), func() component.Config { return &defaultCfg }, - receiver.WithTraces(createReceiverTraces, component.StabilityLevelDevelopment), - receiver.WithMetrics(createReceiverMetrics, component.StabilityLevelAlpha), - receiver.WithLogs(createReceiverLogs, component.StabilityLevelDeprecated), + receiverprofiles.WithTraces(createReceiverTraces, component.StabilityLevelDevelopment), + receiverprofiles.WithMetrics(createReceiverMetrics, component.StabilityLevelAlpha), + receiverprofiles.WithLogs(createReceiverLogs, component.StabilityLevelDeprecated), receiverprofiles.WithProfiles(createReceiverProfiles, component.StabilityLevelAlpha), ), }...) @@ -207,7 +207,7 @@ func TestNewNopReceiverConfigsAndFactories(t *testing.T) { require.NoError(t, err) assert.IsType(t, logs, bLogs) - profiles, err := factory.CreateProfilesReceiver(context.Background(), set, cfg, consumertest.NewNop()) + profiles, err := factory.(receiverprofiles.Factory).CreateProfilesReceiver(context.Background(), set, cfg, consumertest.NewNop()) require.NoError(t, err) bProfiles, err := builder.CreateProfiles(context.Background(), set, consumertest.NewNop()) require.NoError(t, err) diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 58cab24f25a..03849252ffb 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -3038,15 +3038,15 @@ func newBadConnectorFactory() connector.Factory { } func newErrReceiverFactory() receiver.Factory { - return receiver.NewFactory(component.MustNewType("err"), + return receiverprofiles.NewFactory(component.MustNewType("err"), func() component.Config { return &struct{}{} }, - receiver.WithTraces(func(context.Context, receiver.Settings, component.Config, consumer.Traces) (receiver.Traces, error) { + receiverprofiles.WithTraces(func(context.Context, receiver.Settings, component.Config, consumer.Traces) (receiver.Traces, error) { return &errComponent{}, nil }, component.StabilityLevelUndefined), - receiver.WithLogs(func(context.Context, receiver.Settings, component.Config, consumer.Logs) (receiver.Logs, error) { + receiverprofiles.WithLogs(func(context.Context, receiver.Settings, component.Config, consumer.Logs) (receiver.Logs, error) { return &errComponent{}, nil }, component.StabilityLevelUndefined), - receiver.WithMetrics(func(context.Context, receiver.Settings, component.Config, consumer.Metrics) (receiver.Metrics, error) { + receiverprofiles.WithMetrics(func(context.Context, receiver.Settings, component.Config, consumer.Metrics) (receiver.Metrics, error) { return &errComponent{}, nil }, component.StabilityLevelUndefined), receiverprofiles.WithProfiles(func(context.Context, receiver.Settings, component.Config, consumerprofiles.Profiles) (receiverprofiles.Profiles, error) { diff --git a/service/internal/testcomponents/example_receiver.go b/service/internal/testcomponents/example_receiver.go index 2309c9837ec..698df6cc208 100644 --- a/service/internal/testcomponents/example_receiver.go +++ b/service/internal/testcomponents/example_receiver.go @@ -16,12 +16,12 @@ import ( var receiverType = component.MustNewType("examplereceiver") // ExampleReceiverFactory is factory for ExampleReceiver. -var ExampleReceiverFactory = receiver.NewFactory( +var ExampleReceiverFactory = receiverprofiles.NewFactory( receiverType, createReceiverDefaultConfig, - receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment), - receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelDevelopment), - receiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment), + receiverprofiles.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment), + receiverprofiles.WithMetrics(createMetricsReceiver, component.StabilityLevelDevelopment), + receiverprofiles.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment), receiverprofiles.WithProfiles(createProfilesReceiver, component.StabilityLevelDevelopment), )