Skip to content

Commit

Permalink
Move tail sampling config to its own package and remove stale configs (
Browse files Browse the repository at this point in the history
…#217)

Follow up of #200. Second step of #146.
  • Loading branch information
songy23 authored and Paulo Janotti committed Aug 1, 2019
1 parent dc4266d commit d0b9b47
Show file tree
Hide file tree
Showing 8 changed files with 14 additions and 258 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package builder
package tailsampling

import (
"time"

"github.com/spf13/viper"

"github.com/open-telemetry/opentelemetry-service/config/configmodels"
)

const (
modeTag = "mode"
policiesTag = "policies"
samplingTag = "sampling"
)

// Mode indicates the sampling mode
type Mode string

const (
// NoSampling mode is the default and means that all data arriving at the collector
// is passed ahead.
NoSampling Mode = "no-sampling"
// HeadSampling is the mode in which trace data is sampled at ingestion, without seeing
// the whole trace data.
HeadSampling Mode = "head"
// TailSampling is the mode in which trace data is temporarily retained until an evaluation
// if the trace should be sampled is performed.
TailSampling Mode = "tail"
)

// PolicyType indicates the type of sampling policy.
type PolicyType string

Expand Down Expand Up @@ -92,75 +69,8 @@ type StringAttributeFilterCfg struct {
Values []string `mapstructure:"values"`
}

// RateLimitingCfg holds the configurable settings to create a string attribute filter
// sampling policy evaluator.
type RateLimitingCfg struct {
// SpansPerSecond limit to the number of spans per second
SpansPerSecond int64 `mapstructure:"spans-per-second"`
}

// SamplingCfg holds the sampling configuration.
type SamplingCfg struct {
// Mode specifies the sampling mode to be used.
Mode Mode `mapstructure:"mode"`
// Policies contains the list of policies to be used by sampling.
Policies []*PolicyCfg `mapstructure:"policies"`
}

// NewDefaultSamplingCfg creates a SamplingCfg with the default values.
func NewDefaultSamplingCfg() *SamplingCfg {
return &SamplingCfg{
Mode: NoSampling,
}
}

// InitFromViper initializes SamplingCfg with properties from viper.
func (sCfg *SamplingCfg) InitFromViper(v *viper.Viper) *SamplingCfg {
sv := v.Sub(samplingTag)
if sv == nil {
return sCfg
}

sCfg.Mode = Mode(sv.GetString(modeTag))

pv := sv.Sub(policiesTag)
if pv == nil {
return sCfg
}

for policyName := range sv.GetStringMap(policiesTag) {
polSub := pv.Sub(policyName)
polCfg := &PolicyCfg{}
polCfg.Name = policyName
polCfg.Type = PolicyType(polSub.GetString("policy"))
polCfg.Exporters = polSub.GetStringSlice("exporters")

cfgSub := polSub.Sub("configuration")
if cfgSub != nil {
// As the number of polices grow this likely should be in a map.
var cfg interface{}
switch polCfg.Type {
case NumericAttributeFilter:
numAttributeFilterCfg := &NumericAttributeFilterCfg{}
cfg = numAttributeFilterCfg
case StringAttributeFilter:
strAttributeFilterCfg := &StringAttributeFilterCfg{}
cfg = strAttributeFilterCfg
case RateLimiting:
rateLimitingCfg := &RateLimitingCfg{}
cfg = rateLimitingCfg
}
cfgSub.Unmarshal(cfg)
polCfg.Configuration = cfg
}

sCfg.Policies = append(sCfg.Policies, polCfg)
}
return sCfg
}

// TailBasedCfg holds the configuration for tail-based sampling.
type TailBasedCfg struct {
// Config holds the configuration for tail-based sampling.
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`
// DecisionWait is the desired wait time from the arrival of the first span of
// trace until the decision about sampling it or not is evaluated.
Expand All @@ -172,17 +82,3 @@ type TailBasedCfg struct {
// per second. This helps with allocating data structures with closer to actual usage size.
ExpectedNewTracesPerSec uint64 `mapstructure:"expected-new-traces-per-sec"`
}

// InitFromViper initializes TailBasedCfg with properties from viper.
func (tCfg *TailBasedCfg) InitFromViper(v *viper.Viper) *TailBasedCfg {
tv := v.Sub(samplingTag)
if tv == nil {
return tCfg
}
if tv == nil || tv.GetString(modeTag) != string(TailSampling) {
return tCfg
}

tv.Unmarshal(tCfg)
return tCfg
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package builder_test
package tailsampling

import (
"path"
Expand All @@ -24,15 +24,13 @@ import (

"github.com/open-telemetry/opentelemetry-service/config"
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/processor/tailsampling"
"github.com/open-telemetry/opentelemetry-service/service/builder"
)

func TestLoadConfig(t *testing.T) {
receivers, processors, exporters, err := config.ExampleComponents()
assert.Nil(t, err)

factory := &tailsampling.Factory{}
factory := &Factory{}
processors[factory.Type()] = factory

cfg, err := config.LoadConfigFile(
Expand All @@ -44,7 +42,7 @@ func TestLoadConfig(t *testing.T) {

p0 := cfg.Processors["tail-sampling"]
assert.Equal(t, p0,
&builder.TailBasedCfg{
&Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: "tail-sampling",
NameVal: "tail-sampling",
Expand Down
5 changes: 2 additions & 3 deletions processor/tailsampling/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/open-telemetry/opentelemetry-service/config/configmodels"
"github.com/open-telemetry/opentelemetry-service/consumer"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/service/builder"
)

const (
Expand All @@ -42,7 +41,7 @@ func (f *Factory) Type() string {

// CreateDefaultConfig creates the default configuration for processor.
func (f *Factory) CreateDefaultConfig() configmodels.Processor {
return &builder.TailBasedCfg{
return &Config{
DecisionWait: 30 * time.Second,
NumTraces: 50000,
}
Expand All @@ -54,7 +53,7 @@ func (f *Factory) CreateTraceProcessor(
nextConsumer consumer.TraceConsumer,
cfg configmodels.Processor,
) (processor.TraceProcessor, error) {
tCfg := cfg.(*builder.TailBasedCfg)
tCfg := cfg.(*Config)
return NewTraceProcessor(logger, nextConsumer, *tCfg)
}

Expand Down
3 changes: 1 addition & 2 deletions processor/tailsampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher"
"github.com/open-telemetry/opentelemetry-service/internal/collector/sampling"
"github.com/open-telemetry/opentelemetry-service/processor"
"github.com/open-telemetry/opentelemetry-service/service/builder"
)

// Policy combines a sampling policy evaluator with the destinations to be
Expand Down Expand Up @@ -76,7 +75,7 @@ var _ processor.TraceProcessor = (*tailSamplingSpanProcessor)(nil)

// NewTraceProcessor returns a processor.TraceProcessor that will perform tail sampling according to the given
// configuration.
func NewTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer, cfg builder.TailBasedCfg) (processor.TraceProcessor, error) {
func NewTraceProcessor(logger *zap.Logger, nextConsumer consumer.TraceConsumer, cfg Config) (processor.TraceProcessor, error) {
if nextConsumer == nil {
return nil, errors.New("nextConsumer is nil")
}
Expand Down
11 changes: 5 additions & 6 deletions processor/tailsampling/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/open-telemetry/opentelemetry-service/exporter/exportertest"
"github.com/open-telemetry/opentelemetry-service/internal/collector/processor/idbatcher"
"github.com/open-telemetry/opentelemetry-service/internal/collector/sampling"
"github.com/open-telemetry/opentelemetry-service/service/builder"
tracetranslator "github.com/open-telemetry/opentelemetry-service/translator/trace"
)

Expand All @@ -38,7 +37,7 @@ const (

func TestSequentialTraceArrival(t *testing.T) {
traceIds, batches := generateIdsAndBatches(128)
cfg := builder.TailBasedCfg{
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(2 * len(traceIds)),
ExpectedNewTracesPerSec: 64,
Expand All @@ -64,7 +63,7 @@ func TestConcurrentTraceArrival(t *testing.T) {
traceIds, batches := generateIdsAndBatches(128)

var wg sync.WaitGroup
cfg := builder.TailBasedCfg{
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(2 * len(traceIds)),
ExpectedNewTracesPerSec: 64,
Expand Down Expand Up @@ -102,7 +101,7 @@ func TestConcurrentTraceArrival(t *testing.T) {
func TestSequentialTraceMapSize(t *testing.T) {
traceIds, batches := generateIdsAndBatches(210)
const maxSize = 100
cfg := builder.TailBasedCfg{
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(maxSize),
ExpectedNewTracesPerSec: 64,
Expand All @@ -125,7 +124,7 @@ func TestConcurrentTraceMapSize(t *testing.T) {
_, batches := generateIdsAndBatches(210)
const maxSize = 100
var wg sync.WaitGroup
cfg := builder.TailBasedCfg{
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(maxSize),
ExpectedNewTracesPerSec: 64,
Expand Down Expand Up @@ -160,7 +159,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
const decisionWaitSeconds = 5
msp := &mockSpanProcessor{}
mpe := &mockPolicyEvaluator{}
cfg := builder.TailBasedCfg{
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(maxSize),
ExpectedNewTracesPerSec: 64,
Expand Down
68 changes: 0 additions & 68 deletions service/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
package builder

import (
"encoding/json"
"reflect"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -72,72 +70,6 @@ func TestMultiAndQueuedSpanProcessorConfig(t *testing.T) {
}
}

func TestTailSamplingPoliciesConfiguration(t *testing.T) {
v, err := loadViperFromFile("./testdata/sampling_config.yaml")
if err != nil {
t.Fatalf("Failed to load viper from test file: %v", err)
}

wCfg := NewDefaultSamplingCfg()
if wCfg.Mode != NoSampling {
t.Fatalf("Default SamplingCfg Mode should be NoSampling")
}
wCfg.Mode = TailSampling
wCfg.Policies = []*PolicyCfg{
{
Name: "string-attribute-filter1",
Type: StringAttributeFilter,
Exporters: []string{"jaeger1"},
Configuration: &StringAttributeFilterCfg{
Key: "test",
Values: []string{"value 1", "value 2"},
},
},
{
Name: "numeric-attribute-filter2",
Type: NumericAttributeFilter,
Exporters: []string{"jaeger2"},
Configuration: &NumericAttributeFilterCfg{
Key: "http.status_code",
MinValue: 400,
MaxValue: 999,
},
},
{
Name: "string-attribute-filter3",
Type: StringAttributeFilter,
Exporters: []string{"jaeger3"},
Configuration: &StringAttributeFilterCfg{
Key: "test.different",
Values: []string{"key 1", "key 2"},
},
},
{
Name: "numeric-attribute-filter4",
Type: NumericAttributeFilter,
Exporters: []string{"jaeger4", "jaeger5"},
Configuration: &NumericAttributeFilterCfg{
Key: "http.status_code",
MinValue: 400,
MaxValue: 999,
},
},
}

gCfg := NewDefaultSamplingCfg().InitFromViper(v)
sort.Slice(gCfg.Policies, func(i, j int) bool {
if len(gCfg.Policies[i].Exporters) == len(gCfg.Policies[j].Exporters) {
return gCfg.Policies[i].Exporters[0] < gCfg.Policies[j].Exporters[0]
}
return len(gCfg.Policies[i].Exporters) < len(gCfg.Policies[j].Exporters)
})

if !reflect.DeepEqual(gCfg, wCfg) {
gb, _ := json.MarshalIndent(gCfg, "", " ")
t.Fatalf("Wanted %+v but got %+v\ngot json:\n%s", *wCfg, *gCfg, string(gb))
}
}

func loadViperFromFile(file string) (*viper.Viper, error) {
v := viper.New()
v.SetConfigFile(file)
Expand Down
Loading

0 comments on commit d0b9b47

Please sign in to comment.