diff --git a/processor/builder.go b/processor/builder.go new file mode 100644 index 00000000000..a1a0378b37c --- /dev/null +++ b/processor/builder.go @@ -0,0 +1,97 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package processor // import "go.opentelemetry.io/collector/processor" + +import ( + "context" + "fmt" + + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" +) + +// Builder processor is a helper struct that given a set of Configs and Factories helps with creating processors. +type Builder struct { + cfgs map[component.ID]component.Config + factories map[component.Type]Factory +} + +// NewBuilder creates a new processor.Builder to help with creating components form a set of configs and factories. +func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder { + return &Builder{cfgs: cfgs, factories: factories} +} + +// CreateTraces creates a Traces processor based on the settings and config. +func (b *Builder) CreateTraces(ctx context.Context, set Settings, next consumer.Traces) (Traces, error) { + if next == nil { + return nil, errNilNextConsumer + } + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("processor %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("processor factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.TracesProcessorStability()) + return f.CreateTracesProcessor(ctx, set, cfg, next) +} + +// CreateMetrics creates a Metrics processor based on the settings and config. +func (b *Builder) CreateMetrics(ctx context.Context, set Settings, next consumer.Metrics) (Metrics, error) { + if next == nil { + return nil, errNilNextConsumer + } + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("processor %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("processor factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.MetricsProcessorStability()) + return f.CreateMetricsProcessor(ctx, set, cfg, next) +} + +// CreateLogs creates a Logs processor based on the settings and config. +func (b *Builder) CreateLogs(ctx context.Context, set Settings, next consumer.Logs) (Logs, error) { + if next == nil { + return nil, errNilNextConsumer + } + cfg, existsCfg := b.cfgs[set.ID] + if !existsCfg { + return nil, fmt.Errorf("processor %q is not configured", set.ID) + } + + f, existsFactory := b.factories[set.ID.Type()] + if !existsFactory { + return nil, fmt.Errorf("processor factory not available for: %q", set.ID) + } + + logStabilityLevel(set.Logger, f.LogsProcessorStability()) + return f.CreateLogsProcessor(ctx, set, cfg, next) +} + +func (b *Builder) Factory(componentType component.Type) component.Factory { + return b.factories[componentType] +} + +// logStabilityLevel logs the stability level of a component. The log level is set to info for +// undefined, unmaintained, deprecated and development. The log level is set to debug +// for alpha, beta and stable. +func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) { + if sl >= component.StabilityLevelAlpha { + logger.Debug(sl.LogMessage()) + } else { + logger.Info(sl.LogMessage()) + } +} diff --git a/processor/internal/factory.go b/processor/internal/factory.go new file mode 100644 index 00000000000..293e65a5ae2 --- /dev/null +++ b/processor/internal/factory.go @@ -0,0 +1,172 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/processor/internal" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" +) + +// Factory is a Factory interface for processors. +// +// This interface cannot be directly implemented. Implementations must +// use the NewProcessorFactory to implement it. +type Factory interface { + component.Factory + + // CreateTracesProcessor creates a TracesProcessor based on this config. + // If the processor type does not support tracing or if the config is not valid, + // an error will be returned instead. + CreateTracesProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Traces) (Traces, error) + + // TracesProcessorStability gets the stability level of the TracesProcessor. + TracesProcessorStability() component.StabilityLevel + + // CreateMetricsProcessor creates a MetricsProcessor based on this config. + // If the processor type does not support metrics or if the config is not valid, + // an error will be returned instead. + CreateMetricsProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Metrics) (Metrics, error) + + // MetricsProcessorStability gets the stability level of the MetricsProcessor. + MetricsProcessorStability() component.StabilityLevel + + // CreateLogsProcessor creates a LogsProcessor based on the config. + // If the processor type does not support logs or if the config is not valid, + // an error will be returned instead. + CreateLogsProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Logs) (Logs, error) + + // LogsProcessorStability gets the stability level of the LogsProcessor. + LogsProcessorStability() component.StabilityLevel + + unexportedFactoryFunc() +} + +// FactoryOption apply changes to Options. +type FactoryOption interface { + // applyProcessorFactoryOption applies the option. + applyProcessorFactoryOption(o *factory) +} + +var _ FactoryOption = (*factoryOptionFunc)(nil) + +// factoryOptionFunc is a FactoryOption created through a function. +type factoryOptionFunc func(*factory) + +func (f factoryOptionFunc) applyProcessorFactoryOption(o *factory) { + f(o) +} + +// CreateTracesFunc is the equivalent of Factory.CreateTraces(). +type CreateTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Traces, error) + +// CreateTracesProcessor implements Factory.CreateTracesProcessor(). +func (f CreateTracesFunc) CreateTracesProcessor( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Traces) (Traces, error) { + if f == nil { + return nil, component.ErrDataTypeIsNotSupported + } + return f(ctx, set, cfg, nextConsumer) +} + +// CreateMetricsFunc is the equivalent of Factory.CreateMetrics(). +type CreateMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Metrics, error) + +// CreateMetricsProcessor implements Factory.CreateMetricsProcessor(). +func (f CreateMetricsFunc) CreateMetricsProcessor( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Metrics, +) (Metrics, error) { + if f == nil { + return nil, component.ErrDataTypeIsNotSupported + } + return f(ctx, set, cfg, nextConsumer) +} + +// CreateLogsFunc is the equivalent of Factory.CreateLogs(). +type CreateLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Logs, error) + +// CreateLogsProcessor implements Factory.CreateLogsProcessor(). +func (f CreateLogsFunc) CreateLogsProcessor( + ctx context.Context, + set Settings, + cfg component.Config, + nextConsumer consumer.Logs, +) (Logs, error) { + if f == nil { + return nil, component.ErrDataTypeIsNotSupported + } + return f(ctx, set, cfg, nextConsumer) +} + +type factory struct { + cfgType component.Type + component.CreateDefaultConfigFunc + CreateTracesFunc + tracesStabilityLevel component.StabilityLevel + CreateMetricsFunc + metricsStabilityLevel component.StabilityLevel + CreateLogsFunc + logsStabilityLevel component.StabilityLevel +} + +func (f *factory) Type() component.Type { + return f.cfgType +} + +func (f *factory) unexportedFactoryFunc() {} + +func (f factory) TracesProcessorStability() component.StabilityLevel { + return f.tracesStabilityLevel +} + +func (f factory) MetricsProcessorStability() component.StabilityLevel { + return f.metricsStabilityLevel +} + +func (f factory) LogsProcessorStability() component.StabilityLevel { + return f.logsStabilityLevel +} + +// WithTraces overrides the default "error not supported" implementation for CreateTraces and the default "undefined" stability level. +func WithTraces(createTraces CreateTracesFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.tracesStabilityLevel = sl + o.CreateTracesFunc = createTraces + }) +} + +// WithMetrics overrides the default "error not supported" implementation for CreateMetrics and the default "undefined" stability level. +func WithMetrics(createMetrics CreateMetricsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.metricsStabilityLevel = sl + o.CreateMetricsFunc = createMetrics + }) +} + +// WithLogs overrides the default "error not supported" implementation for CreateLogs and the default "undefined" stability level. +func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOption { + return factoryOptionFunc(func(o *factory) { + o.logsStabilityLevel = sl + o.CreateLogsFunc = createLogs + }) +} + +// NewFactory returns a Factory. +func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { + f := &factory{ + cfgType: cfgType, + CreateDefaultConfigFunc: createDefaultConfig, + } + for _, opt := range options { + opt.applyProcessorFactoryOption(f) + } + return f +} diff --git a/processor/internal/logs.go b/processor/internal/logs.go new file mode 100644 index 00000000000..defced909ad --- /dev/null +++ b/processor/internal/logs.go @@ -0,0 +1,15 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/processor/internal" + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" +) + +// Logs is a processor that can consume logs. +type Logs interface { + component.Component + consumer.Logs +} diff --git a/processor/internal/metrics.go b/processor/internal/metrics.go new file mode 100644 index 00000000000..b1265e43b1a --- /dev/null +++ b/processor/internal/metrics.go @@ -0,0 +1,15 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/processor/internal" + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" +) + +// Metrics is a processor that can consume metrics. +type Metrics interface { + component.Component + consumer.Metrics +} diff --git a/processor/internal/processor.go b/processor/internal/processor.go new file mode 100644 index 00000000000..4e7a07a18b8 --- /dev/null +++ b/processor/internal/processor.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/processor/internal" + +import "go.opentelemetry.io/collector/component" + +// Settings is passed to Create* functions in Factory. +type Settings struct { + // ID returns the ID of the component that will be created. + ID component.ID + + component.TelemetrySettings + + // BuildInfo can be used by components for informational purposes + BuildInfo component.BuildInfo +} diff --git a/processor/internal/traces.go b/processor/internal/traces.go new file mode 100644 index 00000000000..a1fe0ced3cb --- /dev/null +++ b/processor/internal/traces.go @@ -0,0 +1,15 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/processor/internal" + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" +) + +// Traces is a processor that can consume traces. +type Traces interface { + component.Component + consumer.Traces +} diff --git a/processor/processor.go b/processor/processor.go index 4618fa006fe..8d17038d106 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -4,14 +4,11 @@ package processor // import "go.opentelemetry.io/collector/processor" import ( - "context" "errors" "fmt" - "go.uber.org/zap" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/processor/internal" ) var ( @@ -19,198 +16,58 @@ var ( ) // Traces is a processor that can consume traces. -type Traces interface { - component.Component - consumer.Traces -} +type Traces = internal.Traces // Metrics is a processor that can consume metrics. -type Metrics interface { - component.Component - consumer.Metrics -} +type Metrics = internal.Metrics // Logs is a processor that can consume logs. -type Logs interface { - component.Component - consumer.Logs -} +type Logs = internal.Logs // CreateSettings is passed to Create* functions in Factory. // // Deprecated: [v0.103.0] Use processor.Settings instead. -type CreateSettings = Settings +type CreateSettings = internal.Settings // Settings is passed to Create* functions in Factory. -type Settings struct { - // ID returns the ID of the component that will be created. - ID component.ID - - component.TelemetrySettings - - // BuildInfo can be used by components for informational purposes - BuildInfo component.BuildInfo -} +type Settings = internal.Settings // Factory is Factory interface for processors. // // This interface cannot be directly implemented. Implementations must // use the NewProcessorFactory to implement it. -type Factory interface { - component.Factory - - // CreateTracesProcessor creates a TracesProcessor based on this config. - // If the processor type does not support tracing or if the config is not valid, - // an error will be returned instead. - CreateTracesProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Traces) (Traces, error) - - // TracesProcessorStability gets the stability level of the TracesProcessor. - TracesProcessorStability() component.StabilityLevel - - // CreateMetricsProcessor creates a MetricsProcessor based on this config. - // If the processor type does not support metrics or if the config is not valid, - // an error will be returned instead. - CreateMetricsProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Metrics) (Metrics, error) - - // MetricsProcessorStability gets the stability level of the MetricsProcessor. - MetricsProcessorStability() component.StabilityLevel - - // CreateLogsProcessor creates a LogsProcessor based on the config. - // If the processor type does not support logs or if the config is not valid, - // an error will be returned instead. - CreateLogsProcessor(ctx context.Context, set Settings, cfg component.Config, nextConsumer consumer.Logs) (Logs, error) - - // LogsProcessorStability gets the stability level of the LogsProcessor. - LogsProcessorStability() component.StabilityLevel - - unexportedFactoryFunc() -} +type Factory = internal.Factory // FactoryOption apply changes to Options. -type FactoryOption interface { - // applyProcessorFactoryOption applies the option. - applyProcessorFactoryOption(o *factory) -} - -var _ FactoryOption = (*factoryOptionFunc)(nil) - -// factoryOptionFunc is a FactoryOption created through a function. -type factoryOptionFunc func(*factory) - -func (f factoryOptionFunc) applyProcessorFactoryOption(o *factory) { - f(o) -} +type FactoryOption = internal.FactoryOption // CreateTracesFunc is the equivalent of Factory.CreateTraces(). -type CreateTracesFunc func(context.Context, Settings, component.Config, consumer.Traces) (Traces, error) - -// CreateTracesProcessor implements Factory.CreateTracesProcessor(). -func (f CreateTracesFunc) CreateTracesProcessor( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Traces) (Traces, error) { - if f == nil { - return nil, component.ErrDataTypeIsNotSupported - } - return f(ctx, set, cfg, nextConsumer) -} +type CreateTracesFunc = internal.CreateTracesFunc // CreateMetricsFunc is the equivalent of Factory.CreateMetrics(). -type CreateMetricsFunc func(context.Context, Settings, component.Config, consumer.Metrics) (Metrics, error) - -// CreateMetricsProcessor implements Factory.CreateMetricsProcessor(). -func (f CreateMetricsFunc) CreateMetricsProcessor( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Metrics, -) (Metrics, error) { - if f == nil { - return nil, component.ErrDataTypeIsNotSupported - } - return f(ctx, set, cfg, nextConsumer) -} +type CreateMetricsFunc = internal.CreateMetricsFunc // CreateLogsFunc is the equivalent of Factory.CreateLogs(). -type CreateLogsFunc func(context.Context, Settings, component.Config, consumer.Logs) (Logs, error) - -// CreateLogsProcessor implements Factory.CreateLogsProcessor(). -func (f CreateLogsFunc) CreateLogsProcessor( - ctx context.Context, - set Settings, - cfg component.Config, - nextConsumer consumer.Logs, -) (Logs, error) { - if f == nil { - return nil, component.ErrDataTypeIsNotSupported - } - return f(ctx, set, cfg, nextConsumer) -} - -type factory struct { - cfgType component.Type - component.CreateDefaultConfigFunc - CreateTracesFunc - tracesStabilityLevel component.StabilityLevel - CreateMetricsFunc - metricsStabilityLevel component.StabilityLevel - CreateLogsFunc - logsStabilityLevel component.StabilityLevel -} - -func (f *factory) Type() component.Type { - return f.cfgType -} - -func (f *factory) unexportedFactoryFunc() {} - -func (f factory) TracesProcessorStability() component.StabilityLevel { - return f.tracesStabilityLevel -} - -func (f factory) MetricsProcessorStability() component.StabilityLevel { - return f.metricsStabilityLevel -} - -func (f factory) LogsProcessorStability() component.StabilityLevel { - return f.logsStabilityLevel -} +type CreateLogsFunc = internal.CreateLogsFunc // WithTraces overrides the default "error not supported" implementation for CreateTraces and the default "undefined" stability level. func WithTraces(createTraces CreateTracesFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.tracesStabilityLevel = sl - o.CreateTracesFunc = createTraces - }) + return internal.WithTraces(createTraces, sl) } // WithMetrics overrides the default "error not supported" implementation for CreateMetrics and the default "undefined" stability level. func WithMetrics(createMetrics CreateMetricsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.metricsStabilityLevel = sl - o.CreateMetricsFunc = createMetrics - }) + return internal.WithMetrics(createMetrics, sl) } // WithLogs overrides the default "error not supported" implementation for CreateLogs and the default "undefined" stability level. func WithLogs(createLogs CreateLogsFunc, sl component.StabilityLevel) FactoryOption { - return factoryOptionFunc(func(o *factory) { - o.logsStabilityLevel = sl - o.CreateLogsFunc = createLogs - }) + return internal.WithLogs(createLogs, sl) } // NewFactory returns a Factory. func NewFactory(cfgType component.Type, createDefaultConfig component.CreateDefaultConfigFunc, options ...FactoryOption) Factory { - f := &factory{ - cfgType: cfgType, - CreateDefaultConfigFunc: createDefaultConfig, - } - for _, opt := range options { - opt.applyProcessorFactoryOption(f) - } - return f + return internal.NewFactory(cfgType, createDefaultConfig, options...) } // MakeFactoryMap takes a list of factories and returns a map with Factory type as keys. @@ -225,86 +82,3 @@ func MakeFactoryMap(factories ...Factory) (map[component.Type]Factory, error) { } return fMap, nil } - -// Builder processor is a helper struct that given a set of Configs and Factories helps with creating processors. -type Builder struct { - cfgs map[component.ID]component.Config - factories map[component.Type]Factory -} - -// NewBuilder creates a new processor.Builder to help with creating components form a set of configs and factories. -func NewBuilder(cfgs map[component.ID]component.Config, factories map[component.Type]Factory) *Builder { - return &Builder{cfgs: cfgs, factories: factories} -} - -// CreateTraces creates a Traces processor based on the settings and config. -func (b *Builder) CreateTraces(ctx context.Context, set Settings, next consumer.Traces) (Traces, error) { - if next == nil { - return nil, errNilNextConsumer - } - cfg, existsCfg := b.cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("processor %q is not configured", set.ID) - } - - f, existsFactory := b.factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("processor factory not available for: %q", set.ID) - } - - logStabilityLevel(set.Logger, f.TracesProcessorStability()) - return f.CreateTracesProcessor(ctx, set, cfg, next) -} - -// CreateMetrics creates a Metrics processor based on the settings and config. -func (b *Builder) CreateMetrics(ctx context.Context, set Settings, next consumer.Metrics) (Metrics, error) { - if next == nil { - return nil, errNilNextConsumer - } - cfg, existsCfg := b.cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("processor %q is not configured", set.ID) - } - - f, existsFactory := b.factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("processor factory not available for: %q", set.ID) - } - - logStabilityLevel(set.Logger, f.MetricsProcessorStability()) - return f.CreateMetricsProcessor(ctx, set, cfg, next) -} - -// CreateLogs creates a Logs processor based on the settings and config. -func (b *Builder) CreateLogs(ctx context.Context, set Settings, next consumer.Logs) (Logs, error) { - if next == nil { - return nil, errNilNextConsumer - } - cfg, existsCfg := b.cfgs[set.ID] - if !existsCfg { - return nil, fmt.Errorf("processor %q is not configured", set.ID) - } - - f, existsFactory := b.factories[set.ID.Type()] - if !existsFactory { - return nil, fmt.Errorf("processor factory not available for: %q", set.ID) - } - - logStabilityLevel(set.Logger, f.LogsProcessorStability()) - return f.CreateLogsProcessor(ctx, set, cfg, next) -} - -func (b *Builder) Factory(componentType component.Type) component.Factory { - return b.factories[componentType] -} - -// logStabilityLevel logs the stability level of a component. The log level is set to info for -// undefined, unmaintained, deprecated and development. The log level is set to debug -// for alpha, beta and stable. -func logStabilityLevel(logger *zap.Logger, sl component.StabilityLevel) { - if sl >= component.StabilityLevelAlpha { - logger.Debug(sl.LogMessage()) - } else { - logger.Info(sl.LogMessage()) - } -}