Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support adding process tags in OTEL via env variable #2220

Merged
merged 5 commits into from
May 6, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions cmd/agent/app/reporter/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
const (
// Whether to use grpc or tchannel reporter.
reporterType = "reporter.type"
// Agent tags
agentTagsDeprecated = "jaeger.tags"
// AgentTagsDeprecated is a configuration property name for adding process tags to incoming spans.
AgentTagsDeprecated = "jaeger.tags"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be named JaegerTagsDeprecated as not specific to agent?
Assuming not specific to agent - the changes seem limited to the agent config at the moment - shouldn't they also be applied to collector?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collector uses only --collector.tags

agentTags = "agent.tags"
// GRPC is name of gRPC reporter.
GRPC Type = "grpc"
Expand All @@ -48,7 +48,7 @@ type Options struct {
func AddFlags(flags *flag.FlagSet) {
flags.String(reporterType, string(GRPC), fmt.Sprintf("Reporter type to use e.g. %s", string(GRPC)))
if !setupcontext.IsAllInOne() {
flags.String(agentTagsDeprecated, "", "(deprecated) see --"+agentTags)
flags.String(AgentTagsDeprecated, "", "(deprecated) see --"+agentTags)
flags.String(agentTags, "", "One or more tags to be added to the Process tags of all spans passing through this agent. Ex: key1=value1,key2=${envVar:defaultValue}")
}
}
Expand All @@ -57,9 +57,9 @@ func AddFlags(flags *flag.FlagSet) {
func (b *Options) InitFromViper(v *viper.Viper, logger *zap.Logger) *Options {
b.ReporterType = Type(v.GetString(reporterType))
if !setupcontext.IsAllInOne() {
if len(v.GetString(agentTagsDeprecated)) > 0 {
logger.Warn("Using deprecated configuration", zap.String("option", agentTagsDeprecated))
b.AgentTags = flags.ParseJaegerTags(v.GetString(agentTagsDeprecated))
if len(v.GetString(AgentTagsDeprecated)) > 0 {
logger.Warn("Using deprecated configuration", zap.String("option", AgentTagsDeprecated))
b.AgentTags = flags.ParseJaegerTags(v.GetString(AgentTagsDeprecated))
}
if len(v.GetString(agentTags)) > 0 {
b.AgentTags = flags.ParseJaegerTags(v.GetString(agentTags))
Expand Down
32 changes: 25 additions & 7 deletions cmd/opentelemetry-collector/app/defaults/default_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/extension/healthcheckextension"
"github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
"github.com/open-telemetry/opentelemetry-collector/receiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
"github.com/open-telemetry/opentelemetry-collector/receiver/zipkinreceiver"
Expand All @@ -36,7 +37,6 @@ const (
httpThriftBinaryEndpoint = "localhost:14268"
udpThriftCompactEndpoint = "localhost:6831"
udpThriftBinaryEndpoint = "localhost:6832"
httpSamplingEndpoint = "localhost:5778"
)

// CollectorConfig creates default collector configuration.
Expand All @@ -56,17 +56,20 @@ func CollectorConfig(storageType string, zipkinHostPort string, factories config
recTypes = append(recTypes, string(v.Type()))
}
hc := factories.Extensions["health_check"].CreateDefaultConfig()
resProcessor := factories.Processors["resource"].CreateDefaultConfig()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the same approach going to be used for collector, as with the agent default config, to only use the resource processor if labels defined?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, done

return &configmodels.Config{
Receivers: receivers,
Processors: configmodels.Processors{"resource": resProcessor},
Exporters: exporters,
Extensions: configmodels.Extensions{"health_check": hc},
Service: configmodels.Service{
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: recTypes,
Exporters: expTypes,
InputType: configmodels.TracesDataType,
Receivers: recTypes,
Processors: []string{"resource"},
Exporters: expTypes,
},
},
},
Expand Down Expand Up @@ -125,17 +128,24 @@ func createExporters(storageTypes string, factories config.Factories) (configmod
func AgentConfig(factories config.Factories) *configmodels.Config {
jaegerExporter := factories.Exporters["jaeger"]
hc := factories.Extensions["health_check"].CreateDefaultConfig().(*healthcheckextension.Config)
processors := configmodels.Processors{}
resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
if len(resProcessor.Labels) > 0 {
processors[resProcessor.Name()] = resProcessor
}
return &configmodels.Config{
Receivers: createAgentReceivers(factories),
Processors: processors,
Exporters: configmodels.Exporters{"jaeger": jaegerExporter.CreateDefaultConfig()},
Extensions: configmodels.Extensions{"health_check": hc},
Service: configmodels.Service{
Extensions: []string{"health_check"},
Pipelines: map[string]*configmodels.Pipeline{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: []string{"jaeger"},
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Processors: processorNames(processors),
Exporters: []string{"jaeger"},
},
},
},
Expand All @@ -161,3 +171,11 @@ func createAgentReceivers(factories config.Factories) configmodels.Receivers {
}
return recvs
}

func processorNames(processors configmodels.Processors) []string {
var names []string
for _, v := range processors {
names = append(names, v.Name())
}
return names
}
92 changes: 61 additions & 31 deletions cmd/opentelemetry-collector/app/defaults/default_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector/config"
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
"github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter"
"github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -51,9 +52,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
exporterTypes: []string{elasticsearch.TypeStr},
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: []string{elasticsearch.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Processors: []string{"resource"},
Exporters: []string{elasticsearch.TypeStr},
},
},
},
Expand All @@ -63,9 +65,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
exporterTypes: []string{cassandra.TypeStr},
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: []string{cassandra.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Processors: []string{"resource"},
Exporters: []string{cassandra.TypeStr},
},
},
},
Expand All @@ -75,9 +78,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
exporterTypes: []string{kafka.TypeStr},
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: []string{kafka.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Processors: []string{"resource"},
Exporters: []string{kafka.TypeStr},
},
},
},
Expand All @@ -87,9 +91,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
exporterTypes: []string{cassandra.TypeStr, elasticsearch.TypeStr},
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: []string{cassandra.TypeStr, elasticsearch.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Processors: []string{"resource"},
Exporters: []string{cassandra.TypeStr, elasticsearch.TypeStr},
},
},
},
Expand All @@ -99,9 +104,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
exporterTypes: []string{cassandra.TypeStr},
pipeline: configmodels.Pipelines{
"traces": {
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger", "zipkin"},
Exporters: []string{cassandra.TypeStr},
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger", "zipkin"},
Processors: []string{"resource"},
Exporters: []string{cassandra.TypeStr},
},
},
},
Expand All @@ -128,6 +134,7 @@ func TestDefaultCollectorConfig(t *testing.T) {
assert.Equal(t, len(test.pipeline["traces"].Receivers), len(cfg.Receivers))
assert.Equal(t, "jaeger", cfg.Receivers["jaeger"].Name())
assert.Equal(t, len(test.exporterTypes), len(cfg.Exporters))
assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"])

types := []string{}
for _, v := range cfg.Exporters {
Expand All @@ -141,22 +148,45 @@ func TestDefaultCollectorConfig(t *testing.T) {
}

func TestDefaultAgentConfig(t *testing.T) {
v, _ := jConfig.Viperize(grpc.AddFlags)
factories := Components(v)
cfg := AgentConfig(factories)
assert.Equal(t, configmodels.Service{
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Exporters: []string{"jaeger"},
tests := []struct {
config map[string]interface{}
service configmodels.Service
}{
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry should have mentioned in preview review - there needs to be a test without the flag being specified to confirm resource processor is not added.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that was my intention when I refactored the test to use arrays, but I apparently forgot to add the second use case.

config: map[string]interface{}{"resource.labels": "foo=bar"},
service: configmodels.Service{
Extensions: []string{"health_check"},
Pipelines: configmodels.Pipelines{
"traces": &configmodels.Pipeline{
InputType: configmodels.TracesDataType,
Receivers: []string{"jaeger"},
Processors: []string{"resource"},
Exporters: []string{"jaeger"},
},
},
},
},
}, cfg.Service)
assert.Equal(t, 0, len(cfg.Processors))
assert.Equal(t, 1, len(cfg.Receivers))
assert.IsType(t, &jaegerreceiver.Config{}, cfg.Receivers["jaeger"])
assert.Equal(t, 1, len(cfg.Exporters))
assert.IsType(t, &jaegerexporter.Config{}, cfg.Exporters["jaeger"])
}
for _, test := range tests {
v, _ := jConfig.Viperize(grpc.AddFlags)
for key, val := range test.config {
v.Set(key, val)
}
factories := Components(v)
cfg := AgentConfig(factories)

assert.Equal(t, test.service, cfg.Service)
assert.Equal(t, 1, len(cfg.Receivers))
assert.IsType(t, &jaegerreceiver.Config{}, cfg.Receivers["jaeger"])
assert.Equal(t, 1, len(cfg.Exporters))
assert.IsType(t, &jaegerexporter.Config{}, cfg.Exporters["jaeger"])
processorMap := map[string]bool{}
for _, p := range test.service.Pipelines["traces"].Processors {
processorMap[p] = true
}
if processorMap["resource"] {
assert.Equal(t, 1, len(cfg.Processors))
assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"])
}
}
}
16 changes: 12 additions & 4 deletions cmd/opentelemetry-collector/app/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ import (
"flag"

"github.com/open-telemetry/opentelemetry-collector/config"
otelJaegerEexporter "github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter"
otelJaegerreceiver "github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
otelJaegerExporter "github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter"
otelResourceProcessor "github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
otelJaegerReceiver "github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
"github.com/open-telemetry/opentelemetry-collector/service/defaultcomponents"
"github.com/spf13/pflag"
"github.com/spf13/viper"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/jaegerreceiver"
storageCassandra "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
storageEs "github.com/jaegertracing/jaeger/plugin/storage/es"
Expand Down Expand Up @@ -61,16 +63,22 @@ func Components(v *viper.Viper) config.Factories {
factories.Exporters[cassandraExp.Type()] = cassandraExp
factories.Exporters[esExp.Type()] = esExp

jaegerRec := factories.Receivers["jaeger"].(*otelJaegerreceiver.Factory)
jaegerRec := factories.Receivers["jaeger"].(*otelJaegerReceiver.Factory)
factories.Receivers["jaeger"] = &jaegerreceiver.Factory{
Wrapped: jaegerRec,
Viper: v,
}
jaegerExp := factories.Exporters["jaeger"].(*otelJaegerEexporter.Factory)
jaegerExp := factories.Exporters["jaeger"].(*otelJaegerExporter.Factory)
factories.Exporters["jaeger"] = &jaegerexporter.Factory{
Wrapped: jaegerExp,
Viper: v,
}

resourceProc := factories.Processors["resource"].(*otelResourceProcessor.Factory)
factories.Processors["resource"] = &resourceprocessor.Factory{
Wrapped: resourceProc,
Viper: v,
}
return factories
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/opentelemetry-collector/app/defaults/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor"
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/jaegerreceiver"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
)
Expand Down Expand Up @@ -51,4 +52,5 @@ func TestComponents(t *testing.T) {
assert.Equal(t, []string{"http://127.0.0.1:9200"}, ec.GetPrimary().Servers)
assert.IsType(t, &jaegerreceiver.Factory{}, factories.Receivers["jaeger"])
assert.IsType(t, &jaegerexporter.Factory{}, factories.Exporters["jaeger"])
assert.IsType(t, &resourceprocessor.Factory{}, factories.Processors["resource"])
}
Loading