Skip to content

Commit

Permalink
[BREAKING CHANGE] Migrate Resource Processor to internal data model
Browse files Browse the repository at this point in the history
This commit migrates resource processor to internal data model. Since existing configuration relevant only to OpenCensus resource, this commits breaks existing configuration in favor of OTLP resource structure. It requires the following changes to migrate existing resource processor config:
1. Rename "labels" field to "attributes"
2. Add value from "type" field to "attributes" under "opencensus.resourcetype" key.
  • Loading branch information
dmitryax committed Jul 10, 2020
1 parent 474407c commit bc59aed
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 354 deletions.
6 changes: 2 additions & 4 deletions processor/resourceprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ The resource processor can be used to override a resource.
Please refer to [config.go](./config.go) for the config spec.

The following configuration options are required:
- `type`: Resource type to be applied. If specified, this value overrides the
original resource type. Otherwise, the original resource type is kept.
- `labels`: Map of key/value pairs that should be added to the resource.
- `attributes`: Map of key/value pairs that should be added to the resource.

Examples:

```yaml
processors:
resource:
type: "host"
labels: {
attributes: {
"cloud.zone": "zone-1",
"k8s.cluster.name": "k8s-cluster",
"host.name": "k8s-node",
Expand Down
8 changes: 3 additions & 5 deletions processor/resourceprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ import (
// Config defines configuration for Resource processor.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
// ResourceType overrides the original resource type.
ResourceType string `mapstructure:"type"`
// Labels specify static labels to be added to resource.
// In case of a conflict the label will be overridden.
Labels map[string]string `mapstructure:"labels"`
// Attributes specify static key/values pairs to be added to resource attributes.
// In case of a conflict the attribute will be overridden.
Attributes map[string]string `mapstructure:"attributes"`
}
3 changes: 1 addition & 2 deletions processor/resourceprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ func TestLoadConfig(t *testing.T) {
TypeVal: "resource",
NameVal: "resource/2",
},
ResourceType: "host",
Labels: map[string]string{
Attributes: map[string]string{
"cloud.zone": "zone-1",
"k8s.cluster.name": "k8s-cluster",
"host.name": "k8s-node",
Expand Down
23 changes: 16 additions & 7 deletions processor/resourceprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package resourceprocessor

import (
"go.uber.org/zap"
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
Expand All @@ -32,30 +32,39 @@ type Factory struct {
}

// Type gets the type of the Option config created by this factory.
func (Factory) Type() configmodels.Type {
func (*Factory) Type() configmodels.Type {
return typeStr
}

// CreateDefaultConfig creates the default configuration for processor.
func (Factory) CreateDefaultConfig() configmodels.Processor {
func (*Factory) CreateDefaultConfig() configmodels.Processor {
return &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
ResourceType: "",
Labels: map[string]string{},
Attributes: map[string]string{},
}
}

// CreateTraceProcessor creates a trace processor based on this config.
func (Factory) CreateTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumerOld, cfg configmodels.Processor) (component.TraceProcessorOld, error) {
func (*Factory) CreateTraceProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (component.TraceProcessor, error) {
oCfg := cfg.(*Config)
return newResourceTraceProcessor(nextConsumer, oCfg), nil
}

// CreateMetricsProcessor creates a metrics processor based on this config.
func (Factory) CreateMetricsProcessor(logger *zap.Logger, nextConsumer consumer.MetricsConsumerOld, cfg configmodels.Processor) (component.MetricsProcessorOld, error) {
func (*Factory) CreateMetricsProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
nextConsumer consumer.MetricsConsumer,
cfg configmodels.Processor,
) (component.MetricsProcessor, error) {
oCfg := cfg.(*Config)
return newResourceMetricProcessor(nextConsumer, oCfg), nil
}
7 changes: 4 additions & 3 deletions processor/resourceprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package resourceprocessor

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
)

Expand All @@ -34,11 +35,11 @@ func TestCreateProcessor(t *testing.T) {
var factory Factory
cfg := factory.CreateDefaultConfig()

tp, err := factory.CreateTraceProcessor(zap.NewNop(), nil, cfg)
tp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.NoError(t, err)
assert.NotNil(t, tp)

mp, err := factory.CreateMetricsProcessor(zap.NewNop(), nil, cfg)
mp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, nil, cfg)
assert.NoError(t, err)
assert.NotNil(t, mp)
}
105 changes: 41 additions & 64 deletions processor/resourceprocessor/resource_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,41 @@ package resourceprocessor
import (
"context"

resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/consumer/pdatautil"
)

type resourceTraceProcessor struct {
resource *resourcepb.Resource
attributes map[string]string
capabilities component.ProcessorCapabilities
next consumer.TraceConsumerOld
next consumer.TraceConsumer
}

func newResourceTraceProcessor(next consumer.TraceConsumerOld, cfg *Config) *resourceTraceProcessor {
resource := createResource(cfg)
func newResourceTraceProcessor(next consumer.TraceConsumer, cfg *Config) *resourceTraceProcessor {
return &resourceTraceProcessor{
attributes: cfg.Attributes,
capabilities: component.ProcessorCapabilities{MutatesConsumedData: len(cfg.Attributes) > 0},
next: next,
capabilities: component.ProcessorCapabilities{MutatesConsumedData: !isEmptyResource(resource)},
resource: resource,
}
}

// ConsumeTraceData implements the TraceProcessor interface
func (rtp *resourceTraceProcessor) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
return rtp.next.ConsumeTraceData(ctx, consumerdata.TraceData{
Node: td.Node,
Resource: mergeResource(td.Resource, rtp.resource),
Spans: td.Spans,
SourceFormat: td.SourceFormat,
})
func (rtp *resourceTraceProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
if len(rtp.attributes) == 0 {
return rtp.next.ConsumeTraces(ctx, td)
}

rss := td.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
resource := rss.At(i).Resource()
if resource.IsNil() {
resource.InitEmpty()
}
applyAttributes(resource, rtp.attributes)
}
return rtp.next.ConsumeTraces(ctx, td)
}

// GetCapabilities returns the ProcessorCapabilities assocciated with the resource processor.
Expand All @@ -65,16 +70,15 @@ func (*resourceTraceProcessor) Shutdown(context.Context) error {
}

type resourceMetricProcessor struct {
resource *resourcepb.Resource
attributes map[string]string
capabilities component.ProcessorCapabilities
next consumer.MetricsConsumerOld
next consumer.MetricsConsumer
}

func newResourceMetricProcessor(next consumer.MetricsConsumerOld, cfg *Config) *resourceMetricProcessor {
resource := createResource(cfg)
func newResourceMetricProcessor(next consumer.MetricsConsumer, cfg *Config) *resourceMetricProcessor {
return &resourceMetricProcessor{
resource: resource,
capabilities: component.ProcessorCapabilities{MutatesConsumedData: !isEmptyResource(resource)},
attributes: cfg.Attributes,
capabilities: component.ProcessorCapabilities{MutatesConsumedData: len(cfg.Attributes) > 0},
next: next,
}
}
Expand All @@ -95,52 +99,25 @@ func (*resourceMetricProcessor) Shutdown(context.Context) error {
}

// ConsumeMetricsData implements the MetricsProcessor interface
func (rmp *resourceMetricProcessor) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
return rmp.next.ConsumeMetricsData(ctx, consumerdata.MetricsData{
Node: md.Node,
Resource: mergeResource(md.Resource, rmp.resource),
Metrics: md.Metrics,
})
}

func createResource(cfg *Config) *resourcepb.Resource {
rpb := &resourcepb.Resource{
Type: cfg.ResourceType,
Labels: map[string]string{},
}
for k, v := range cfg.Labels {
rpb.Labels[k] = v
func (rmp *resourceMetricProcessor) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
if len(rmp.attributes) == 0 {
return rmp.next.ConsumeMetrics(ctx, md)
}
return rpb
}

func mergeResource(to, from *resourcepb.Resource) *resourcepb.Resource {
if isEmptyResource(from) {
return to
}
if to == nil {
if from.Type == "" {
// Since resource without type would be invalid, we keep resource as nil
return nil
}
to = &resourcepb.Resource{Labels: map[string]string{}}
}
if from.Type != "" {
// Only change resource type if it was configured
to.Type = from.Type
}
if from.Labels != nil {
if to.Labels == nil {
to.Labels = make(map[string]string, len(from.Labels))
}

for k, v := range from.Labels {
to.Labels[k] = v
imd := pdatautil.MetricsToInternalMetrics(md)
rms := imd.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
resource := rms.At(i).Resource()
if resource.IsNil() {
resource.InitEmpty()
}
applyAttributes(resource, rmp.attributes)
}
return to
return rmp.next.ConsumeMetrics(ctx, md)
}

func isEmptyResource(resource *resourcepb.Resource) bool {
return resource.Type == "" && len(resource.Labels) == 0
func applyAttributes(resource pdata.Resource, attrs map[string]string) {
for k, v := range attrs {
resource.Attributes().UpsertString(k, v)
}
}
Loading

0 comments on commit bc59aed

Please sign in to comment.