Skip to content

Commit

Permalink
Expose telemetry level in the configtelemetry (#1796)
Browse files Browse the repository at this point in the history
Next PR will add a config setting that can be embedded in every component config.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Sep 18, 2020
1 parent 6d1c189 commit e7e6693
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 71 deletions.
30 changes: 30 additions & 0 deletions config/configtelemetry/configtelemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configtelemetry

const (
// LevelNone indicates that no telemetry data should be collected.
LevelNone Level = iota - 1
// LevelBasic is the default and covers the basics of the service telemetry.
LevelBasic
// LevelNormal adds some other indicators on top of basic.
LevelNormal
// LevelDetailed adds dimensions and views to the previous levels.
LevelDetailed
)

// Level is the level of internal telemetry (metrics, logs, traces about the component itself)
// that every component should generate.
type Level int8
17 changes: 17 additions & 0 deletions config/configtelemetry/empty_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package configtelemetry

// Package with interfaces and structs only.
27 changes: 7 additions & 20 deletions internal/collector/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,14 @@ import (
"fmt"
"strings"

"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/internal/version"
)

const (
metricsAddrCfg = "metrics-addr"
metricsLevelCfg = "metrics-level"
metricsPrefixCfg = "metrics-prefix"

// Telemetry levels
//
// None indicates that no telemetry data should be collected.
None Level = iota - 1
// Basic is the default and covers the basics of the service telemetry.
Basic
// Normal adds some other indicators on top of basic.
Normal
// Detailed adds dimensions and views to the previous levels.
Detailed
)

var (
Expand Down Expand Up @@ -97,17 +87,14 @@ func GetMetricsAddrDefault() string {
return ":8888"
}

// Level of telemetry data to be generated.
type Level int8

func GetAddInstanceID() bool {
return *addInstanceIDPtr
}

// GetLevel returns the Level represented by the string. The parsing is case-insensitive
// and it returns error if the string value is unknown.
func GetLevel() (Level, error) {
var level Level
func GetLevel() (configtelemetry.Level, error) {
var level configtelemetry.Level
var str string

if metricsLevelPtr != nil {
Expand All @@ -116,13 +103,13 @@ func GetLevel() (Level, error) {

switch str {
case "none":
level = None
level = configtelemetry.LevelNone
case "", "basic":
level = Basic
level = configtelemetry.LevelBasic
case "normal":
level = Normal
level = configtelemetry.LevelNormal
case "detailed":
level = Detailed
level = configtelemetry.LevelDetailed
default:
return level, fmt.Errorf("unknown metrics level %q", str)
}
Expand Down
14 changes: 7 additions & 7 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/collector/telemetry"
"go.opentelemetry.io/collector/processor"
)

Expand All @@ -41,7 +41,7 @@ import (
type batchProcessor struct {
name string
logger *zap.Logger
telemetryLevel telemetry.Level
telemetryLevel configtelemetry.Level

sendBatchSize uint32
timeout time.Duration
Expand Down Expand Up @@ -74,7 +74,7 @@ var _ consumer.TraceConsumer = (*batchProcessor)(nil)
var _ consumer.MetricsConsumer = (*batchProcessor)(nil)
var _ consumer.LogsConsumer = (*batchProcessor)(nil)

func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batch batch, telemetryLevel telemetry.Level) *batchProcessor {
func newBatchProcessor(params component.ProcessorCreateParams, cfg *Config, batch batch, telemetryLevel configtelemetry.Level) *batchProcessor {
return &batchProcessor{
name: cfg.Name(),
logger: params.Logger,
Expand Down Expand Up @@ -158,7 +158,7 @@ func (bp *batchProcessor) sendItems(measure *stats.Int64Measure) {
statsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, bp.name)}
_ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1), statBatchSendSize.M(int64(bp.batch.itemCount())))

if bp.telemetryLevel == telemetry.Detailed {
if bp.telemetryLevel == configtelemetry.LevelDetailed {
_ = stats.RecordWithTags(context.Background(), statsTags, statBatchSendSizeBytes.M(int64(bp.batch.size())))
}

Expand Down Expand Up @@ -188,17 +188,17 @@ func (bp *batchProcessor) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
}

// newBatchTracesProcessor creates a new batch processor that batches traces by size or with timeout
func newBatchTracesProcessor(params component.ProcessorCreateParams, trace consumer.TraceConsumer, cfg *Config, telemetryLevel telemetry.Level) *batchProcessor {
func newBatchTracesProcessor(params component.ProcessorCreateParams, trace consumer.TraceConsumer, cfg *Config, telemetryLevel configtelemetry.Level) *batchProcessor {
return newBatchProcessor(params, cfg, newBatchTraces(trace), telemetryLevel)
}

// newBatchMetricsProcessor creates a new batch processor that batches metrics by size or with timeout
func newBatchMetricsProcessor(params component.ProcessorCreateParams, metrics consumer.MetricsConsumer, cfg *Config, telemetryLevel telemetry.Level) *batchProcessor {
func newBatchMetricsProcessor(params component.ProcessorCreateParams, metrics consumer.MetricsConsumer, cfg *Config, telemetryLevel configtelemetry.Level) *batchProcessor {
return newBatchProcessor(params, cfg, newBatchMetrics(metrics), telemetryLevel)
}

// newBatchLogsProcessor creates a new batch processor that batches logs by size or with timeout
func newBatchLogsProcessor(params component.ProcessorCreateParams, logs consumer.LogsConsumer, cfg *Config, telemetryLevel telemetry.Level) *batchProcessor {
func newBatchLogsProcessor(params component.ProcessorCreateParams, logs consumer.LogsConsumer, cfg *Config, telemetryLevel configtelemetry.Level) *batchProcessor {
return newBatchProcessor(params, cfg, newBatchLogs(logs), telemetryLevel)
}

Expand Down
34 changes: 17 additions & 17 deletions processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/internal/collector/telemetry"
"go.opentelemetry.io/collector/internal/data/testdata"
)

Expand All @@ -38,7 +38,7 @@ func TestBatchProcessorSpansDelivered(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchTracesProcessor(creationParams, sink, cfg, telemetry.Detailed)
batcher := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelDetailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 1000
Expand Down Expand Up @@ -79,7 +79,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
cfg.SendBatchSize = 128
cfg.SendBatchMaxSize = 128
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchTracesProcessor(creationParams, sink, cfg, telemetry.Basic)
batcher := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelBasic)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 1000
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) {
}

func TestBatchProcessorSentBySize(t *testing.T) {
views := MetricViews(telemetry.Detailed)
views := MetricViews(configtelemetry.LevelDetailed)
view.Register(views...)
defer view.Unregister(views...)

Expand All @@ -126,7 +126,7 @@ func TestBatchProcessorSentBySize(t *testing.T) {
cfg.SendBatchSize = uint32(sendBatchSize)
cfg.Timeout = 500 * time.Millisecond
creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchTracesProcessor(creationParams, sink, cfg, telemetry.Detailed)
batcher := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelDetailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 100
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {
spansPerRequest := 10
start := time.Now()

batcher := newBatchTracesProcessor(creationParams, sink, cfg, telemetry.Detailed)
batcher := newBatchTracesProcessor(creationParams, sink, cfg, configtelemetry.LevelDetailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

for requestNum := 0; requestNum < requestCount; requestNum++ {
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {
sink := &exportertest.SinkTraceExporter{}

creationParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchTracesProcessor(creationParams, sink, &cfg, telemetry.Detailed)
batcher := newBatchTracesProcessor(creationParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

requestCount := 10
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
sink := &exportertest.SinkMetricsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchMetricsProcessor(createParams, sink, &cfg, telemetry.Detailed)
batcher := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

metricDataSlice := make([]pdata.Metrics, 0, requestCount)
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
}

func TestBatchMetricProcessor_BatchSize(t *testing.T) {
views := MetricViews(telemetry.Detailed)
views := MetricViews(configtelemetry.LevelDetailed)
view.Register(views...)
defer view.Unregister(views...)

Expand All @@ -313,7 +313,7 @@ func TestBatchMetricProcessor_BatchSize(t *testing.T) {
sink := &exportertest.SinkMetricsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchMetricsProcessor(createParams, sink, &cfg, telemetry.Detailed)
batcher := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

start := time.Now()
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {
sink := &exportertest.SinkMetricsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchMetricsProcessor(createParams, sink, &cfg, telemetry.Detailed)
batcher := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

start := time.Now()
Expand Down Expand Up @@ -415,7 +415,7 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) {
sink := &exportertest.SinkMetricsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchMetricsProcessor(createParams, sink, &cfg, telemetry.Detailed)
batcher := newBatchMetricsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

for requestNum := 0; requestNum < requestCount; requestNum++ {
Expand Down Expand Up @@ -518,7 +518,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
sink := &exportertest.SinkLogsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, telemetry.Detailed)
batcher := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

logDataSlice := make([]pdata.Logs, 0, requestCount)
Expand Down Expand Up @@ -553,7 +553,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
}

func TestBatchLogProcessor_BatchSize(t *testing.T) {
views := MetricViews(telemetry.Detailed)
views := MetricViews(configtelemetry.LevelDetailed)
view.Register(views...)
defer view.Unregister(views...)

Expand All @@ -569,7 +569,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) {
sink := &exportertest.SinkLogsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, telemetry.Detailed)
batcher := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

start := time.Now()
Expand Down Expand Up @@ -624,7 +624,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {
sink := &exportertest.SinkLogsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, telemetry.Detailed)
batcher := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

start := time.Now()
Expand Down Expand Up @@ -671,7 +671,7 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) {
sink := &exportertest.SinkLogsExporter{}

createParams := component.ProcessorCreateParams{Logger: zap.NewNop()}
batcher := newBatchLogsProcessor(createParams, sink, &cfg, telemetry.Detailed)
batcher := newBatchLogsProcessor(createParams, sink, &cfg, configtelemetry.LevelDetailed)
require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

for requestNum := 0; requestNum < requestCount; requestNum++ {
Expand Down
6 changes: 3 additions & 3 deletions processor/batchprocessor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"

"go.opentelemetry.io/collector/internal/collector/telemetry"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/obsreport"
"go.opentelemetry.io/collector/processor"
)
Expand All @@ -32,8 +32,8 @@ var (
)

// MetricViews returns the metrics views related to batching
func MetricViews(level telemetry.Level) []*view.View {
if level == telemetry.None {
func MetricViews(level configtelemetry.Level) []*view.View {
if level == configtelemetry.LevelNone {
return nil
}

Expand Down
8 changes: 4 additions & 4 deletions processor/batchprocessor/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ import (

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/internal/collector/telemetry"
"go.opentelemetry.io/collector/config/configtelemetry"
)

func TestBatchProcessorMetrics(t *testing.T) {
tests := []struct {
viewNames []string
level telemetry.Level
level configtelemetry.Level
}{
{
viewNames: []string{"batch_size_trigger_send", "timeout_trigger_send", "batch_send_size", "batch_send_size_bytes"},
level: telemetry.Detailed,
level: configtelemetry.LevelDetailed,
},
{
viewNames: []string{},
level: telemetry.None,
level: configtelemetry.LevelNone,
},
}
for _, test := range tests {
Expand Down
Loading

0 comments on commit e7e6693

Please sign in to comment.