From 1a51c9438115e18f8ba070fe6a1648c1f6e382aa Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Wed, 19 Jun 2019 15:54:42 -0400 Subject: [PATCH 1/2] Build receivers based on new configuration - Build receivers and plug them into pipelines. - Added tests to verify that single pipeline and multiple pipeline (fan out) per receiver work correctly. - Disable -v flag in go test to reduce unnecessary noise in output, we have too many tests to use -v flag. Errors are still properly printed. This makes finding the cause of test failures easier. - Add logging to various application startup steps. - Changed logging from %q to %s in multiple places to make it more human readable when output in json format. Testing done: 1. make && all unit tests pass. 2. make otelsvc and produce otelsvc_linux executable. 3. Manually verify that otelsvc_linux runs and correctly forwards trace data received via jaeger receiver and exported via opencensus exporter. This is the first successful run of unified OpenTelemetry Service. --- Makefile | 2 +- .../app/builder/exporters_builder.go | 14 +- .../app/builder/pipelines_builder.go | 28 +- .../app/builder/receivers_builder.go | 273 ++++++++++++++++++ .../app/builder/receivers_builder_test.go | 257 +++++++++++++++++ .../builder/testdata/pipelines_builder.yaml | 25 +- cmd/occollector/app/collector/collector.go | 40 ++- internal/configmodels/configmodels.go | 39 ++- internal/configv2/configv2.go | 2 + internal/configv2/configv2_test.go | 8 + internal/configv2/example_factories.go | 74 ++++- receiver/jaegerreceiver/config.go | 22 ++ receiver/jaegerreceiver/config_test.go | 2 + receiver/jaegerreceiver/factory.go | 2 + receiver/opencensusreceiver/config_test.go | 2 + receiver/opencensusreceiver/factory.go | 2 + 16 files changed, 753 insertions(+), 39 deletions(-) create mode 100644 cmd/occollector/app/builder/receivers_builder.go create mode 100644 cmd/occollector/app/builder/receivers_builder_test.go diff --git a/Makefile b/Makefile index cff72c288e7..3fff7bc773e 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ ALL_SRC := $(shell find . -name '*.go' \ # ALL_PKGS is used with 'go cover' ALL_PKGS := $(shell go list $(sort $(dir $(ALL_SRC)))) -GOTEST_OPT?=-v -race -timeout 30s +GOTEST_OPT?= -race -timeout 30s GOTEST_OPT_WITH_COVERAGE = $(GOTEST_OPT) -coverprofile=coverage.txt -covermode=atomic GOTEST=go test GOFMT=gofmt diff --git a/cmd/occollector/app/builder/exporters_builder.go b/cmd/occollector/app/builder/exporters_builder.go index ee7d65b7e36..49f12718390 100644 --- a/cmd/occollector/app/builder/exporters_builder.go +++ b/cmd/occollector/app/builder/exporters_builder.go @@ -174,7 +174,7 @@ func (eb *ExportersBuilder) buildExporter( // Could not create because this exporter does not support this data type. return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.TracesDataType) } - return nil, fmt.Errorf("error creating %q exporter: %v", config.Name(), err) + return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err) } exporter.tc = tc @@ -189,13 +189,15 @@ func (eb *ExportersBuilder) buildExporter( // Could not create because this exporter does not support this data type. return nil, typeMismatchErr(config, requirement.requiredBy, configmodels.MetricsDataType) } - return nil, fmt.Errorf("error creating %q exporter: %v", config.Name(), err) + return nil, fmt.Errorf("error creating %s exporter: %v", config.Name(), err) } exporter.mc = mc exporter.stop = combineStopFunc(exporter.stop, stopFunc) } + eb.logger.Info(fmt.Sprintf("Exporter %s is enabled.", config.Name())) + return exporter, nil } @@ -205,9 +207,9 @@ func typeMismatchErr( dataType configmodels.DataType, ) error { return fmt.Errorf( - "pipeline %q produces %q to exporter %s which does not support %q "+ - "telemetry data. exporter will be detached from pipeline", - requiredByPipeline.Name, dataType.GetDataTypeStr(), - config.Name(), dataType.GetDataTypeStr(), + "pipeline %s is attached %s to exporter %s which does not support %s "+ + "telemetry data produced by pipeline. Exporter will be detached from pipeline", + requiredByPipeline.Name, dataType.GetString(), + config.Name(), dataType.GetString(), ) } diff --git a/cmd/occollector/app/builder/pipelines_builder.go b/cmd/occollector/app/builder/pipelines_builder.go index 7245daf2ba1..191a587a967 100644 --- a/cmd/occollector/app/builder/pipelines_builder.go +++ b/cmd/occollector/app/builder/pipelines_builder.go @@ -54,11 +54,11 @@ func NewPipelinesBuilder( } // Build pipeline processors from config. -func (eb *PipelinesBuilder) Build() (PipelineProcessors, error) { +func (pb *PipelinesBuilder) Build() (PipelineProcessors, error) { pipelineProcessors := make(PipelineProcessors) - for _, pipeline := range eb.config.Pipelines { - firstProcessor, err := eb.buildPipeline(pipeline) + for _, pipeline := range pb.config.Pipelines { + firstProcessor, err := pb.buildPipeline(pipeline) if err != nil { return nil, err } @@ -71,7 +71,7 @@ func (eb *PipelinesBuilder) Build() (PipelineProcessors, error) { // Builds a pipeline of processors. Returns the first processor in the pipeline. // The last processor in the pipeline will be plugged to fan out the data into exporters // that are configured for this pipeline. -func (eb *PipelinesBuilder) buildPipeline( +func (pb *PipelinesBuilder) buildPipeline( pipelineCfg *configmodels.Pipeline, ) (*builtProcessor, error) { @@ -83,9 +83,9 @@ func (eb *PipelinesBuilder) buildPipeline( switch pipelineCfg.InputType { case configmodels.TracesDataType: - tc = eb.buildFanoutExportersTraceConsumer(pipelineCfg.Exporters) + tc = pb.buildFanoutExportersTraceConsumer(pipelineCfg.Exporters) case configmodels.MetricsDataType: - mc = eb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters) + mc = pb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters) } // Now build the processors backwards, starting from the last one. @@ -94,7 +94,7 @@ func (eb *PipelinesBuilder) buildPipeline( // in the pipeline and so on. for i := len(pipelineCfg.Processors) - 1; i >= 0; i-- { procName := pipelineCfg.Processors[i] - procCfg := eb.config.Processors[procName] + procCfg := pb.config.Processors[procName] factory := factories.GetProcessorFactory(procCfg.Type()) @@ -115,22 +115,24 @@ func (eb *PipelinesBuilder) buildPipeline( } } + pb.logger.Info(fmt.Sprintf("Pipeline %s enabled.", pipelineCfg.Name)) + return &builtProcessor{tc, mc}, nil } // Converts the list of exporter names to a list of corresponding builtExporters. -func (eb *PipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*builtExporter { +func (pb *PipelinesBuilder) getBuiltExportersByNames(exporterNames []string) []*builtExporter { var result []*builtExporter for _, name := range exporterNames { - exporter := eb.exporters[eb.config.Exporters[name]] + exporter := pb.exporters[pb.config.Exporters[name]] result = append(result, exporter) } return result } -func (eb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TraceConsumer { - builtExporters := eb.getBuiltExportersByNames(exporterNames) +func (pb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []string) consumer.TraceConsumer { + builtExporters := pb.getBuiltExportersByNames(exporterNames) // Optimize for the case when there is only one exporter, no need to create junction point. if len(builtExporters) == 1 { @@ -146,8 +148,8 @@ func (eb *PipelinesBuilder) buildFanoutExportersTraceConsumer(exporterNames []st return multiconsumer.NewTraceProcessor(exporters) } -func (eb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer { - builtExporters := eb.getBuiltExportersByNames(exporterNames) +func (pb *PipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterNames []string) consumer.MetricsConsumer { + builtExporters := pb.getBuiltExportersByNames(exporterNames) // Optimize for the case when there is only one exporter, no need to create junction point. if len(builtExporters) == 1 { diff --git a/cmd/occollector/app/builder/receivers_builder.go b/cmd/occollector/app/builder/receivers_builder.go new file mode 100644 index 00000000000..2ccb4f01a4a --- /dev/null +++ b/cmd/occollector/app/builder/receivers_builder.go @@ -0,0 +1,273 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder + +import ( + "context" + "fmt" + + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-service/consumer" + "github.com/open-telemetry/opentelemetry-service/internal" + "github.com/open-telemetry/opentelemetry-service/internal/configmodels" + "github.com/open-telemetry/opentelemetry-service/internal/factories" + "github.com/open-telemetry/opentelemetry-service/processor/multiconsumer" + "github.com/open-telemetry/opentelemetry-service/receiver" +) + +// builtReceiver is a receiver that is built based on a config. It can have +// a trace and/or a metrics component. +type builtReceiver struct { + trace receiver.TraceReceiver + metrics receiver.MetricsReceiver +} + +// Stop the receiver. +func (rcv *builtReceiver) Stop() error { + var errors []error + if rcv.trace != nil { + err := rcv.trace.StopTraceReception(context.Background()) + if err != nil { + errors = append(errors, err) + } + } + + if rcv.metrics != nil { + err := rcv.metrics.StopMetricsReception(context.Background()) + if err != nil { + errors = append(errors, err) + } + } + + return internal.CombineErrors(errors) +} + +// Start the receiver. +func (rcv *builtReceiver) Start(asyncErrorChan chan<- error) error { + var errors []error + if rcv.trace != nil { + err := rcv.trace.StartTraceReception(context.Background(), asyncErrorChan) + if err != nil { + errors = append(errors, err) + } + } + + if rcv.metrics != nil { + err := rcv.metrics.StartMetricsReception(context.Background(), asyncErrorChan) + if err != nil { + errors = append(errors, err) + } + } + + return internal.CombineErrors(errors) +} + +// Receivers is a map of receivers created from receiver configs. +type Receivers map[configmodels.Receiver]*builtReceiver + +// StopAll stops all receivers. +func (rcvs Receivers) StopAll() { + for _, rcv := range rcvs { + rcv.Stop() + } +} + +// StartAll starts all receivers. +func (rcvs Receivers) StartAll(logger *zap.Logger, asyncErrorChan chan<- error) error { + for cfg, rcv := range rcvs { + logger.Info(fmt.Sprintf("Receiver %s starting...", cfg.Name())) + if err := rcv.Start(asyncErrorChan); err != nil { + return err + } + logger.Info(fmt.Sprintf("Receiver %s started.", cfg.Name())) + } + return nil +} + +// ReceiversBuilder builds receivers from config. +type ReceiversBuilder struct { + logger *zap.Logger + config *configmodels.ConfigV2 + pipelineProcessors PipelineProcessors +} + +// NewReceiversBuilder creates a new ReceiversBuilder. Call Build() on the returned value. +func NewReceiversBuilder( + logger *zap.Logger, + config *configmodels.ConfigV2, + pipelineProcessors PipelineProcessors, +) *ReceiversBuilder { + return &ReceiversBuilder{logger, config, pipelineProcessors} +} + +// Build receivers from config. +func (rb *ReceiversBuilder) Build() (Receivers, error) { + receivers := make(Receivers) + + // Build receivers based on configuration. + for _, cfg := range rb.config.Receivers { + rcv, err := rb.buildReceiver(cfg) + if err != nil { + return nil, err + } + receivers[cfg] = rcv + } + + return receivers, nil +} + +// hasReceiver returns true if the pipeline is attached to specified receiver. +func hasReceiver(pipeline *configmodels.Pipeline, receiverName string) bool { + for _, name := range pipeline.Receivers { + if name == receiverName { + return true + } + } + return false +} + +type attachedPipelines map[configmodels.DataType][]*builtProcessor + +func (rb *ReceiversBuilder) findPipelinesToAttach(config configmodels.Receiver) (attachedPipelines, error) { + // A receiver may be attached to multiple pipelines. Pipelines may consume different + // data types. We need to compile the list of pipelines of each type that must be + // attached to this receiver according to configuration. + + pipelinesToAttach := make(attachedPipelines) + pipelinesToAttach[configmodels.TracesDataType] = make([]*builtProcessor, 0) + pipelinesToAttach[configmodels.MetricsDataType] = make([]*builtProcessor, 0) + + // Iterate over all pipelines. + for _, pipelineCfg := range rb.config.Pipelines { + // Get the first processor of the pipeline. + pipelineProcessor := rb.pipelineProcessors[pipelineCfg] + if pipelineProcessor == nil { + return nil, fmt.Errorf("cannot find pipeline processor for pipeline %s", + pipelineCfg.Name) + } + + // Is this receiver attached to the pipeline? + if hasReceiver(pipelineCfg, config.Name()) { + // Yes, add it to the list of pipelines of corresponding data type. + pipelinesToAttach[pipelineCfg.InputType] = + append(pipelinesToAttach[pipelineCfg.InputType], pipelineProcessor) + } + } + + return pipelinesToAttach, nil +} + +func (rb *ReceiversBuilder) attachReceiverToPipelines( + factory factories.ReceiverFactory, + dataType configmodels.DataType, + config configmodels.Receiver, + receiver *builtReceiver, + pipelineProcessors []*builtProcessor, +) error { + // There are pipelines of the specified data type that must be attached to + // the receiver. Create the receiver of corresponding data type and make + // sure its output is fanned out to all attached pipelines. + var err error + switch dataType { + case configmodels.TracesDataType: + // First, create the fan out junction point. + junction := buildFanoutTraceConsumer(pipelineProcessors) + + // Now create the receiver and tell it to send to the junction point. + receiver.trace, err = factory.CreateTraceReceiver(context.Background(), config, junction) + + case configmodels.MetricsDataType: + junction := buildFanoutMetricConsumer(pipelineProcessors) + receiver.metrics, err = factory.CreateMetricsReceiver(config, junction) + } + + if err != nil { + if err == factories.ErrDataTypeIsNotSupported { + return fmt.Errorf( + "receiver %s does not support %s but some pipelines that "+ + "want to process %s are attached to the receiever", + config.Name(), + dataType.GetString(), + dataType.GetString()) + } + return fmt.Errorf("cannot create receiver %s", config.Name()) + } + + rb.logger.Info(fmt.Sprintf("Receiver %s is enabled for %s.", + config.Name(), dataType.GetString())) + + return nil +} + +func (rb *ReceiversBuilder) buildReceiver(config configmodels.Receiver) (*builtReceiver, error) { + + // First find pipelines that must be attached to this receiver. + pipelinesToAttach, err := rb.findPipelinesToAttach(config) + if err != nil { + return nil, err + } + + // Prepare to build the receiver. + factory := factories.GetReceiverFactory(config.Type()) + receiver := &builtReceiver{} + + // Now we have list of pipelines broken down by data type. Iterate for each data type. + for dataType, pipelines := range pipelinesToAttach { + if len(pipelines) == 0 { + // No pipelines of this data type are attached to this receiver. + continue + } + + // Attach the corresponding part of the receiver to all pipelines that require + // this data type. + err := rb.attachReceiverToPipelines(factory, dataType, config, receiver, pipelines) + if err != nil { + return nil, err + } + } + + return receiver, nil +} + +func buildFanoutTraceConsumer(pipelineFrontProcessors []*builtProcessor) consumer.TraceConsumer { + // Optimize for the case when there is only one processor, no need to create junction point. + if len(pipelineFrontProcessors) == 1 { + return pipelineFrontProcessors[0].tc + } + + var pipelineConsumers []consumer.TraceConsumer + for _, builtProc := range pipelineFrontProcessors { + pipelineConsumers = append(pipelineConsumers, builtProc.tc) + } + + // Create a junction point that fans out to all pipelines. + return multiconsumer.NewTraceProcessor(pipelineConsumers) +} + +func buildFanoutMetricConsumer(pipelineFrontProcessors []*builtProcessor) consumer.MetricsConsumer { + // Optimize for the case when there is only one processor, no need to create junction point. + if len(pipelineFrontProcessors) == 1 { + return pipelineFrontProcessors[0].mc + } + + var pipelineConsumers []consumer.MetricsConsumer + for _, builtProc := range pipelineFrontProcessors { + pipelineConsumers = append(pipelineConsumers, builtProc.mc) + } + + // Create a junction point that fans out to all pipelines. + return multiconsumer.NewMetricsProcessor(pipelineConsumers) +} diff --git a/cmd/occollector/app/builder/receivers_builder_test.go b/cmd/occollector/app/builder/receivers_builder_test.go new file mode 100644 index 00000000000..4f3103c5c0a --- /dev/null +++ b/cmd/occollector/app/builder/receivers_builder_test.go @@ -0,0 +1,257 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder + +import ( + "context" + "testing" + + "go.uber.org/zap" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + + "github.com/open-telemetry/opentelemetry-service/data" + "github.com/open-telemetry/opentelemetry-service/internal/configmodels" + "github.com/open-telemetry/opentelemetry-service/internal/configv2" +) + +type testCase struct { + name string + receiverName string + exporterNames []string + spanDuplicationByExporter map[string]int + hasTraces bool + hasMetrics bool +} + +func TestReceiversBuilder_Build(t *testing.T) { + tests := []testCase{ + { + name: "one-exporter", + receiverName: "examplereceiver", + exporterNames: []string{"exampleexporter"}, + hasTraces: true, + hasMetrics: true, + }, + { + name: "multi-exporter", + receiverName: "examplereceiver/2", + exporterNames: []string{"exampleexporter", "exampleexporter/2"}, + hasTraces: true, + }, + { + name: "multi-metrics-receiver", + receiverName: "examplereceiver/3", + exporterNames: []string{"exampleexporter", "exampleexporter/2"}, + hasTraces: false, + hasMetrics: true, + }, + { + name: "multi-receiver-multi-exporter", + receiverName: "examplereceiver/multi", + exporterNames: []string{"exampleexporter", "exampleexporter/2"}, + + // Check pipelines_builder.yaml to understand this case. + // We have 2 pipelines, one exporting to one exporter, the other + // exporting to both exporters, so we expect a duplication on + // one of the exporters, but not on the other. + spanDuplicationByExporter: map[string]int{ + "exampleexporter": 2, "exampleexporter/2": 1, + }, + hasTraces: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testReceivers(t, test) + }) + } +} + +func testReceivers( + t *testing.T, + test testCase, +) { + // Load the config + config, err := configv2.LoadConfigFile(t, "testdata/pipelines_builder.yaml") + require.Nil(t, err) + + // Build the pipeline + allExporters, err := NewExportersBuilder(zap.NewNop(), config).Build() + pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), config, allExporters).Build() + receivers, err := NewReceiversBuilder(zap.NewNop(), config, pipelineProcessors).Build() + + assert.NoError(t, err) + require.NotNil(t, receivers) + + receiver := receivers[config.Receivers[test.receiverName]] + + // Ensure receiver has its fields correctly populated. + require.NotNil(t, receiver) + + if test.hasTraces { + assert.NotNil(t, receiver.trace) + } else { + assert.Nil(t, receiver.trace) + } + + if test.hasMetrics { + assert.NotNil(t, receiver.metrics) + } else { + assert.Nil(t, receiver.metrics) + } + + // Compose the list of created exporters. + var exporters []*builtExporter + for _, name := range test.exporterNames { + // Ensure exporter is created. + exp := allExporters[config.Exporters[name]] + require.NotNil(t, exp) + exporters = append(exporters, exp) + } + + // Send TraceData via receiver and verify that all exporters of the pipeline receive it. + + // First check that there are no traces in the exporters yet. + for _, exporter := range exporters { + consumer := exporter.tc.(*configv2.ExampleExporterConsumer) + require.Equal(t, len(consumer.Traces), 0) + require.Equal(t, len(consumer.Metrics), 0) + } + + // Send one trace. + name := tracepb.TruncatableString{Value: "testspanname"} + traceData := data.TraceData{ + SourceFormat: "test-source-format", + Spans: []*tracepb.Span{ + {Name: &name}, + }, + } + if test.hasTraces { + traceProducer := receiver.trace.(*configv2.ExampleReceiverProducer) + traceProducer.TraceConsumer.ConsumeTraceData(context.Background(), traceData) + } + + metricsData := data.MetricsData{ + Metrics: []*metricspb.Metric{ + {MetricDescriptor: &metricspb.MetricDescriptor{Name: "testmetric"}}, + }, + } + if test.hasMetrics { + metricsProducer := receiver.metrics.(*configv2.ExampleReceiverProducer) + metricsProducer.MetricsConsumer.ConsumeMetricsData(context.Background(), metricsData) + } + + // Now verify received data. + for _, name := range test.exporterNames { + // Check that the data is received by exporter. + exporter := allExporters[config.Exporters[name]] + + // Validate traces. + if test.hasTraces { + var spanDuplicationCount int + if test.spanDuplicationByExporter != nil { + spanDuplicationCount = test.spanDuplicationByExporter[name] + } else { + spanDuplicationCount = 1 + } + + traceConsumer := exporter.tc.(*configv2.ExampleExporterConsumer) + require.Equal(t, spanDuplicationCount, len(traceConsumer.Traces)) + + for i := 0; i < spanDuplicationCount; i++ { + assert.Equal(t, traceData, traceConsumer.Traces[i]) + + // Check that the span was processed by "attributes" processor and an + // attribute was added. + assert.Equal(t, int64(12345), + traceConsumer.Traces[i].Spans[0].Attributes.AttributeMap["attr1"].GetIntValue()) + } + } + + // Validate metrics. + if test.hasMetrics { + metricsConsumer := exporter.mc.(*configv2.ExampleExporterConsumer) + require.Equal(t, 1, len(metricsConsumer.Metrics)) + assert.Equal(t, metricsData, metricsConsumer.Metrics[0]) + } + } +} + +func TestReceiversBuilder_Error(t *testing.T) { + //config, err := configv2.LoadConfigFile(t, "testdata/pipelines_builder.yaml") + //require.Nil(t, err) + // + //// Corrupt the pipeline, change data type to metrics. We have to forcedly do it here + //// since there is no way to have such config loaded by LoadConfigFile, it would not + //// pass validation. We are doing this to test failure mode of PipelinesBuilder. + //pipeline := config.Pipelines["traces"] + //pipeline.InputType = configmodels.MetricsDataType + // + //exporters, err := NewExportersBuilder(zap.NewNop(), config).Build() + // + //// This should fail because "attributes" processor defined in the config does + //// not support metrics data type. + //_, err = NewPipelinesBuilder(zap.NewNop(), config, exporters).Build() + // + //assert.NotNil(t, err) +} + +func TestReceiversBuilder_StartAll(t *testing.T) { + receivers := make(Receivers) + rcvCfg := &configmodels.ReceiverSettings{} + + receiver := &configv2.ExampleReceiverProducer{} + + receivers[rcvCfg] = &builtReceiver{ + trace: receiver, + metrics: receiver, + } + + assert.Equal(t, false, receiver.TraceStarted) + assert.Equal(t, false, receiver.MetricsStarted) + + ch := make(chan error) + err := receivers.StartAll(zap.NewNop(), ch) + assert.Nil(t, err) + + assert.Equal(t, true, receiver.TraceStarted) + assert.Equal(t, true, receiver.MetricsStarted) +} + +func TestReceiversBuilder_StopAll(t *testing.T) { + receivers := make(Receivers) + rcvCfg := &configmodels.ReceiverSettings{} + + receiver := &configv2.ExampleReceiverProducer{} + + receivers[rcvCfg] = &builtReceiver{ + trace: receiver, + metrics: receiver, + } + + assert.Equal(t, false, receiver.TraceStopped) + assert.Equal(t, false, receiver.MetricsStopped) + + receivers.StopAll() + + assert.Equal(t, true, receiver.TraceStopped) + assert.Equal(t, true, receiver.MetricsStopped) +} diff --git a/cmd/occollector/app/builder/testdata/pipelines_builder.yaml b/cmd/occollector/app/builder/testdata/pipelines_builder.yaml index f85a630fb9a..98e181dc43c 100644 --- a/cmd/occollector/app/builder/testdata/pipelines_builder.yaml +++ b/cmd/occollector/app/builder/testdata/pipelines_builder.yaml @@ -2,6 +2,15 @@ receivers: examplereceiver: enabled: true + examplereceiver/2: + enabled: true + + examplereceiver/3: + enabled: true + + examplereceiver/multi: + enabled: true + processors: attributes: values: @@ -16,11 +25,23 @@ exporters: pipelines: traces: - receivers: [examplereceiver] + receivers: [examplereceiver, examplereceiver/multi] processors: [attributes] exporters: [exampleexporter] traces/2: - receivers: [examplereceiver] + receivers: [examplereceiver/2, examplereceiver/multi] processors: [attributes] exporters: [exampleexporter, exampleexporter/2] + + metrics: + receivers: [examplereceiver] + exporters: [exampleexporter] + + metrics/2: + receivers: [examplereceiver/3] + exporters: [exampleexporter] + + metrics/3: + receivers: [examplereceiver/3] + exporters: [exampleexporter/2] diff --git a/cmd/occollector/app/collector/collector.go b/cmd/occollector/app/collector/collector.go index 612886fdcb6..ce03bae31c8 100644 --- a/cmd/occollector/app/collector/collector.go +++ b/cmd/occollector/app/collector/collector.go @@ -44,12 +44,13 @@ var ( // Application represents a collector application type Application struct { - v *viper.Viper - logger *zap.Logger - healthCheck *healthcheck.HealthCheck - processor consumer.TraceConsumer - receivers []receiver.TraceReceiver - exporters builder.Exporters + v *viper.Viper + logger *zap.Logger + healthCheck *healthcheck.HealthCheck + processor consumer.TraceConsumer + receivers []receiver.TraceReceiver + exporters builder.Exporters + builtReceivers builder.Receivers // stopTestChan is used to terminate the application in end to end tests. stopTestChan chan struct{} @@ -89,6 +90,7 @@ func (app *Application) init() { } func (app *Application) setupPProf() { + app.logger.Info("Setting up profiler...") err := pprofserver.SetupFromViper(app.asyncErrorChannel, app.v, app.logger) if err != nil { log.Fatalf("Failed to start net/http/pprof: %v", err) @@ -96,6 +98,7 @@ func (app *Application) setupPProf() { } func (app *Application) setupHealthCheck() { + app.logger.Info("Setting up health checks...") var err error app.healthCheck, err = newHealthCheck(app.v, app.logger) if err != nil { @@ -104,6 +107,7 @@ func (app *Application) setupHealthCheck() { } func (app *Application) setupZPages() { + app.logger.Info("Setting up zPages...") zpagesPort := app.v.GetInt(zpagesserver.ZPagesHTTPPort) if zpagesPort > 0 { closeZPages, err := zpagesserver.Run(app.asyncErrorChannel, zpagesPort) @@ -120,6 +124,7 @@ func (app *Application) setupZPages() { } func (app *Application) setupTelemetry() { + app.logger.Info("Setting up own telemetry...") err := AppTelemetry.init(app.asyncErrorChannel, app.v, app.logger) if err != nil { app.logger.Error("Failed to initialize telemetry", zap.Error(err)) @@ -129,6 +134,8 @@ func (app *Application) setupTelemetry() { // runAndWaitForShutdownEvent waits for one of the shutdown events that can happen. func (app *Application) runAndWaitForShutdownEvent() { + app.logger.Info("Everything is ready. Begin running and processing data.") + // Plug SIGTERM signal into a channel. signalsChannel := make(chan os.Signal, 1) signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) @@ -222,12 +229,16 @@ func (app *Application) Start() error { } func (app *Application) setupPipelines() { + app.logger.Info("Loading configuration...") + // Load configuration. config, err := configv2.Load(app.v) if err != nil { log.Fatalf("Cannot load configuration: %v", err) } + app.logger.Info("Applying configuration...") + // Pipeline is built backwards, starting from exporters, so that we create objects // which are referenced before objects which reference them. @@ -239,12 +250,22 @@ func (app *Application) setupPipelines() { // Create pipelines and their processors and plug exporters to the // end of the pipelines. - _, err = builder.NewPipelinesBuilder(app.logger, config, app.exporters).Build() + pipelines, err := builder.NewPipelinesBuilder(app.logger, config, app.exporters).Build() if err != nil { log.Fatalf("Cannot load configuration: %v", err) } - // TODO: create receivers and plug them into the start of the pipelines. + // Create receivers and plug them into the start of the pipelines. + app.builtReceivers, err = builder.NewReceiversBuilder(app.logger, config, pipelines).Build() + if err != nil { + log.Fatalf("Cannot load configuration: %v", err) + } + + app.logger.Info("Starting receivers...") + err = app.builtReceivers.StartAll(app.logger, app.asyncErrorChannel) + if err != nil { + log.Fatalf("Cannot start receivers: %v", err) + } } func (app *Application) shutdownPipelines() { @@ -252,7 +273,8 @@ func (app *Application) shutdownPipelines() { // giving senders a chance to send all their data. This may take time, the allowed // time should be part of configuration. - // TODO: shutdown receivers. + app.logger.Info("Stopping receivers...") + app.builtReceivers.StopAll() // TODO: shutdown processors diff --git a/internal/configmodels/configmodels.go b/internal/configmodels/configmodels.go index fbd975bdf5c..15ca8d47457 100644 --- a/internal/configmodels/configmodels.go +++ b/internal/configmodels/configmodels.go @@ -39,9 +39,18 @@ type ConfigV2 struct { Pipelines Pipelines } +// NamedEntity is a configuration entity that has a name. +type NamedEntity interface { + Name() string + SetName(name string) +} + // Receiver is the configuration of a receiver. Specific receivers must implement this // interface and will typically embed ReceiverSettings struct or a struct that extends it. type Receiver interface { + NamedEntity + Type() string + SetType(typeStr string) } // Receivers is a map of names to Receivers. @@ -49,9 +58,7 @@ type Receivers map[string]Receiver // Exporter is the configuration of an exporter. type Exporter interface { - Name() string - SetName(name string) - + NamedEntity Type() string SetType(typeStr string) } @@ -90,8 +97,8 @@ const ( MetricsDataTypeStr = "metrics" ) -// GetDataTypeStr converts data type to string. -func (dataType DataType) GetDataTypeStr() string { +// GetString converts data type to string. +func (dataType DataType) GetString() string { switch dataType { case TracesDataType: return TracesDataTypeStr @@ -121,10 +128,32 @@ type Pipelines map[string]*Pipeline // ReceiverSettings defines common settings for a single-protocol receiver configuration. // Specific receivers can embed this struct and extend it with more fields if needed. type ReceiverSettings struct { + TypeVal string `mapstructure:"-"` + NameVal string `mapstructure:"-"` Enabled bool `mapstructure:"enabled"` Endpoint string `mapstructure:"endpoint"` } +// Name gets the exporter name. +func (rs *ReceiverSettings) Name() string { + return rs.NameVal +} + +// SetName sets the exporter name. +func (rs *ReceiverSettings) SetName(name string) { + rs.NameVal = name +} + +// Type sets the receiver type. +func (rs *ReceiverSettings) Type() string { + return rs.TypeVal +} + +// SetType sets the receiver type. +func (rs *ReceiverSettings) SetType(typeStr string) { + rs.TypeVal = typeStr +} + // ExporterSettings defines common settings for an exporter configuration. // Specific exporters can embed this struct and extend it with more fields if needed. type ExporterSettings struct { diff --git a/internal/configv2/configv2.go b/internal/configv2/configv2.go index 965721be082..9680bb7fb0a 100644 --- a/internal/configv2/configv2.go +++ b/internal/configv2/configv2.go @@ -191,6 +191,8 @@ func loadReceivers(v *viper.Viper) (configmodels.Receivers, error) { // Create the default config for this receiver. receiverCfg := factory.CreateDefaultConfig() + receiverCfg.SetType(typeStr) + receiverCfg.SetName(fullName) // Now that the default config struct is created we can Unmarshal into it // and it will apply user-defined config on top of the default. diff --git a/internal/configv2/configv2_test.go b/internal/configv2/configv2_test.go index 39c045b3fb0..e2e6b3cac39 100644 --- a/internal/configv2/configv2_test.go +++ b/internal/configv2/configv2_test.go @@ -39,6 +39,8 @@ func TestDecodeConfig(t *testing.T) { assert.Equal(t, config.Receivers["examplereceiver"], &ExampleReceiver{ ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: "examplereceiver", + NameVal: "examplereceiver", Endpoint: "localhost:1000", Enabled: false, }, @@ -48,6 +50,8 @@ func TestDecodeConfig(t *testing.T) { assert.Equal(t, config.Receivers["examplereceiver/myreceiver"], &ExampleReceiver{ ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: "examplereceiver", + NameVal: "examplereceiver/myreceiver", Endpoint: "127.0.0.1:12345", Enabled: true, }, @@ -115,6 +119,8 @@ func TestDecodeConfig_MultiProto(t *testing.T) { assert.Equal(t, config.Receivers["multireceiver"], &MultiProtoReceiver{ + TypeVal: "multireceiver", + NameVal: "multireceiver", Protocols: map[string]MultiProtoReceiverOneCfg{ "http": { Enabled: false, @@ -131,6 +137,8 @@ func TestDecodeConfig_MultiProto(t *testing.T) { assert.Equal(t, config.Receivers["multireceiver/myreceiver"], &MultiProtoReceiver{ + TypeVal: "multireceiver", + NameVal: "multireceiver/myreceiver", Protocols: map[string]MultiProtoReceiverOneCfg{ "http": { Enabled: true, diff --git a/internal/configv2/example_factories.go b/internal/configv2/example_factories.go index 2a18b8b6eb4..3b94dc9aaa5 100644 --- a/internal/configv2/example_factories.go +++ b/internal/configv2/example_factories.go @@ -50,6 +50,7 @@ func (f *ExampleReceiverFactory) CustomUnmarshaler() factories.CustomUnmarshaler func (f *ExampleReceiverFactory) CreateDefaultConfig() configmodels.Receiver { return &ExampleReceiver{ ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: "examplereceiver", Endpoint: "localhost:1000", Enabled: false, }, @@ -64,26 +65,92 @@ func (f *ExampleReceiverFactory) CreateTraceReceiver( nextConsumer consumer.TraceConsumer, ) (receiver.TraceReceiver, error) { // Not used for this test, just return nil - return nil, nil + return &ExampleReceiverProducer{TraceConsumer: nextConsumer}, nil } // CreateMetricsReceiver creates a metrics receiver based on this config. func (f *ExampleReceiverFactory) CreateMetricsReceiver( cfg configmodels.Receiver, - consumer consumer.MetricsConsumer, + nextConsumer consumer.MetricsConsumer, ) (receiver.MetricsReceiver, error) { // Not used for this test, just return nil - return nil, nil + return &ExampleReceiverProducer{MetricsConsumer: nextConsumer}, nil +} + +// ExampleReceiverProducer allows producing traces and metrics for testing purposes. +type ExampleReceiverProducer struct { + TraceConsumer consumer.TraceConsumer + TraceStarted bool + TraceStopped bool + MetricsConsumer consumer.MetricsConsumer + MetricsStarted bool + MetricsStopped bool +} + +// TraceSource returns the name of the trace data source. +func (erp *ExampleReceiverProducer) TraceSource() string { + return "" +} + +// StartTraceReception tells the receiver to start its processing. +func (erp *ExampleReceiverProducer) StartTraceReception(ctx context.Context, asyncErrorChannel chan<- error) error { + erp.TraceStarted = true + return nil +} + +// StopTraceReception tells the receiver that should stop reception, +func (erp *ExampleReceiverProducer) StopTraceReception(ctx context.Context) error { + erp.TraceStopped = true + return nil +} + +// MetricsSource returns the name of the metrics data source. +func (erp *ExampleReceiverProducer) MetricsSource() string { + return "" +} + +// StartMetricsReception tells the receiver to start its processing. +func (erp *ExampleReceiverProducer) StartMetricsReception(ctx context.Context, asyncErrorChannel chan<- error) error { + erp.MetricsStarted = true + return nil +} + +// StopMetricsReception tells the receiver that should stop reception, +func (erp *ExampleReceiverProducer) StopMetricsReception(ctx context.Context) error { + erp.MetricsStopped = true + return nil } // MultiProtoReceiver is for testing purposes. We are defining an example multi protocol // config and factory for "multireceiver" receiver type. type MultiProtoReceiver struct { + TypeVal string `mapstructure:"-"` + NameVal string `mapstructure:"-"` Protocols map[string]MultiProtoReceiverOneCfg `mapstructure:"protocols"` } var _ configmodels.Receiver = (*MultiProtoReceiver)(nil) +// Name gets the exporter name. +func (rs *MultiProtoReceiver) Name() string { + return rs.NameVal +} + +// SetName sets the exporter name. +func (rs *MultiProtoReceiver) SetName(name string) { + rs.NameVal = name +} + +// Type sets the receiver type. +func (rs *MultiProtoReceiver) Type() string { + return rs.TypeVal +} + +// SetType sets the receiver type. +func (rs *MultiProtoReceiver) SetType(typeStr string) { + rs.TypeVal = typeStr +} + // MultiProtoReceiverOneCfg is multi proto receiver config. type MultiProtoReceiverOneCfg struct { Enabled bool `mapstructure:"enabled"` @@ -108,6 +175,7 @@ func (f *MultiProtoReceiverFactory) CustomUnmarshaler() factories.CustomUnmarsha // CreateDefaultConfig creates the default configuration for the Receiver. func (f *MultiProtoReceiverFactory) CreateDefaultConfig() configmodels.Receiver { return &MultiProtoReceiver{ + TypeVal: "multireceiver", Protocols: map[string]MultiProtoReceiverOneCfg{ "http": { Enabled: false, diff --git a/receiver/jaegerreceiver/config.go b/receiver/jaegerreceiver/config.go index a9f83fc2572..b51903312a6 100644 --- a/receiver/jaegerreceiver/config.go +++ b/receiver/jaegerreceiver/config.go @@ -20,5 +20,27 @@ import ( // ConfigV2 defines configuration for Jaeger receiver. type ConfigV2 struct { + TypeVal string `mapstructure:"-"` + NameVal string `mapstructure:"-"` Protocols map[string]*configmodels.ReceiverSettings `mapstructure:"protocols"` } + +// Name gets the receiver name. +func (rs *ConfigV2) Name() string { + return rs.NameVal +} + +// SetName sets the receiver name. +func (rs *ConfigV2) SetName(name string) { + rs.NameVal = name +} + +// Type sets the receiver type. +func (rs *ConfigV2) Type() string { + return rs.TypeVal +} + +// SetType sets the receiver type. +func (rs *ConfigV2) SetType(typeStr string) { + rs.TypeVal = typeStr +} diff --git a/receiver/jaegerreceiver/config_test.go b/receiver/jaegerreceiver/config_test.go index 4415492d1f9..89df0621757 100644 --- a/receiver/jaegerreceiver/config_test.go +++ b/receiver/jaegerreceiver/config_test.go @@ -44,6 +44,8 @@ func TestLoadConfig(t *testing.T) { r1 := config.Receivers["jaeger/customname"].(*ConfigV2) assert.Equal(t, r1, &ConfigV2{ + TypeVal: typeStr, + NameVal: "jaeger/customname", Protocols: map[string]*configmodels.ReceiverSettings{ "thrift-http": { Enabled: false, diff --git a/receiver/jaegerreceiver/factory.go b/receiver/jaegerreceiver/factory.go index 9a9783d1a83..973f4bf09e1 100644 --- a/receiver/jaegerreceiver/factory.go +++ b/receiver/jaegerreceiver/factory.go @@ -61,6 +61,8 @@ func (f *receiverFactory) CustomUnmarshaler() factories.CustomUnmarshaler { // CreateDefaultConfig creates the default configuration for Jaeger receiver. func (f *receiverFactory) CreateDefaultConfig() configmodels.Receiver { return &ConfigV2{ + TypeVal: typeStr, + NameVal: typeStr, Protocols: map[string]*configmodels.ReceiverSettings{ protoThriftTChannel: { Enabled: false, diff --git a/receiver/opencensusreceiver/config_test.go b/receiver/opencensusreceiver/config_test.go index 5c08c5ab17b..8486656060d 100644 --- a/receiver/opencensusreceiver/config_test.go +++ b/receiver/opencensusreceiver/config_test.go @@ -45,6 +45,8 @@ func TestLoadConfig(t *testing.T) { r1 := config.Receivers["opencensus/customname"].(*ConfigV2) assert.Equal(t, r1.ReceiverSettings, configmodels.ReceiverSettings{ + TypeVal: typeStr, + NameVal: "opencensus/customname", Endpoint: "0.0.0.0:9090", Enabled: true, }) diff --git a/receiver/opencensusreceiver/factory.go b/receiver/opencensusreceiver/factory.go index b963803dbba..6d4e93e9400 100644 --- a/receiver/opencensusreceiver/factory.go +++ b/receiver/opencensusreceiver/factory.go @@ -49,6 +49,8 @@ func (f *receiverFactory) CustomUnmarshaler() factories.CustomUnmarshaler { func (f *receiverFactory) CreateDefaultConfig() configmodels.Receiver { return &ConfigV2{ ReceiverSettings: configmodels.ReceiverSettings{ + TypeVal: typeStr, + NameVal: typeStr, Endpoint: "127.0.0.1:55678", Enabled: true, }, From 320174d3fed67fed4390a3773044eedec3364ae9 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Thu, 20 Jun 2019 08:28:22 -0400 Subject: [PATCH 2/2] Fix based on PR comments --- .../app/builder/exporters_builder.go | 6 ++-- .../app/builder/pipelines_builder.go | 2 +- .../app/builder/receivers_builder.go | 13 +++---- .../app/builder/receivers_builder_test.go | 34 +++++++++---------- internal/configmodels/configmodels.go | 4 +-- internal/configv2/example_factories.go | 14 ++++++-- 6 files changed, 41 insertions(+), 32 deletions(-) diff --git a/cmd/occollector/app/builder/exporters_builder.go b/cmd/occollector/app/builder/exporters_builder.go index 49f12718390..75375524de9 100644 --- a/cmd/occollector/app/builder/exporters_builder.go +++ b/cmd/occollector/app/builder/exporters_builder.go @@ -196,7 +196,7 @@ func (eb *ExportersBuilder) buildExporter( exporter.stop = combineStopFunc(exporter.stop, stopFunc) } - eb.logger.Info(fmt.Sprintf("Exporter %s is enabled.", config.Name())) + eb.logger.Info("Exporter is enabled.", zap.String("exporter", config.Name())) return exporter, nil } @@ -206,9 +206,7 @@ func typeMismatchErr( requiredByPipeline *configmodels.Pipeline, dataType configmodels.DataType, ) error { - return fmt.Errorf( - "pipeline %s is attached %s to exporter %s which does not support %s "+ - "telemetry data produced by pipeline. Exporter will be detached from pipeline", + return fmt.Errorf("%s is a %s pipeline but has a %s which does not support %s", requiredByPipeline.Name, dataType.GetString(), config.Name(), dataType.GetString(), ) diff --git a/cmd/occollector/app/builder/pipelines_builder.go b/cmd/occollector/app/builder/pipelines_builder.go index 191a587a967..45ee803f261 100644 --- a/cmd/occollector/app/builder/pipelines_builder.go +++ b/cmd/occollector/app/builder/pipelines_builder.go @@ -115,7 +115,7 @@ func (pb *PipelinesBuilder) buildPipeline( } } - pb.logger.Info(fmt.Sprintf("Pipeline %s enabled.", pipelineCfg.Name)) + pb.logger.Info("Pipeline is enabled.", zap.String("pipelines", pipelineCfg.Name)) return &builtProcessor{tc, mc}, nil } diff --git a/cmd/occollector/app/builder/receivers_builder.go b/cmd/occollector/app/builder/receivers_builder.go index 2ccb4f01a4a..c4482f17d9b 100644 --- a/cmd/occollector/app/builder/receivers_builder.go +++ b/cmd/occollector/app/builder/receivers_builder.go @@ -88,11 +88,12 @@ func (rcvs Receivers) StopAll() { // StartAll starts all receivers. func (rcvs Receivers) StartAll(logger *zap.Logger, asyncErrorChan chan<- error) error { for cfg, rcv := range rcvs { - logger.Info(fmt.Sprintf("Receiver %s starting...", cfg.Name())) + logger.Info("Receiver is starting...", zap.String("receiver", cfg.Name())) + if err := rcv.Start(asyncErrorChan); err != nil { return err } - logger.Info(fmt.Sprintf("Receiver %s started.", cfg.Name())) + logger.Info("Receiver is started.", zap.String("receiver", cfg.Name())) } return nil } @@ -197,8 +198,8 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines( if err != nil { if err == factories.ErrDataTypeIsNotSupported { return fmt.Errorf( - "receiver %s does not support %s but some pipelines that "+ - "want to process %s are attached to the receiever", + "receiver %s does not support %s but it was used in a "+ + "%s pipeline", config.Name(), dataType.GetString(), dataType.GetString()) @@ -206,8 +207,8 @@ func (rb *ReceiversBuilder) attachReceiverToPipelines( return fmt.Errorf("cannot create receiver %s", config.Name()) } - rb.logger.Info(fmt.Sprintf("Receiver %s is enabled for %s.", - config.Name(), dataType.GetString())) + rb.logger.Info("Receiver is enabled.", + zap.String("receiver", config.Name()), zap.String("datatype", dataType.GetString())) return nil } diff --git a/cmd/occollector/app/builder/receivers_builder_test.go b/cmd/occollector/app/builder/receivers_builder_test.go index 4f3103c5c0a..70ed0be396d 100644 --- a/cmd/occollector/app/builder/receivers_builder_test.go +++ b/cmd/occollector/app/builder/receivers_builder_test.go @@ -195,23 +195,23 @@ func testReceivers( } } -func TestReceiversBuilder_Error(t *testing.T) { - //config, err := configv2.LoadConfigFile(t, "testdata/pipelines_builder.yaml") - //require.Nil(t, err) - // - //// Corrupt the pipeline, change data type to metrics. We have to forcedly do it here - //// since there is no way to have such config loaded by LoadConfigFile, it would not - //// pass validation. We are doing this to test failure mode of PipelinesBuilder. - //pipeline := config.Pipelines["traces"] - //pipeline.InputType = configmodels.MetricsDataType - // - //exporters, err := NewExportersBuilder(zap.NewNop(), config).Build() - // - //// This should fail because "attributes" processor defined in the config does - //// not support metrics data type. - //_, err = NewPipelinesBuilder(zap.NewNop(), config, exporters).Build() - // - //assert.NotNil(t, err) +func TestReceiversBuilder_DataTypeError(t *testing.T) { + config, err := configv2.LoadConfigFile(t, "testdata/pipelines_builder.yaml") + require.Nil(t, err) + + // Make examplereceiver to "unsupport" trace data type. + receiver := config.Receivers["examplereceiver"] + receiver.(*configv2.ExampleReceiver).FailTraceCreation = true + + // Build the pipeline + allExporters, err := NewExportersBuilder(zap.NewNop(), config).Build() + pipelineProcessors, err := NewPipelinesBuilder(zap.NewNop(), config, allExporters).Build() + receivers, err := NewReceiversBuilder(zap.NewNop(), config, pipelineProcessors).Build() + + // This should fail because "examplereceiver" is attached to "traces" pipeline + // which is a configuration error. + assert.NotNil(t, err) + assert.Nil(t, receivers) } func TestReceiversBuilder_StartAll(t *testing.T) { diff --git a/internal/configmodels/configmodels.go b/internal/configmodels/configmodels.go index 15ca8d47457..67c32d22c8f 100644 --- a/internal/configmodels/configmodels.go +++ b/internal/configmodels/configmodels.go @@ -134,12 +134,12 @@ type ReceiverSettings struct { Endpoint string `mapstructure:"endpoint"` } -// Name gets the exporter name. +// Name gets the receiver name. func (rs *ReceiverSettings) Name() string { return rs.NameVal } -// SetName sets the exporter name. +// SetName sets the receiver name. func (rs *ReceiverSettings) SetName(name string) { rs.NameVal = name } diff --git a/internal/configv2/example_factories.go b/internal/configv2/example_factories.go index 3b94dc9aaa5..2b7712c292a 100644 --- a/internal/configv2/example_factories.go +++ b/internal/configv2/example_factories.go @@ -30,6 +30,12 @@ import ( type ExampleReceiver struct { configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct ExtraSetting string `mapstructure:"extra"` + + // FailTraceCreation causes CreateTraceReceiver to fail. Useful for testing. + FailTraceCreation bool `mapstructure:"-"` + + // FailMetricsCreation causes CreateTraceReceiver to fail. Useful for testing. + FailMetricsCreation bool `mapstructure:"-"` } // ExampleReceiverFactory is factory for ExampleReceiver. @@ -64,7 +70,9 @@ func (f *ExampleReceiverFactory) CreateTraceReceiver( cfg configmodels.Receiver, nextConsumer consumer.TraceConsumer, ) (receiver.TraceReceiver, error) { - // Not used for this test, just return nil + if cfg.(*ExampleReceiver).FailTraceCreation { + return nil, factories.ErrDataTypeIsNotSupported + } return &ExampleReceiverProducer{TraceConsumer: nextConsumer}, nil } @@ -73,7 +81,9 @@ func (f *ExampleReceiverFactory) CreateMetricsReceiver( cfg configmodels.Receiver, nextConsumer consumer.MetricsConsumer, ) (receiver.MetricsReceiver, error) { - // Not used for this test, just return nil + if cfg.(*ExampleReceiver).FailMetricsCreation { + return nil, factories.ErrDataTypeIsNotSupported + } return &ExampleReceiverProducer{MetricsConsumer: nextConsumer}, nil }