Skip to content

Commit

Permalink
Add debug logging to tail sampling filters (#463)
Browse files Browse the repository at this point in the history
* Add debug logging to tail sampling filters

Signed-off-by: Annanay <[email protected]>

* Correct import order

Signed-off-by: Annanay <[email protected]>
  • Loading branch information
annanay25 authored and Paulo Janotti committed Jan 15, 2020
1 parent b04b16d commit 454fa8e
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 15 deletions.
12 changes: 6 additions & 6 deletions processor/samplingprocessor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer,
if err != nil {
return nil, err
}
eval, err := getPolicyEvaluator(policyCfg)
eval, err := getPolicyEvaluator(logger, policyCfg)
if err != nil {
return nil, err
}
Expand All @@ -122,19 +122,19 @@ func NewTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer,
return tsp, nil
}

func getPolicyEvaluator(cfg *PolicyCfg) (sampling.PolicyEvaluator, error) {
func getPolicyEvaluator(logger *zap.Logger, cfg *PolicyCfg) (sampling.PolicyEvaluator, error) {
switch cfg.Type {
case AlwaysSample:
return sampling.NewAlwaysSample(), nil
return sampling.NewAlwaysSample(logger), nil
case NumericAttribute:
nafCfg := cfg.NumericAttributeCfg
return sampling.NewNumericAttributeFilter(nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil
return sampling.NewNumericAttributeFilter(logger, nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil
case StringAttribute:
safCfg := cfg.StringAttributeCfg
return sampling.NewStringAttributeFilter(safCfg.Key, safCfg.Values), nil
return sampling.NewStringAttributeFilter(logger, safCfg.Key, safCfg.Values), nil
case RateLimiting:
rlfCfg := cfg.RateLimitingCfg
return sampling.NewRateLimiting(rlfCfg.SpansPerSecond), nil
return sampling.NewRateLimiting(logger, rlfCfg.SpansPerSecond), nil
default:
return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,42 @@

package sampling

import tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
import (
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"go.uber.org/zap"
)

type alwaysSample struct{}
type alwaysSample struct {
logger *zap.Logger
}

var _ PolicyEvaluator = (*alwaysSample)(nil)

// NewAlwaysSample creates a policy evaluator the samples all traces.
func NewAlwaysSample() PolicyEvaluator {
return &alwaysSample{}
func NewAlwaysSample(logger *zap.Logger) PolicyEvaluator {
return &alwaysSample{
logger: logger,
}
}

// OnLateArrivingSpans notifies the evaluator that the given list of spans arrived
// after the sampling decision was already taken for the trace.
// This gives the evaluator a chance to log any message/metrics and/or update any
// related internal state.
func (as *alwaysSample) OnLateArrivingSpans(earlyDecision Decision, spans []*tracepb.Span) error {
as.logger.Debug("Triggering action for late arriving spans in always-sample filter")
return nil
}

// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
func (as *alwaysSample) Evaluate(traceID []byte, trace *TraceData) (Decision, error) {
as.logger.Debug("Evaluating spans in always-sample filter")
return Sampled, nil
}

// OnDroppedSpans is called when the trace needs to be dropped, due to memory
// pressure, before the decision_wait time has been reached.
func (as *alwaysSample) OnDroppedSpans(traceID []byte, trace *TraceData) (Decision, error) {
as.logger.Debug("Triggering action for dropped spans in always-sample filter")
return Sampled, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,27 @@

package sampling

import tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
import (
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"go.uber.org/zap"
)

type numericAttributeFilter struct {
key string
minValue, maxValue int64
logger *zap.Logger
}

var _ PolicyEvaluator = (*numericAttributeFilter)(nil)

// NewNumericAttributeFilter creates a policy evaluator that samples all traces with
// the given attribute in the given numeric range.
func NewNumericAttributeFilter(key string, minValue, maxValue int64) PolicyEvaluator {
func NewNumericAttributeFilter(logger *zap.Logger, key string, minValue, maxValue int64) PolicyEvaluator {
return &numericAttributeFilter{
key: key,
minValue: minValue,
maxValue: maxValue,
logger: logger,
}
}

Expand All @@ -38,11 +43,13 @@ func NewNumericAttributeFilter(key string, minValue, maxValue int64) PolicyEvalu
// This gives the evaluator a chance to log any message/metrics and/or update any
// related internal state.
func (naf *numericAttributeFilter) OnLateArrivingSpans(earlyDecision Decision, spans []*tracepb.Span) error {
naf.logger.Debug("Triggering action for late arriving spans in numeric-attribute filter")
return nil
}

// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
func (naf *numericAttributeFilter) Evaluate(traceID []byte, trace *TraceData) (Decision, error) {
naf.logger.Debug("Evaluating spans in numeric-attribute filter")
trace.Lock()
batches := trace.ReceivedBatches
trace.Unlock()
Expand All @@ -66,5 +73,6 @@ func (naf *numericAttributeFilter) Evaluate(traceID []byte, trace *TraceData) (D
// OnDroppedSpans is called when the trace needs to be dropped, due to memory
// pressure, before the decision_wait time has been reached.
func (naf *numericAttributeFilter) OnDroppedSpans(traceID []byte, trace *TraceData) (Decision, error) {
naf.logger.Debug("Triggering action for dropped spans in numeric-attribute filter")
return NotSampled, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,23 @@ import (
"time"

tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"go.uber.org/zap"
)

type rateLimiting struct {
currentSecond int64
spansInCurrentSecond int64
spansPerSecond int64
logger *zap.Logger
}

var _ PolicyEvaluator = (*rateLimiting)(nil)

// NewRateLimiting creates a policy evaluator the samples all traces.
func NewRateLimiting(spansPerSecond int64) PolicyEvaluator {
func NewRateLimiting(logger *zap.Logger, spansPerSecond int64) PolicyEvaluator {
return &rateLimiting{
spansPerSecond: spansPerSecond,
logger: logger,
}
}

Expand All @@ -40,11 +43,13 @@ func NewRateLimiting(spansPerSecond int64) PolicyEvaluator {
// This gives the evaluator a chance to log any message/metrics and/or update any
// related internal state.
func (r *rateLimiting) OnLateArrivingSpans(earlyDecision Decision, spans []*tracepb.Span) error {
r.logger.Debug("Triggering action for late arriving spans in rate-limiting filter")
return nil
}

// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
func (r *rateLimiting) Evaluate(traceID []byte, trace *TraceData) (Decision, error) {
r.logger.Debug("Evaluating spans in rate-limiting filter")
currSecond := time.Now().Unix()
if r.currentSecond != currSecond {
r.currentSecond = currSecond
Expand All @@ -63,5 +68,6 @@ func (r *rateLimiting) Evaluate(traceID []byte, trace *TraceData) (Decision, err
// OnDroppedSpans is called when the trace needs to be dropped, due to memory
// pressure, before the decision_wait time has been reached.
func (r *rateLimiting) OnDroppedSpans(traceID []byte, trace *TraceData) (Decision, error) {
r.logger.Debug("Triggering action for dropped spans in rate-limiting filter")
return Sampled, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@

package sampling

import tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
import (
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"go.uber.org/zap"
)

type stringAttributeFilter struct {
key string
values map[string]struct{}
logger *zap.Logger
}

var _ PolicyEvaluator = (*stringAttributeFilter)(nil)

// NewStringAttributeFilter creates a policy evaluator that samples all traces with
// the given attribute in the given numeric range.
func NewStringAttributeFilter(key string, values []string) PolicyEvaluator {
func NewStringAttributeFilter(logger *zap.Logger, key string, values []string) PolicyEvaluator {
valuesMap := make(map[string]struct{})
for _, value := range values {
if value != "" {
Expand All @@ -35,6 +39,7 @@ func NewStringAttributeFilter(key string, values []string) PolicyEvaluator {
return &stringAttributeFilter{
key: key,
values: valuesMap,
logger: logger,
}
}

Expand All @@ -43,11 +48,13 @@ func NewStringAttributeFilter(key string, values []string) PolicyEvaluator {
// This gives the evaluator a chance to log any message/metrics and/or update any
// related internal state.
func (saf *stringAttributeFilter) OnLateArrivingSpans(earlyDecision Decision, spans []*tracepb.Span) error {
saf.logger.Debug("Triggering action for late arriving spans in string-tag filter")
return nil
}

// Evaluate looks at the trace data and returns a corresponding SamplingDecision.
func (saf *stringAttributeFilter) Evaluate(traceID []byte, trace *TraceData) (Decision, error) {
saf.logger.Debug("Evaluting spans in string-tag filter")
trace.Lock()
batches := trace.ReceivedBatches
trace.Unlock()
Expand Down Expand Up @@ -81,5 +88,6 @@ func (saf *stringAttributeFilter) Evaluate(traceID []byte, trace *TraceData) (De
// OnDroppedSpans is called when the trace needs to be dropped, due to memory
// pressure, before the decision_wait time has been reached.
func (saf *stringAttributeFilter) OnDroppedSpans(traceID []byte, trace *TraceData) (Decision, error) {
saf.logger.Debug("Triggering action for dropped spans in string-tag filter")
return NotSampled, nil
}

0 comments on commit 454fa8e

Please sign in to comment.