From 6ae66159741d183793956c487a8c7aaa44313036 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juraci=20Paix=C3=A3o=20Kr=C3=B6hling?= Date: Tue, 27 Oct 2020 22:50:27 +0100 Subject: [PATCH] Remove tailsamplingprocessor (#2012) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **Description:** as discussed in the past, we are moving the tail-based sampling processor to the contrib repository. IMPORTANT: there shouldn't be a release of the collector/contrib until the contrib counterpart is merged! **Link to tracking Issue:** Partially addresses #2020. **Testing:** regular test. **Documentation:** Removed processor from readme. Signed-off-by: Juraci Paixão Kröhling --- processor/README.md | 4 +- .../tailsamplingprocessor/README.md | 58 -- .../tailsamplingprocessor/config.go | 95 ---- .../tailsamplingprocessor/config_test.go | 72 --- .../tailsamplingprocessor/factory.go | 59 -- .../tailsamplingprocessor/factory_test.go | 52 -- .../idbatcher/id_batcher.go | 141 ----- .../idbatcher/id_batcher_test.go | 161 ------ .../tailsamplingprocessor/metrics.go | 137 ----- .../tailsamplingprocessor/processor.go | 451 --------------- .../tailsamplingprocessor/processor_test.go | 522 ------------------ .../sampling/always_sample.go | 56 -- .../sampling/always_sample_test.go | 48 -- .../tailsamplingprocessor/sampling/doc.go | 17 - .../sampling/empty_test.go | 15 - .../sampling/numeric_tag_filter.go | 92 --- .../sampling/numeric_tag_filter_test.go | 113 ---- .../tailsamplingprocessor/sampling/policy.go | 73 --- .../sampling/rate_limiting.go | 74 --- .../sampling/rate_limiting_test.go | 73 --- .../sampling/string_tag_filter.go | 112 ---- .../sampling/string_tag_filter_test.go | 114 ---- .../testdata/tail_sampling_config.yaml | 40 -- service/defaultcomponents/defaults.go | 2 - service/defaultcomponents/defaults_test.go | 1 - service/telemetry.go | 2 - 26 files changed, 1 insertion(+), 2583 deletions(-) delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/README.md delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/config.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/config_test.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/factory.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/factory_test.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/idbatcher/id_batcher.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/idbatcher/id_batcher_test.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/metrics.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/processor.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/processor_test.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/sampling/always_sample.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/sampling/always_sample_test.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/sampling/doc.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/sampling/empty_test.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/sampling/numeric_tag_filter.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/sampling/numeric_tag_filter_test.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/sampling/policy.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/sampling/rate_limiting.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/sampling/rate_limiting_test.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/sampling/string_tag_filter.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/sampling/string_tag_filter_test.go delete mode 100644 processor/samplingprocessor/tailsamplingprocessor/testdata/tail_sampling_config.yaml diff --git a/processor/README.md b/processor/README.md index 74ef433c7d7..e752cfd981c 100644 --- a/processor/README.md +++ b/processor/README.md @@ -18,9 +18,7 @@ Supported processors (sorted alphabetically): - [Memory Limiter Processor](memorylimiter/README.md) - [Queued Retry Processor](queuedprocessor/README.md) - [Resource Processor](resourceprocessor/README.md) -- Sampling Processors - - [Probabilistic Sampling Processor](samplingprocessor/probabilisticsamplerprocessor/README.md) - - [Tail Sampling Processor](samplingprocessor/tailsamplingprocessor/README.md) +- [Probabilistic Sampling Processor](samplingprocessor/probabilisticsamplerprocessor/README.md) - [Span Processor](spanprocessor/README.md) The [contributors repository](https://github.com/open-telemetry/opentelemetry-collector-contrib) diff --git a/processor/samplingprocessor/tailsamplingprocessor/README.md b/processor/samplingprocessor/tailsamplingprocessor/README.md deleted file mode 100644 index b80337ebb21..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/README.md +++ /dev/null @@ -1,58 +0,0 @@ -# Tail Sampling Processor - -Supported pipeline types: traces - -The tail sampling processor samples traces based on a set of defined policies. -Today, this processor only works with a single instance of the collector. -Technically, trace ID aware load balancing could be used to support multiple -collector instances, but this configuration has not been tested. Please refer to -[config.go](./config.go) for the config spec. - -The following configuration options are required: -- `policies` (no default): Policies used to make a sampling decision - -Multiple policies exist today and it is straight forward to add more. These include: -- `always_sample`: Sample all traces -- `numeric_attribute`: Sample based on number attributes -- `string_attribute`: Sample based on string attributes -- `rate_limiting`: Sample based on rate - -The following configuration options can also be modified: -- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision -- `num_traces` (default = 50000): Number of traces kept in memory -- `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures) - -Examples: - -```yaml -processors: - tail_sampling: - decision_wait: 10s - num_traces: 100 - expected_new_traces_per_sec: 10 - policies: - [ - { - name: test-policy-1, - type: always_sample - }, - { - name: test-policy-2, - type: numeric_attribute, - numeric_attribute: {key: key1, min_value: 50, max_value: 100} - }, - { - name: test-policy-3, - type: string_attribute, - string_attribute: {key: key2, values: [value1, value2]} - }, - { - name: test-policy-4, - type: rate_limiting, - rate_limiting: {spans_per_second: 35} - } - ] -``` - -Refer to [tail_sampling_config.yaml](./testdata/tail_sampling_config.yaml) for detailed -examples on using the processor. diff --git a/processor/samplingprocessor/tailsamplingprocessor/config.go b/processor/samplingprocessor/tailsamplingprocessor/config.go deleted file mode 100644 index 90edf0ac994..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/config.go +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tailsamplingprocessor - -import ( - "time" - - "go.opentelemetry.io/collector/config/configmodels" -) - -// PolicyType indicates the type of sampling policy. -type PolicyType string - -const ( - // AlwaysSample samples all traces, typically used for debugging. - AlwaysSample PolicyType = "always_sample" - // NumericAttribute sample traces that have a given numeric attribute in a specified - // range, e.g.: attribute "http.status_code" >= 399 and <= 999. - NumericAttribute PolicyType = "numeric_attribute" - // StringAttribute sample traces that a attribute, of type string, matching - // one of the listed values. - StringAttribute PolicyType = "string_attribute" - // RateLimiting allows all traces until the specified limits are satisfied. - RateLimiting PolicyType = "rate_limiting" -) - -// PolicyCfg holds the common configuration to all policies. -type PolicyCfg struct { - // Name given to the instance of the policy to make easy to identify it in metrics and logs. - Name string `mapstructure:"name"` - // Type of the policy this will be used to match the proper configuration of the policy. - Type PolicyType `mapstructure:"type"` - // Configs for numeric attribute filter sampling policy evaluator. - NumericAttributeCfg NumericAttributeCfg `mapstructure:"numeric_attribute"` - // Configs for string attribute filter sampling policy evaluator. - StringAttributeCfg StringAttributeCfg `mapstructure:"string_attribute"` - // Configs for rate limiting filter sampling policy evaluator. - RateLimitingCfg RateLimitingCfg `mapstructure:"rate_limiting"` -} - -// NumericAttributeCfg holds the configurable settings to create a numeric attribute filter -// sampling policy evaluator. -type NumericAttributeCfg struct { - // Tag that the filter is going to be matching against. - Key string `mapstructure:"key"` - // MinValue is the minimum value of the attribute to be considered a match. - MinValue int64 `mapstructure:"min_value"` - // MaxValue is the maximum value of the attribute to be considered a match. - MaxValue int64 `mapstructure:"max_value"` -} - -// StringAttributeCfg holds the configurable settings to create a string attribute filter -// sampling policy evaluator. -type StringAttributeCfg struct { - // Tag that the filter is going to be matching against. - Key string `mapstructure:"key"` - // Values is the set of attribute values that if any is equal to the actual attribute value to be considered a match. - Values []string `mapstructure:"values"` -} - -// RateLimitingCfg holds the configurable settings to create a rate limiting -// sampling policy evaluator. -type RateLimitingCfg struct { - // SpansPerSecond sets the limit on the maximum nuber of spans that can be processed each second. - SpansPerSecond int64 `mapstructure:"spans_per_second"` -} - -// Config holds the configuration for tail-based sampling. -type Config struct { - configmodels.ProcessorSettings `mapstructure:",squash"` - // DecisionWait is the desired wait time from the arrival of the first span of - // trace until the decision about sampling it or not is evaluated. - DecisionWait time.Duration `mapstructure:"decision_wait"` - // NumTraces is the number of traces kept on memory. Typically most of the data - // of a trace is released after a sampling decision is taken. - NumTraces uint64 `mapstructure:"num_traces"` - // ExpectedNewTracesPerSec sets the expected number of new traces sending to the tail sampling processor - // per second. This helps with allocating data structures with closer to actual usage size. - ExpectedNewTracesPerSec uint64 `mapstructure:"expected_new_traces_per_sec"` - // PolicyCfgs sets the tail-based sampling policy which makes a sampling decision - // for a given trace when requested. - PolicyCfgs []PolicyCfg `mapstructure:"policies"` -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/config_test.go b/processor/samplingprocessor/tailsamplingprocessor/config_test.go deleted file mode 100644 index 9ce741e0d54..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/config_test.go +++ /dev/null @@ -1,72 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tailsamplingprocessor - -import ( - "path" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configmodels" - "go.opentelemetry.io/collector/config/configtest" -) - -func TestLoadConfig(t *testing.T) { - factories, err := componenttest.ExampleComponents() - assert.NoError(t, err) - - factory := NewFactory() - factories.Processors[factory.Type()] = factory - - cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "tail_sampling_config.yaml"), factories) - require.Nil(t, err) - require.NotNil(t, cfg) - - assert.Equal(t, cfg.Processors["tail_sampling"], - &Config{ - ProcessorSettings: configmodels.ProcessorSettings{ - TypeVal: "tail_sampling", - NameVal: "tail_sampling", - }, - DecisionWait: 10 * time.Second, - NumTraces: 100, - ExpectedNewTracesPerSec: 10, - PolicyCfgs: []PolicyCfg{ - { - Name: "test-policy-1", - Type: AlwaysSample, - }, - { - Name: "test-policy-2", - Type: NumericAttribute, - NumericAttributeCfg: NumericAttributeCfg{Key: "key1", MinValue: 50, MaxValue: 100}, - }, - { - Name: "test-policy-3", - Type: StringAttribute, - StringAttributeCfg: StringAttributeCfg{Key: "key2", Values: []string{"value1", "value2"}}, - }, - { - Name: "test-policy-4", - Type: RateLimiting, - RateLimitingCfg: RateLimitingCfg{SpansPerSecond: 35}, - }, - }, - }) -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/factory.go b/processor/samplingprocessor/tailsamplingprocessor/factory.go deleted file mode 100644 index 5de7ba0f0b1..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/factory.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tailsamplingprocessor - -import ( - "context" - "time" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configmodels" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/processor/processorhelper" -) - -const ( - // The value of "type" Tail Sampling in configuration. - typeStr = "tail_sampling" -) - -// NewFactory returns a new factory for the Tail Sampling processor. -func NewFactory() component.ProcessorFactory { - return processorhelper.NewFactory( - typeStr, - createDefaultConfig, - processorhelper.WithTraces(createTraceProcessor)) -} - -func createDefaultConfig() configmodels.Processor { - return &Config{ - ProcessorSettings: configmodels.ProcessorSettings{ - TypeVal: typeStr, - NameVal: typeStr, - }, - DecisionWait: 30 * time.Second, - NumTraces: 50000, - } -} - -func createTraceProcessor( - _ context.Context, - params component.ProcessorCreateParams, - cfg configmodels.Processor, - nextConsumer consumer.TracesConsumer, -) (component.TraceProcessor, error) { - tCfg := cfg.(*Config) - return newTraceProcessor(params.Logger, nextConsumer, *tCfg) -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/factory_test.go b/processor/samplingprocessor/tailsamplingprocessor/factory_test.go deleted file mode 100644 index d3cbc6d502b..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/factory_test.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tailsamplingprocessor - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configcheck" - "go.opentelemetry.io/collector/consumer/consumertest" -) - -func TestCreateDefaultConfig(t *testing.T) { - cfg := createDefaultConfig() - assert.NotNil(t, cfg, "failed to create default config") - assert.NoError(t, configcheck.ValidateConfig(cfg)) -} - -func TestCreateProcessor(t *testing.T) { - factory := NewFactory() - - cfg := factory.CreateDefaultConfig().(*Config) - // Manually set required fields - cfg.ExpectedNewTracesPerSec = 64 - cfg.PolicyCfgs = []PolicyCfg{ - { - Name: "test-policy", - Type: AlwaysSample, - }, - } - - params := component.ProcessorCreateParams{Logger: zap.NewNop()} - tp, err := factory.CreateTraceProcessor(context.Background(), params, cfg, consumertest.NewTracesNop()) - assert.NotNil(t, tp) - assert.NoError(t, err, "cannot create trace processor") -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/idbatcher/id_batcher.go b/processor/samplingprocessor/tailsamplingprocessor/idbatcher/id_batcher.go deleted file mode 100644 index d18baaaa264..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/idbatcher/id_batcher.go +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package idbatcher defines a pipeline of fixed size in which the -// elements are batches of ids. -package idbatcher - -import ( - "errors" - "sync" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -var ( - // ErrInvalidNumBatches occurs when an invalid number of batches is specified. - ErrInvalidNumBatches = errors.New("invalid number of batches, it must be greater than zero") - // ErrInvalidBatchChannelSize occurs when an invalid batch channel size is specified. - ErrInvalidBatchChannelSize = errors.New("invalid batch channel size, it must be greater than zero") -) - -// Batch is the type of batches held by the Batcher. -type Batch []pdata.TraceID - -// Batcher behaves like a pipeline of batches that has a fixed number of batches in the pipe -// and a new batch being built outside of the pipe. Items can be concurrently added to the batch -// currently being built. When the batch being built is closed, the oldest batch in the pipe -// is pushed out so the one just closed can be put on the end of the pipe (this is done as an -// atomic operation). The caller is in control of when a batch is completed and a new one should -// be started. -type Batcher interface { - // AddToCurrentBatch puts the given id on the batch being currently built. The client is in charge - // of limiting the growth of the current batch if appropriate for its scenario. It can - // either call CloseCurrentAndTakeFirstBatch earlier or stop adding new items depending on what is - // required by the scenario. - AddToCurrentBatch(id pdata.TraceID) - // CloseCurrentAndTakeFirstBatch takes the batch at the front of the pipe, and moves the current - // batch to the end of the pipe, creating a new batch to receive new items. This operation should - // be atomic. - // It returns the batch that was in front of the pipe and a boolean that if true indicates that - // there are more batches to be retrieved. - CloseCurrentAndTakeFirstBatch() (Batch, bool) - // Stop informs that no more items are going to be batched and the pipeline can be read until it - // is empty. After this method is called attempts to enqueue new items will panic. - Stop() -} - -var _ Batcher = (*batcher)(nil) - -type batcher struct { - pendingIds chan pdata.TraceID // Channel for the ids to be added to the next batch. - batches chan Batch // Channel with already captured batches. - - // cbMutex protects the currentBatch storing ids. - cbMutex sync.Mutex - currentBatch Batch - - newBatchesInitialCapacity uint64 - stopchan chan bool - stopped bool -} - -// New creates a Batcher that will hold numBatches in its pipeline, having a channel with -// batchChannelSize to receive new items. New batches will be created with capacity set to -// newBatchesInitialCapacity. -func New(numBatches, newBatchesInitialCapacity, batchChannelSize uint64) (Batcher, error) { - if numBatches < 1 { - return nil, ErrInvalidNumBatches - } - if batchChannelSize < 1 { - return nil, ErrInvalidBatchChannelSize - } - - batches := make(chan Batch, numBatches) - // First numBatches batches will be empty in order to simplify clients that are running - // CloseCurrentAndTakeFirstBatch on a timer and want to delay the processing of the first - // batch with actual data. This way there is no need for accounting on the client side and - // a single timer can be started immediately. - for i := uint64(0); i < numBatches; i++ { - batches <- nil - } - - batcher := &batcher{ - pendingIds: make(chan pdata.TraceID, batchChannelSize), - batches: batches, - currentBatch: make(Batch, 0, newBatchesInitialCapacity), - newBatchesInitialCapacity: newBatchesInitialCapacity, - stopchan: make(chan bool), - } - - // Single goroutine that keeps filling the current batch, contention is expected only - // when the current batch is being switched. - go func() { - for id := range batcher.pendingIds { - batcher.cbMutex.Lock() - batcher.currentBatch = append(batcher.currentBatch, id) - batcher.cbMutex.Unlock() - } - batcher.stopchan <- true - }() - - return batcher, nil -} - -func (b *batcher) AddToCurrentBatch(id pdata.TraceID) { - b.pendingIds <- id -} - -func (b *batcher) CloseCurrentAndTakeFirstBatch() (Batch, bool) { - if readBatch, ok := <-b.batches; ok { - if !b.stopped { - nextBatch := make(Batch, 0, b.newBatchesInitialCapacity) - b.cbMutex.Lock() - b.batches <- b.currentBatch - b.currentBatch = nextBatch - b.cbMutex.Unlock() - } - return readBatch, true - } - - readBatch := b.currentBatch - b.currentBatch = nil - return readBatch, false -} - -func (b *batcher) Stop() { - close(b.pendingIds) - b.stopped = <-b.stopchan - close(b.batches) -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/idbatcher/id_batcher_test.go b/processor/samplingprocessor/tailsamplingprocessor/idbatcher/id_batcher_test.go deleted file mode 100644 index faf029824e1..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/idbatcher/id_batcher_test.go +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package idbatcher - -import ( - "runtime" - "sync" - "sync/atomic" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/consumer/pdata" - tracetranslator "go.opentelemetry.io/collector/translator/trace" -) - -func TestBatcherNew(t *testing.T) { - tests := []struct { - name string - numBatches uint64 - newBatchesInitialCapacity uint64 - batchChannelSize uint64 - wantErr error - }{ - {"invalid numBatches", 0, 0, 1, ErrInvalidNumBatches}, - {"invalid batchChannelSize", 1, 0, 0, ErrInvalidBatchChannelSize}, - {"valid", 1, 0, 1, nil}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := New(tt.numBatches, tt.newBatchesInitialCapacity, tt.batchChannelSize) - if err != tt.wantErr { - t.Errorf("New() error = %v, wantErr %v", err, tt.wantErr) - return - } - if got != nil { - got.Stop() - } - }) - } -} - -func TestTypicalConfig(t *testing.T) { - concurrencyTest(t, 10, 100, uint64(4*runtime.NumCPU())) -} - -func TestMinBufferedChannels(t *testing.T) { - concurrencyTest(t, 1, 0, 1) -} - -func BenchmarkConcurrentEnqueue(b *testing.B) { - ids := generateSequentialIds(1) - batcher, err := New(10, 100, uint64(4*runtime.NumCPU())) - defer batcher.Stop() - if err != nil { - b.Fatalf("Failed to create Batcher: %v", err) - } - - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - var ticked int32 - var received int32 - go func() { - for range ticker.C { - batch, _ := batcher.CloseCurrentAndTakeFirstBatch() - atomic.AddInt32(&ticked, 1) - atomic.AddInt32(&received, int32(len(batch))) - } - }() - - b.ReportAllocs() - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - batcher.AddToCurrentBatch(ids[0]) - } - }) -} - -func concurrencyTest(t *testing.T, numBatches, newBatchesInitialCapacity, batchChannelSize uint64) { - batcher, err := New(numBatches, newBatchesInitialCapacity, batchChannelSize) - require.NoError(t, err, "Failed to create Batcher: %v", err) - - ticker := time.NewTicker(100 * time.Millisecond) - stopTicker := make(chan bool) - var got Batch - go func() { - var completedDequeues uint64 - outer: - for { - select { - case <-ticker.C: - g, _ := batcher.CloseCurrentAndTakeFirstBatch() - completedDequeues++ - if completedDequeues <= numBatches && len(g) != 0 { - t.Error("Some of the first batches were not empty") - return - } - got = append(got, g...) - case <-stopTicker: - break outer - } - } - }() - - ids := generateSequentialIds(10000) - wg := &sync.WaitGroup{} - for i := 0; i < len(ids); i++ { - wg.Add(1) - go func(id pdata.TraceID) { - batcher.AddToCurrentBatch(id) - wg.Done() - }(ids[i]) - } - - wg.Wait() - stopTicker <- true - ticker.Stop() - batcher.Stop() - - // Get all ids added to the batcher - for { - batch, ok := batcher.CloseCurrentAndTakeFirstBatch() - got = append(got, batch...) - if !ok { - break - } - } - - require.Equal(t, len(ids), len(got), "Batcher got incorrect count of traces from batches") - - idSeen := make(map[[16]byte]bool, len(ids)) - for _, id := range got { - idSeen[id.Bytes()] = true - } - - for i := 0; i < len(ids); i++ { - require.True(t, idSeen[ids[i].Bytes()], "want id %v but id was not seen", ids[i]) - } -} - -func generateSequentialIds(numIds uint64) []pdata.TraceID { - ids := make([]pdata.TraceID, numIds) - for i := uint64(0); i < numIds; i++ { - ids[i] = tracetranslator.UInt64ToTraceID(0, i) - } - return ids -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/metrics.go b/processor/samplingprocessor/tailsamplingprocessor/metrics.go deleted file mode 100644 index 20cf77539bc..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/metrics.go +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tailsamplingprocessor - -import ( - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" - - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/obsreport" -) - -// Variables related to metrics specific to tail sampling. -var ( - tagPolicyKey, _ = tag.NewKey("policy") - tagSampledKey, _ = tag.NewKey("sampled") - tagSourceFormat, _ = tag.NewKey("source_format") - - statDecisionLatencyMicroSec = stats.Int64("sampling_decision_latency", "Latency (in microseconds) of a given sampling policy", "µs") - statOverallDecisionLatencyµs = stats.Int64("sampling_decision_timer_latency", "Latency (in microseconds) of each run of the sampling decision timer", "µs") - - statTraceRemovalAgeSec = stats.Int64("sampling_trace_removal_age", "Time (in seconds) from arrival of a new trace until its removal from memory", "s") - statLateSpanArrivalAfterDecision = stats.Int64("sampling_late_span_age", "Time (in seconds) from the sampling decision was taken and the arrival of a late span", "s") - - statPolicyEvaluationErrorCount = stats.Int64("sampling_policy_evaluation_error", "Count of sampling policy evaluation errors", stats.UnitDimensionless) - - statCountTracesSampled = stats.Int64("count_traces_sampled", "Count of traces that were sampled or not", stats.UnitDimensionless) - - statDroppedTooEarlyCount = stats.Int64("sampling_trace_dropped_too_early", "Count of traces that needed to be dropped the configured wait time", stats.UnitDimensionless) - statNewTraceIDReceivedCount = stats.Int64("new_trace_id_received", "Counts the arrival of new traces", stats.UnitDimensionless) - statTracesOnMemoryGauge = stats.Int64("sampling_traces_on_memory", "Tracks the number of traces current on memory", stats.UnitDimensionless) -) - -// SamplingProcessorMetricViews return the metrics views according to given telemetry level. -func SamplingProcessorMetricViews(level configtelemetry.Level) []*view.View { - if level == configtelemetry.LevelNone { - return nil - } - - policyTagKeys := []tag.Key{tagPolicyKey} - - latencyDistributionAggregation := view.Distribution(1, 2, 5, 10, 25, 50, 75, 100, 150, 200, 300, 400, 500, 750, 1000, 2000, 3000, 4000, 5000, 10000, 20000, 30000, 50000) - ageDistributionAggregation := view.Distribution(1, 2, 5, 10, 20, 30, 40, 50, 60, 90, 120, 180, 300, 600, 1800, 3600, 7200) - - decisionLatencyView := &view.View{ - Name: statDecisionLatencyMicroSec.Name(), - Measure: statDecisionLatencyMicroSec, - Description: statDecisionLatencyMicroSec.Description(), - TagKeys: policyTagKeys, - Aggregation: latencyDistributionAggregation, - } - overallDecisionLatencyView := &view.View{ - Name: statOverallDecisionLatencyµs.Name(), - Measure: statOverallDecisionLatencyµs, - Description: statOverallDecisionLatencyµs.Description(), - Aggregation: latencyDistributionAggregation, - } - - traceRemovalAgeView := &view.View{ - Name: statTraceRemovalAgeSec.Name(), - Measure: statTraceRemovalAgeSec, - Description: statTraceRemovalAgeSec.Description(), - Aggregation: ageDistributionAggregation, - } - lateSpanArrivalView := &view.View{ - Name: statLateSpanArrivalAfterDecision.Name(), - Measure: statLateSpanArrivalAfterDecision, - Description: statLateSpanArrivalAfterDecision.Description(), - Aggregation: ageDistributionAggregation, - } - - countPolicyEvaluationErrorView := &view.View{ - Name: statPolicyEvaluationErrorCount.Name(), - Measure: statPolicyEvaluationErrorCount, - Description: statPolicyEvaluationErrorCount.Description(), - Aggregation: view.Sum(), - } - - sampledTagKeys := []tag.Key{tagPolicyKey, tagSampledKey} - countTracesSampledView := &view.View{ - Name: statCountTracesSampled.Name(), - Measure: statCountTracesSampled, - Description: statCountTracesSampled.Description(), - TagKeys: sampledTagKeys, - Aggregation: view.Sum(), - } - - countTraceDroppedTooEarlyView := &view.View{ - Name: statDroppedTooEarlyCount.Name(), - Measure: statDroppedTooEarlyCount, - Description: statDroppedTooEarlyCount.Description(), - Aggregation: view.Sum(), - } - countTraceIDArrivalView := &view.View{ - Name: statNewTraceIDReceivedCount.Name(), - Measure: statNewTraceIDReceivedCount, - Description: statNewTraceIDReceivedCount.Description(), - Aggregation: view.Sum(), - } - trackTracesOnMemorylView := &view.View{ - Name: statTracesOnMemoryGauge.Name(), - Measure: statTracesOnMemoryGauge, - Description: statTracesOnMemoryGauge.Description(), - Aggregation: view.LastValue(), - } - - legacyViews := []*view.View{ - decisionLatencyView, - overallDecisionLatencyView, - - traceRemovalAgeView, - lateSpanArrivalView, - - countPolicyEvaluationErrorView, - - countTracesSampledView, - - countTraceDroppedTooEarlyView, - countTraceIDArrivalView, - trackTracesOnMemorylView, - } - - return obsreport.ProcessorMetricViews(typeStr, legacyViews) -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/processor.go b/processor/samplingprocessor/tailsamplingprocessor/processor.go deleted file mode 100644 index 534113a7afe..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/processor.go +++ /dev/null @@ -1,451 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tailsamplingprocessor - -import ( - "context" - "fmt" - "runtime" - "sync" - "sync/atomic" - "time" - - "go.opencensus.io/stats" - "go.opencensus.io/tag" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenterror" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/processor/samplingprocessor/tailsamplingprocessor/idbatcher" - "go.opentelemetry.io/collector/processor/samplingprocessor/tailsamplingprocessor/sampling" -) - -// Policy combines a sampling policy evaluator with the destinations to be -// used for that policy. -type Policy struct { - // Name used to identify this policy instance. - Name string - // Evaluator that decides if a trace is sampled or not by this policy instance. - Evaluator sampling.PolicyEvaluator - // ctx used to carry metric tags of each policy. - ctx context.Context -} - -// traceKey is defined since sync.Map requires a comparable type, isolating it on its own -// type to help track usage. -type traceKey [16]byte - -// tailSamplingSpanProcessor handles the incoming trace data and uses the given sampling -// policy to sample traces. -type tailSamplingSpanProcessor struct { - ctx context.Context - nextConsumer consumer.TracesConsumer - start sync.Once - maxNumTraces uint64 - policies []*Policy - logger *zap.Logger - idToTrace sync.Map - policyTicker tTicker - decisionBatcher idbatcher.Batcher - deleteChan chan traceKey - numTracesOnMap uint64 -} - -const ( - sourceFormat = "tail_sampling" -) - -// newTraceProcessor returns a processor.TraceProcessor that will perform tail sampling according to the given -// configuration. -func newTraceProcessor(logger *zap.Logger, nextConsumer consumer.TracesConsumer, cfg Config) (component.TraceProcessor, error) { - if nextConsumer == nil { - return nil, componenterror.ErrNilNextConsumer - } - - numDecisionBatches := uint64(cfg.DecisionWait.Seconds()) - inBatcher, err := idbatcher.New(numDecisionBatches, cfg.ExpectedNewTracesPerSec, uint64(2*runtime.NumCPU())) - if err != nil { - return nil, err - } - - ctx := context.Background() - var policies []*Policy - for i := range cfg.PolicyCfgs { - policyCfg := &cfg.PolicyCfgs[i] - policyCtx, err := tag.New(ctx, tag.Upsert(tagPolicyKey, policyCfg.Name), tag.Upsert(tagSourceFormat, sourceFormat)) - if err != nil { - return nil, err - } - eval, err := getPolicyEvaluator(logger, policyCfg) - if err != nil { - return nil, err - } - policy := &Policy{ - Name: policyCfg.Name, - Evaluator: eval, - ctx: policyCtx, - } - policies = append(policies, policy) - } - - tsp := &tailSamplingSpanProcessor{ - ctx: ctx, - nextConsumer: nextConsumer, - maxNumTraces: cfg.NumTraces, - logger: logger, - decisionBatcher: inBatcher, - policies: policies, - } - - tsp.policyTicker = &policyTicker{onTick: tsp.samplingPolicyOnTick} - tsp.deleteChan = make(chan traceKey, cfg.NumTraces) - - return tsp, nil -} - -func getPolicyEvaluator(logger *zap.Logger, cfg *PolicyCfg) (sampling.PolicyEvaluator, error) { - switch cfg.Type { - case AlwaysSample: - return sampling.NewAlwaysSample(logger), nil - case NumericAttribute: - nafCfg := cfg.NumericAttributeCfg - return sampling.NewNumericAttributeFilter(logger, nafCfg.Key, nafCfg.MinValue, nafCfg.MaxValue), nil - case StringAttribute: - safCfg := cfg.StringAttributeCfg - return sampling.NewStringAttributeFilter(logger, safCfg.Key, safCfg.Values), nil - case RateLimiting: - rlfCfg := cfg.RateLimitingCfg - return sampling.NewRateLimiting(logger, rlfCfg.SpansPerSecond), nil - default: - return nil, fmt.Errorf("unknown sampling policy type %s", cfg.Type) - } -} - -type policyMetrics struct { - idNotFoundOnMapCount, evaluateErrorCount, decisionSampled, decisionNotSampled int64 -} - -func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { - metrics := policyMetrics{} - - startTime := time.Now() - batch, _ := tsp.decisionBatcher.CloseCurrentAndTakeFirstBatch() - batchLen := len(batch) - tsp.logger.Debug("Sampling Policy Evaluation ticked") - for _, id := range batch { - d, ok := tsp.idToTrace.Load(traceKey(id.Bytes())) - if !ok { - metrics.idNotFoundOnMapCount++ - continue - } - trace := d.(*sampling.TraceData) - trace.DecisionTime = time.Now() - - decision, policy := tsp.makeDecision(id, trace, &metrics) - - // Sampled or not, remove the batches - trace.Lock() - traceBatches := trace.ReceivedBatches - trace.ReceivedBatches = nil - trace.Unlock() - - if decision == sampling.Sampled { - - // Combine all individual batches into a single batch so - // consumers may operate on the entire trace - allSpans := pdata.NewTraces() - for j := 0; j < len(traceBatches); j++ { - batch := traceBatches[j] - batch.ResourceSpans().MoveAndAppendTo(allSpans.ResourceSpans()) - } - - _ = tsp.nextConsumer.ConsumeTraces(policy.ctx, allSpans) - } - } - - stats.Record(tsp.ctx, - statOverallDecisionLatencyµs.M(int64(time.Since(startTime)/time.Microsecond)), - statDroppedTooEarlyCount.M(metrics.idNotFoundOnMapCount), - statPolicyEvaluationErrorCount.M(metrics.evaluateErrorCount), - statTracesOnMemoryGauge.M(int64(atomic.LoadUint64(&tsp.numTracesOnMap)))) - - tsp.logger.Debug("Sampling policy evaluation completed", - zap.Int("batch.len", batchLen), - zap.Int64("sampled", metrics.decisionSampled), - zap.Int64("notSampled", metrics.decisionNotSampled), - zap.Int64("droppedPriorToEvaluation", metrics.idNotFoundOnMapCount), - zap.Int64("policyEvaluationErrors", metrics.evaluateErrorCount), - ) -} - -func (tsp *tailSamplingSpanProcessor) makeDecision(id pdata.TraceID, trace *sampling.TraceData, metrics *policyMetrics) (sampling.Decision, *Policy) { - finalDecision := sampling.NotSampled - var matchingPolicy *Policy = nil - - for i, policy := range tsp.policies { - policyEvaluateStartTime := time.Now() - decision, err := policy.Evaluator.Evaluate(id, trace) - stats.Record( - policy.ctx, - statDecisionLatencyMicroSec.M(int64(time.Since(policyEvaluateStartTime)/time.Microsecond))) - - if err != nil { - trace.Decisions[i] = sampling.NotSampled - metrics.evaluateErrorCount++ - tsp.logger.Debug("Sampling policy error", zap.Error(err)) - } else { - trace.Decisions[i] = decision - - switch decision { - case sampling.Sampled: - // any single policy that decides to sample will cause the decision to be sampled - // the nextConsumer will get the context from the first matching policy - finalDecision = sampling.Sampled - if matchingPolicy == nil { - matchingPolicy = policy - } - - _ = stats.RecordWithTags( - policy.ctx, - []tag.Mutator{tag.Insert(tagSampledKey, "true")}, - statCountTracesSampled.M(int64(1)), - ) - metrics.decisionSampled++ - - case sampling.NotSampled: - _ = stats.RecordWithTags( - policy.ctx, - []tag.Mutator{tag.Insert(tagSampledKey, "false")}, - statCountTracesSampled.M(int64(1)), - ) - metrics.decisionNotSampled++ - } - } - } - - return finalDecision, matchingPolicy -} - -// ConsumeTraceData is required by the SpanProcessor interface. -func (tsp *tailSamplingSpanProcessor) ConsumeTraces(ctx context.Context, td pdata.Traces) error { - tsp.start.Do(func() { - tsp.logger.Info("First trace data arrived, starting tail_sampling timers") - tsp.policyTicker.Start(1 * time.Second) - }) - resourceSpans := td.ResourceSpans() - for i := 0; i < resourceSpans.Len(); i++ { - resourceSpan := resourceSpans.At(i) - if resourceSpan.IsNil() { - continue - } - tsp.processTraces(resourceSpan) - } - return nil -} - -func (tsp *tailSamplingSpanProcessor) groupSpansByTraceKey(resourceSpans pdata.ResourceSpans) map[traceKey][]*pdata.Span { - idToSpans := make(map[traceKey][]*pdata.Span) - ilss := resourceSpans.InstrumentationLibrarySpans() - for j := 0; j < ilss.Len(); j++ { - ils := ilss.At(j) - if ils.IsNil() { - continue - } - spansLen := ils.Spans().Len() - for k := 0; k < spansLen; k++ { - span := ils.Spans().At(k) - tk := traceKey(span.TraceID().Bytes()) - if len(tk) != 16 { - tsp.logger.Warn("Span without valid TraceId") - } - idToSpans[tk] = append(idToSpans[tk], &span) - } - } - return idToSpans -} - -func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans pdata.ResourceSpans) { - // Group spans per their traceId to minimize contention on idToTrace - idToSpans := tsp.groupSpansByTraceKey(resourceSpans) - var newTraceIDs int64 - for id, spans := range idToSpans { - lenSpans := int64(len(spans)) - lenPolicies := len(tsp.policies) - initialDecisions := make([]sampling.Decision, lenPolicies) - for i := 0; i < lenPolicies; i++ { - initialDecisions[i] = sampling.Pending - } - initialTraceData := &sampling.TraceData{ - Decisions: initialDecisions, - ArrivalTime: time.Now(), - SpanCount: lenSpans, - } - d, loaded := tsp.idToTrace.LoadOrStore(id, initialTraceData) - - actualData := d.(*sampling.TraceData) - if loaded { - atomic.AddInt64(&actualData.SpanCount, lenSpans) - } else { - newTraceIDs++ - tsp.decisionBatcher.AddToCurrentBatch(pdata.NewTraceID(id)) - atomic.AddUint64(&tsp.numTracesOnMap, 1) - postDeletion := false - currTime := time.Now() - for !postDeletion { - select { - case tsp.deleteChan <- id: - postDeletion = true - default: - traceKeyToDrop := <-tsp.deleteChan - tsp.dropTrace(traceKeyToDrop, currTime) - } - } - } - - for i, policy := range tsp.policies { - var traceTd pdata.Traces - actualData.Lock() - actualDecision := actualData.Decisions[i] - // If decision is pending, we want to add the new spans still under the lock, so the decision doesn't happen - // in between the transition from pending. - if actualDecision == sampling.Pending { - // Add the spans to the trace, but only once for all policy, otherwise same spans will - // be duplicated in the final trace. - traceTd = prepareTraceBatch(resourceSpans, spans) - actualData.ReceivedBatches = append(actualData.ReceivedBatches, traceTd) - actualData.Unlock() - break - } - actualData.Unlock() - - switch actualDecision { - case sampling.Sampled: - // Forward the spans to the policy destinations - traceTd := prepareTraceBatch(resourceSpans, spans) - if err := tsp.nextConsumer.ConsumeTraces(policy.ctx, traceTd); err != nil { - tsp.logger.Warn("Error sending late arrived spans to destination", - zap.String("policy", policy.Name), - zap.Error(err)) - } - fallthrough // so OnLateArrivingSpans is also called for decision Sampled. - case sampling.NotSampled: - policy.Evaluator.OnLateArrivingSpans(actualDecision, spans) - stats.Record(tsp.ctx, statLateSpanArrivalAfterDecision.M(int64(time.Since(actualData.DecisionTime)/time.Second))) - - default: - tsp.logger.Warn("Encountered unexpected sampling decision", - zap.String("policy", policy.Name), - zap.Int("decision", int(actualDecision))) - } - - // At this point the late arrival has been passed to nextConsumer. Need to break out of the policy loop - // so that it isn't sent to nextConsumer more than once when multiple policies chose to sample - if actualDecision == sampling.Sampled { - break - } - } - } - - stats.Record(tsp.ctx, statNewTraceIDReceivedCount.M(newTraceIDs)) -} - -func (tsp *tailSamplingSpanProcessor) GetCapabilities() component.ProcessorCapabilities { - return component.ProcessorCapabilities{MutatesConsumedData: false} -} - -// Start is invoked during service startup. -func (tsp *tailSamplingSpanProcessor) Start(context.Context, component.Host) error { - return nil -} - -// Shutdown is invoked during service shutdown. -func (tsp *tailSamplingSpanProcessor) Shutdown(context.Context) error { - return nil -} - -func (tsp *tailSamplingSpanProcessor) dropTrace(traceID traceKey, deletionTime time.Time) { - var trace *sampling.TraceData - if d, ok := tsp.idToTrace.Load(traceID); ok { - trace = d.(*sampling.TraceData) - tsp.idToTrace.Delete(traceID) - // Subtract one from numTracesOnMap per https://godoc.org/sync/atomic#AddUint64 - atomic.AddUint64(&tsp.numTracesOnMap, ^uint64(0)) - } - if trace == nil { - tsp.logger.Error("Attempt to delete traceID not on table") - return - } - policiesLen := len(tsp.policies) - stats.Record(tsp.ctx, statTraceRemovalAgeSec.M(int64(deletionTime.Sub(trace.ArrivalTime)/time.Second))) - for j := 0; j < policiesLen; j++ { - if trace.Decisions[j] == sampling.Pending { - policy := tsp.policies[j] - if decision, err := policy.Evaluator.OnDroppedSpans(pdata.NewTraceID(traceID), trace); err != nil { - tsp.logger.Warn("OnDroppedSpans", - zap.String("policy", policy.Name), - zap.Int("decision", int(decision)), - zap.Error(err)) - } - } - } -} - -func prepareTraceBatch(rss pdata.ResourceSpans, spans []*pdata.Span) pdata.Traces { - traceTd := pdata.NewTraces() - traceTd.ResourceSpans().Resize(1) - rs := traceTd.ResourceSpans().At(0) - rs.Resource().InitEmpty() - rss.Resource().CopyTo(rs.Resource()) - rs.InstrumentationLibrarySpans().Resize(1) - ils := rs.InstrumentationLibrarySpans().At(0) - for _, span := range spans { - ils.Spans().Append(*span) - } - return traceTd -} - -// tTicker interface allows easier testing of ticker related functionality used by tailSamplingProcessor -type tTicker interface { - // Start sets the frequency of the ticker and starts the periodic calls to OnTick. - Start(d time.Duration) - // OnTick is called when the ticker fires. - OnTick() - // Stops firing the ticker. - Stop() -} - -type policyTicker struct { - ticker *time.Ticker - onTick func() -} - -func (pt *policyTicker) Start(d time.Duration) { - pt.ticker = time.NewTicker(d) - go func() { - for range pt.ticker.C { - pt.OnTick() - } - }() -} -func (pt *policyTicker) OnTick() { - pt.onTick() -} -func (pt *policyTicker) Stop() { - pt.ticker.Stop() -} - -var _ tTicker = (*policyTicker)(nil) diff --git a/processor/samplingprocessor/tailsamplingprocessor/processor_test.go b/processor/samplingprocessor/tailsamplingprocessor/processor_test.go deleted file mode 100644 index 3dba4d6dad0..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/processor_test.go +++ /dev/null @@ -1,522 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tailsamplingprocessor - -import ( - "context" - "errors" - "sort" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/internal/data/testdata" - "go.opentelemetry.io/collector/processor/samplingprocessor/tailsamplingprocessor/idbatcher" - "go.opentelemetry.io/collector/processor/samplingprocessor/tailsamplingprocessor/sampling" - tracetranslator "go.opentelemetry.io/collector/translator/trace" -) - -const ( - defaultTestDecisionWait = 30 * time.Second -) - -var testPolicy = []PolicyCfg{{Name: "test-policy", Type: AlwaysSample}} - -func TestSequentialTraceArrival(t *testing.T) { - traceIds, batches := generateIdsAndBatches(128) - cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: uint64(2 * len(traceIds)), - ExpectedNewTracesPerSec: 64, - PolicyCfgs: testPolicy, - } - sp, _ := newTraceProcessor(zap.NewNop(), consumertest.NewTracesNop(), cfg) - tsp := sp.(*tailSamplingSpanProcessor) - for _, batch := range batches { - tsp.ConsumeTraces(context.Background(), batch) - } - - for i := range traceIds { - d, ok := tsp.idToTrace.Load(traceKey(traceIds[i].Bytes())) - require.True(t, ok, "Missing expected traceId") - v := d.(*sampling.TraceData) - require.Equal(t, int64(i+1), v.SpanCount, "Incorrect number of spans for entry %d", i) - } -} - -func TestConcurrentTraceArrival(t *testing.T) { - traceIds, batches := generateIdsAndBatches(128) - - var wg sync.WaitGroup - cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: uint64(2 * len(traceIds)), - ExpectedNewTracesPerSec: 64, - PolicyCfgs: testPolicy, - } - sp, _ := newTraceProcessor(zap.NewNop(), consumertest.NewTracesNop(), cfg) - tsp := sp.(*tailSamplingSpanProcessor) - for _, batch := range batches { - // Add the same traceId twice. - wg.Add(2) - go func(td pdata.Traces) { - tsp.ConsumeTraces(context.Background(), td) - wg.Done() - }(batch) - go func(td pdata.Traces) { - tsp.ConsumeTraces(context.Background(), td) - wg.Done() - }(batch) - } - - wg.Wait() - - for i := range traceIds { - d, ok := tsp.idToTrace.Load(traceKey(traceIds[i].Bytes())) - require.True(t, ok, "Missing expected traceId") - v := d.(*sampling.TraceData) - require.Equal(t, int64(i+1)*2, v.SpanCount, "Incorrect number of spans for entry %d", i) - } -} - -func TestSequentialTraceMapSize(t *testing.T) { - traceIds, batches := generateIdsAndBatches(210) - const maxSize = 100 - cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: uint64(maxSize), - ExpectedNewTracesPerSec: 64, - PolicyCfgs: testPolicy, - } - sp, _ := newTraceProcessor(zap.NewNop(), consumertest.NewTracesNop(), cfg) - tsp := sp.(*tailSamplingSpanProcessor) - for _, batch := range batches { - tsp.ConsumeTraces(context.Background(), batch) - } - - // On sequential insertion it is possible to know exactly which traces should be still on the map. - for i := 0; i < len(traceIds)-maxSize; i++ { - _, ok := tsp.idToTrace.Load(traceKey(traceIds[i].Bytes())) - require.False(t, ok, "Found unexpected traceId[%d] still on map (id: %v)", i, traceIds[i]) - } -} - -func TestConcurrentTraceMapSize(t *testing.T) { - _, batches := generateIdsAndBatches(210) - const maxSize = 100 - var wg sync.WaitGroup - cfg := Config{ - DecisionWait: defaultTestDecisionWait, - NumTraces: uint64(maxSize), - ExpectedNewTracesPerSec: 64, - PolicyCfgs: testPolicy, - } - sp, _ := newTraceProcessor(zap.NewNop(), consumertest.NewTracesNop(), cfg) - tsp := sp.(*tailSamplingSpanProcessor) - for _, batch := range batches { - wg.Add(1) - go func(td pdata.Traces) { - tsp.ConsumeTraces(context.Background(), td) - wg.Done() - }(batch) - } - - wg.Wait() - - // Since we can't guarantee the order of insertion the only thing that can be checked is - // if the number of traces on the map matches the expected value. - cnt := 0 - tsp.idToTrace.Range(func(_ interface{}, _ interface{}) bool { - cnt++ - return true - }) - require.Equal(t, maxSize, cnt, "Incorrect traces count on idToTrace") -} - -func TestSamplingPolicyTypicalPath(t *testing.T) { - const maxSize = 100 - const decisionWaitSeconds = 5 - // For this test explicitly control the timer calls and batcher, and set a mock - // sampling policy evaluator. - msp := new(consumertest.TracesSink) - mpe := &mockPolicyEvaluator{} - mtt := &manualTTicker{} - tsp := &tailSamplingSpanProcessor{ - ctx: context.Background(), - nextConsumer: msp, - maxNumTraces: maxSize, - logger: zap.NewNop(), - decisionBatcher: newSyncIDBatcher(decisionWaitSeconds), - policies: []*Policy{{Name: "mock-policy", Evaluator: mpe, ctx: context.TODO()}}, - deleteChan: make(chan traceKey, maxSize), - policyTicker: mtt, - } - - _, batches := generateIdsAndBatches(210) - currItem := 0 - numSpansPerBatchWindow := 10 - // First evaluations shouldn't have anything to evaluate, until decision wait time passed. - for evalNum := 0; evalNum < decisionWaitSeconds; evalNum++ { - for ; currItem < numSpansPerBatchWindow*(evalNum+1); currItem++ { - tsp.ConsumeTraces(context.Background(), batches[currItem]) - require.True(t, mtt.Started, "Time ticker was expected to have started") - } - tsp.samplingPolicyOnTick() - require.False( - t, - msp.SpansCount() != 0 || mpe.EvaluationCount != 0, - "policy for initial items was evaluated before decision wait period", - ) - } - - // Now the first batch that waited the decision period. - mpe.NextDecision = sampling.Sampled - tsp.samplingPolicyOnTick() - require.False( - t, - msp.SpansCount() == 0 || mpe.EvaluationCount == 0, - "policy should have been evaluated totalspans == %d and evaluationcount == %d", - msp.SpansCount(), - mpe.EvaluationCount, - ) - - require.Equal(t, numSpansPerBatchWindow, msp.SpansCount(), "not all spans of first window were accounted for") - - // Late span of a sampled trace should be sent directly down the pipeline exporter - tsp.ConsumeTraces(context.Background(), batches[0]) - expectedNumWithLateSpan := numSpansPerBatchWindow + 1 - require.Equal(t, expectedNumWithLateSpan, msp.SpansCount(), "late span was not accounted for") - require.Equal(t, 1, mpe.LateArrivingSpansCount, "policy was not notified of the late span") -} - -func TestSamplingMultiplePolicies(t *testing.T) { - const maxSize = 100 - const decisionWaitSeconds = 5 - // For this test explicitly control the timer calls and batcher, and set a mock - // sampling policy evaluator. - msp := new(consumertest.TracesSink) - mpe1 := &mockPolicyEvaluator{} - mpe2 := &mockPolicyEvaluator{} - mtt := &manualTTicker{} - tsp := &tailSamplingSpanProcessor{ - ctx: context.Background(), - nextConsumer: msp, - maxNumTraces: maxSize, - logger: zap.NewNop(), - decisionBatcher: newSyncIDBatcher(decisionWaitSeconds), - policies: []*Policy{ - { - Name: "policy-1", Evaluator: mpe1, ctx: context.TODO(), - }, - { - Name: "policy-2", Evaluator: mpe2, ctx: context.TODO(), - }}, - deleteChan: make(chan traceKey, maxSize), - policyTicker: mtt, - } - - _, batches := generateIdsAndBatches(210) - currItem := 0 - numSpansPerBatchWindow := 10 - // First evaluations shouldn't have anything to evaluate, until decision wait time passed. - for evalNum := 0; evalNum < decisionWaitSeconds; evalNum++ { - for ; currItem < numSpansPerBatchWindow*(evalNum+1); currItem++ { - tsp.ConsumeTraces(context.Background(), batches[currItem]) - require.True(t, mtt.Started, "Time ticker was expected to have started") - } - tsp.samplingPolicyOnTick() - require.False( - t, - msp.SpansCount() != 0 || mpe1.EvaluationCount != 0 || mpe2.EvaluationCount != 0, - "policy for initial items was evaluated before decision wait period", - ) - } - - // Both policies will decide to sample - mpe1.NextDecision = sampling.Sampled - mpe2.NextDecision = sampling.Sampled - tsp.samplingPolicyOnTick() - require.False( - t, - msp.SpansCount() == 0 || mpe1.EvaluationCount == 0 || mpe2.EvaluationCount == 0, - "policy should have been evaluated totalspans == %d and evaluationcount(1) == %d and evaluationcount(2) == %d", - msp.SpansCount(), - mpe1.EvaluationCount, - mpe2.EvaluationCount, - ) - - require.Equal(t, numSpansPerBatchWindow, msp.SpansCount(), "nextConsumer should've been called with exactly 1 batch of spans") - - // Late span of a sampled trace should be sent directly down the pipeline exporter - tsp.ConsumeTraces(context.Background(), batches[0]) - expectedNumWithLateSpan := numSpansPerBatchWindow + 1 - require.Equal(t, expectedNumWithLateSpan, msp.SpansCount(), "late span was not accounted for") - require.Equal(t, 1, mpe1.LateArrivingSpansCount, "1st policy was not notified of the late span") - require.Equal(t, 0, mpe2.LateArrivingSpansCount, "2nd policy should not have been notified of the late span") -} - -func TestSamplingPolicyDecisionNotSampled(t *testing.T) { - const maxSize = 100 - const decisionWaitSeconds = 5 - // For this test explicitly control the timer calls and batcher, and set a mock - // sampling policy evaluator. - msp := new(consumertest.TracesSink) - mpe := &mockPolicyEvaluator{} - mtt := &manualTTicker{} - tsp := &tailSamplingSpanProcessor{ - ctx: context.Background(), - nextConsumer: msp, - maxNumTraces: maxSize, - logger: zap.NewNop(), - decisionBatcher: newSyncIDBatcher(decisionWaitSeconds), - policies: []*Policy{{Name: "mock-policy", Evaluator: mpe, ctx: context.TODO()}}, - deleteChan: make(chan traceKey, maxSize), - policyTicker: mtt, - } - - _, batches := generateIdsAndBatches(210) - currItem := 0 - numSpansPerBatchWindow := 10 - // First evaluations shouldn't have anything to evaluate, until decision wait time passed. - for evalNum := 0; evalNum < decisionWaitSeconds; evalNum++ { - for ; currItem < numSpansPerBatchWindow*(evalNum+1); currItem++ { - tsp.ConsumeTraces(context.Background(), batches[currItem]) - require.True(t, mtt.Started, "Time ticker was expected to have started") - } - tsp.samplingPolicyOnTick() - require.False( - t, - msp.SpansCount() != 0 || mpe.EvaluationCount != 0, - "policy for initial items was evaluated before decision wait period", - ) - } - - // Now the first batch that waited the decision period. - mpe.NextDecision = sampling.NotSampled - tsp.samplingPolicyOnTick() - require.EqualValues(t, 0, msp.SpansCount(), "exporter should have received zero spans") - require.EqualValues(t, 4, mpe.EvaluationCount, "policy should have been evaluated 4 times") - - // Late span of a non-sampled trace should be ignored - tsp.ConsumeTraces(context.Background(), batches[0]) - require.Equal(t, 0, msp.SpansCount()) - require.Equal(t, 1, mpe.LateArrivingSpansCount, "policy was not notified of the late span") - - mpe.NextDecision = sampling.Unspecified - mpe.NextError = errors.New("mock policy error") - tsp.samplingPolicyOnTick() - require.EqualValues(t, 0, msp.SpansCount(), "exporter should have received zero spans") - require.EqualValues(t, 6, mpe.EvaluationCount, "policy should have been evaluated 6 times") - - // Late span of a non-sampled trace should be ignored - tsp.ConsumeTraces(context.Background(), batches[0]) - require.Equal(t, 0, msp.SpansCount()) - require.Equal(t, 2, mpe.LateArrivingSpansCount, "policy was not notified of the late span") -} - -func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) { - const maxSize = 100 - const decisionWaitSeconds = 1 - // For this test explicitly control the timer calls and batcher, and set a mock - // sampling policy evaluator. - msp := new(consumertest.TracesSink) - mpe := &mockPolicyEvaluator{} - mtt := &manualTTicker{} - tsp := &tailSamplingSpanProcessor{ - ctx: context.Background(), - nextConsumer: msp, - maxNumTraces: maxSize, - logger: zap.NewNop(), - decisionBatcher: newSyncIDBatcher(decisionWaitSeconds), - policies: []*Policy{{Name: "mock-policy", Evaluator: mpe, ctx: context.TODO()}}, - deleteChan: make(chan traceKey, maxSize), - policyTicker: mtt, - } - - mpe.NextDecision = sampling.Sampled - - traceIds, batches := generateIdsAndBatches(3) - for _, batch := range batches { - require.NoError(t, tsp.ConsumeTraces(context.Background(), batch)) - } - - tsp.samplingPolicyOnTick() - tsp.samplingPolicyOnTick() - - require.EqualValues(t, 3, len(msp.AllTraces()), "There should be three batches, one for each trace") - - expectedSpanIds := make(map[int][]pdata.SpanID) - expectedSpanIds[0] = []pdata.SpanID{ - pdata.NewSpanID(tracetranslator.UInt64ToByteSpanID(uint64(1))), - } - expectedSpanIds[1] = []pdata.SpanID{ - pdata.NewSpanID(tracetranslator.UInt64ToByteSpanID(uint64(2))), - pdata.NewSpanID(tracetranslator.UInt64ToByteSpanID(uint64(3))), - } - expectedSpanIds[2] = []pdata.SpanID{ - pdata.NewSpanID(tracetranslator.UInt64ToByteSpanID(uint64(4))), - pdata.NewSpanID(tracetranslator.UInt64ToByteSpanID(uint64(5))), - pdata.NewSpanID(tracetranslator.UInt64ToByteSpanID(uint64(6))), - } - - receivedTraces := msp.AllTraces() - for i, traceID := range traceIds { - trace := findTrace(receivedTraces, traceID) - require.NotNil(t, trace, "Trace was not received. TraceId %s", traceID.HexString()) - require.EqualValues(t, i+1, trace.SpanCount(), "The trace should have all of its spans in a single batch") - - expected := expectedSpanIds[i] - got := collectSpanIds(trace) - - // might have received out of order, sort for comparison - sort.Slice(got, func(i, j int) bool { - a := tracetranslator.BytesToInt64SpanID(got[i].Bytes()) - b := tracetranslator.BytesToInt64SpanID(got[j].Bytes()) - return a < b - }) - - require.EqualValues(t, expected, got) - } -} - -func collectSpanIds(trace *pdata.Traces) []pdata.SpanID { - spanIDs := make([]pdata.SpanID, 0) - - for i := 0; i < trace.ResourceSpans().Len(); i++ { - ilss := trace.ResourceSpans().At(i).InstrumentationLibrarySpans() - - for j := 0; j < ilss.Len(); j++ { - ils := ilss.At(j) - - for k := 0; k < ils.Spans().Len(); k++ { - span := ils.Spans().At(k) - spanIDs = append(spanIDs, span.SpanID()) - } - } - } - - return spanIDs -} - -func findTrace(a []pdata.Traces, traceID pdata.TraceID) *pdata.Traces { - for _, batch := range a { - id := batch.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).TraceID() - if traceID.Bytes() == id.Bytes() { - return &batch - } - } - return nil -} - -func generateIdsAndBatches(numIds int) ([]pdata.TraceID, []pdata.Traces) { - traceIds := make([]pdata.TraceID, numIds) - spanID := 0 - var tds []pdata.Traces - for i := 0; i < numIds; i++ { - traceIds[i] = tracetranslator.UInt64ToTraceID(1, uint64(i+1)) - // Send each span in a separate batch - for j := 0; j <= i; j++ { - td := testdata.GenerateTraceDataOneSpan() - span := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0) - span.SetTraceID(traceIds[i]) - - spanID++ - span.SetSpanID(tracetranslator.UInt64ToSpanID(uint64(spanID))) - tds = append(tds, td) - } - } - - return traceIds, tds -} - -type mockPolicyEvaluator struct { - NextDecision sampling.Decision - NextError error - EvaluationCount int - LateArrivingSpansCount int - OnDroppedSpansCount int -} - -var _ sampling.PolicyEvaluator = (*mockPolicyEvaluator)(nil) - -func (m *mockPolicyEvaluator) OnLateArrivingSpans(sampling.Decision, []*pdata.Span) error { - m.LateArrivingSpansCount++ - return m.NextError -} -func (m *mockPolicyEvaluator) Evaluate(pdata.TraceID, *sampling.TraceData) (sampling.Decision, error) { - m.EvaluationCount++ - return m.NextDecision, m.NextError -} -func (m *mockPolicyEvaluator) OnDroppedSpans(pdata.TraceID, *sampling.TraceData) (sampling.Decision, error) { - m.OnDroppedSpansCount++ - return m.NextDecision, m.NextError -} - -type manualTTicker struct { - Started bool -} - -var _ tTicker = (*manualTTicker)(nil) - -func (t *manualTTicker) Start(time.Duration) { - t.Started = true -} - -func (t *manualTTicker) OnTick() { -} - -func (t *manualTTicker) Stop() { -} - -type syncIDBatcher struct { - sync.Mutex - openBatch idbatcher.Batch - batchPipe chan idbatcher.Batch -} - -var _ idbatcher.Batcher = (*syncIDBatcher)(nil) - -func newSyncIDBatcher(numBatches uint64) idbatcher.Batcher { - batches := make(chan idbatcher.Batch, numBatches) - for i := uint64(0); i < numBatches; i++ { - batches <- nil - } - return &syncIDBatcher{ - batchPipe: batches, - } -} - -func (s *syncIDBatcher) AddToCurrentBatch(id pdata.TraceID) { - s.Lock() - s.openBatch = append(s.openBatch, id) - s.Unlock() -} - -func (s *syncIDBatcher) CloseCurrentAndTakeFirstBatch() (idbatcher.Batch, bool) { - s.Lock() - defer s.Unlock() - firstBatch := <-s.batchPipe - s.batchPipe <- s.openBatch - s.openBatch = nil - return firstBatch, true -} - -func (s *syncIDBatcher) Stop() { -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/sampling/always_sample.go b/processor/samplingprocessor/tailsamplingprocessor/sampling/always_sample.go deleted file mode 100644 index 8ade89532a0..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/sampling/always_sample.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "go.uber.org/zap" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -type alwaysSample struct { - logger *zap.Logger -} - -var _ PolicyEvaluator = (*alwaysSample)(nil) - -// NewAlwaysSample creates a policy evaluator the samples all traces. -func NewAlwaysSample(logger *zap.Logger) PolicyEvaluator { - return &alwaysSample{ - logger: logger, - } -} - -// OnLateArrivingSpans notifies the evaluator that the given list of spans arrived -// after the sampling decision was already taken for the trace. -// This gives the evaluator a chance to log any message/metrics and/or update any -// related internal state. -func (as *alwaysSample) OnLateArrivingSpans(Decision, []*pdata.Span) error { - as.logger.Debug("Triggering action for late arriving spans in always-sample filter") - return nil -} - -// Evaluate looks at the trace data and returns a corresponding SamplingDecision. -func (as *alwaysSample) Evaluate(pdata.TraceID, *TraceData) (Decision, error) { - as.logger.Debug("Evaluating spans in always-sample filter") - return Sampled, nil -} - -// OnDroppedSpans is called when the trace needs to be dropped, due to memory -// pressure, before the decision_wait time has been reached. -func (as *alwaysSample) OnDroppedSpans(pdata.TraceID, *TraceData) (Decision, error) { - as.logger.Debug("Triggering action for dropped spans in always-sample filter") - return Sampled, nil -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/sampling/always_sample_test.go b/processor/samplingprocessor/tailsamplingprocessor/sampling/always_sample_test.go deleted file mode 100644 index dc464facebd..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/sampling/always_sample_test.go +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "math" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -func TestEvaluate_AlwaysSample(t *testing.T) { - filter := NewAlwaysSample(zap.NewNop()) - decision, err := filter.Evaluate(pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}), newTraceStringAttrs(map[string]pdata.AttributeValue{}, "example", "value")) - assert.Nil(t, err) - assert.Equal(t, decision, Sampled) -} - -func TestOnDroppedSpans_AlwaysSample(t *testing.T) { - var empty = map[string]pdata.AttributeValue{} - u, _ := uuid.NewRandom() - filter := NewAlwaysSample(zap.NewNop()) - decision, err := filter.OnDroppedSpans(pdata.NewTraceID(u), newTraceIntAttrs(empty, "example", math.MaxInt32+1)) - assert.Nil(t, err) - assert.Equal(t, decision, Sampled) -} - -func TestOnLateArrivingSpans_AlwaysSample(t *testing.T) { - filter := NewAlwaysSample(zap.NewNop()) - err := filter.OnLateArrivingSpans(NotSampled, nil) - assert.Nil(t, err) -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/sampling/doc.go b/processor/samplingprocessor/tailsamplingprocessor/sampling/doc.go deleted file mode 100644 index c41ee257788..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/sampling/doc.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Package sampling contains the interfaces and data types used to implement -// the various sampling policies. -package sampling diff --git a/processor/samplingprocessor/tailsamplingprocessor/sampling/empty_test.go b/processor/samplingprocessor/tailsamplingprocessor/sampling/empty_test.go deleted file mode 100644 index 28080c37f33..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/sampling/empty_test.go +++ /dev/null @@ -1,15 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling diff --git a/processor/samplingprocessor/tailsamplingprocessor/sampling/numeric_tag_filter.go b/processor/samplingprocessor/tailsamplingprocessor/sampling/numeric_tag_filter.go deleted file mode 100644 index 8fc804956d0..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/sampling/numeric_tag_filter.go +++ /dev/null @@ -1,92 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "go.uber.org/zap" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -type numericAttributeFilter struct { - key string - minValue, maxValue int64 - logger *zap.Logger -} - -var _ PolicyEvaluator = (*numericAttributeFilter)(nil) - -// NewNumericAttributeFilter creates a policy evaluator that samples all traces with -// the given attribute in the given numeric range. -func NewNumericAttributeFilter(logger *zap.Logger, key string, minValue, maxValue int64) PolicyEvaluator { - return &numericAttributeFilter{ - key: key, - minValue: minValue, - maxValue: maxValue, - logger: logger, - } -} - -// OnLateArrivingSpans notifies the evaluator that the given list of spans arrived -// after the sampling decision was already taken for the trace. -// This gives the evaluator a chance to log any message/metrics and/or update any -// related internal state. -func (naf *numericAttributeFilter) OnLateArrivingSpans(Decision, []*pdata.Span) error { - naf.logger.Debug("Triggering action for late arriving spans in numeric-attribute filter") - return nil -} - -// Evaluate looks at the trace data and returns a corresponding SamplingDecision. -func (naf *numericAttributeFilter) Evaluate(_ pdata.TraceID, trace *TraceData) (Decision, error) { - trace.Lock() - batches := trace.ReceivedBatches - trace.Unlock() - for _, batch := range batches { - rspans := batch.ResourceSpans() - for i := 0; i < rspans.Len(); i++ { - rs := rspans.At(i) - if rs.IsNil() { - continue - } - ilss := rs.InstrumentationLibrarySpans() - for j := 0; j < ilss.Len(); j++ { - ils := ilss.At(j) - if ils.IsNil() { - continue - } - for k := 0; k < ils.Spans().Len(); k++ { - span := ils.Spans().At(k) - if span.IsNil() { - continue - } - if v, ok := span.Attributes().Get(naf.key); ok { - value := v.IntVal() - if value >= naf.minValue && value <= naf.maxValue { - return Sampled, nil - } - } - } - } - } - } - return NotSampled, nil -} - -// OnDroppedSpans is called when the trace needs to be dropped, due to memory -// pressure, before the decision_wait time has been reached. -func (naf *numericAttributeFilter) OnDroppedSpans(pdata.TraceID, *TraceData) (Decision, error) { - naf.logger.Debug("Triggering action for dropped spans in numeric-attribute filter") - return NotSampled, nil -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/sampling/numeric_tag_filter_test.go b/processor/samplingprocessor/tailsamplingprocessor/sampling/numeric_tag_filter_test.go deleted file mode 100644 index 08a8e34f1a7..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/sampling/numeric_tag_filter_test.go +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "math" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -func TestNumericTagFilter(t *testing.T) { - - var empty = map[string]pdata.AttributeValue{} - filter := NewNumericAttributeFilter(zap.NewNop(), "example", math.MinInt32, math.MaxInt32) - - resAttr := map[string]pdata.AttributeValue{} - resAttr["example"] = pdata.NewAttributeValueInt(8) - - cases := []struct { - Desc string - Trace *TraceData - Decision Decision - }{ - { - Desc: "nonmatching span attribute", - Trace: newTraceIntAttrs(empty, "non_matching", math.MinInt32), - Decision: NotSampled, - }, - { - Desc: "span attribute with lower limit", - Trace: newTraceIntAttrs(empty, "example", math.MinInt32), - Decision: Sampled, - }, - { - Desc: "span attribute with upper limit", - Trace: newTraceIntAttrs(empty, "example", math.MaxInt32), - Decision: Sampled, - }, - { - Desc: "span attribute below min limit", - Trace: newTraceIntAttrs(empty, "example", math.MinInt32-1), - Decision: NotSampled, - }, - { - Desc: "span attribute above max limit", - Trace: newTraceIntAttrs(empty, "example", math.MaxInt32+1), - Decision: NotSampled, - }, - } - - for _, c := range cases { - t.Run(c.Desc, func(t *testing.T) { - u, _ := uuid.NewRandom() - decision, err := filter.Evaluate(pdata.NewTraceID(u), c.Trace) - assert.NoError(t, err) - assert.Equal(t, decision, c.Decision) - }) - } -} - -func TestOnDroppedSpans_NumericTagFilter(t *testing.T) { - var empty = map[string]pdata.AttributeValue{} - u, _ := uuid.NewRandom() - filter := NewNumericAttributeFilter(zap.NewNop(), "example", math.MinInt32, math.MaxInt32) - decision, err := filter.OnDroppedSpans(pdata.NewTraceID(u), newTraceIntAttrs(empty, "example", math.MaxInt32+1)) - assert.Nil(t, err) - assert.Equal(t, decision, NotSampled) -} - -func TestOnLateArrivingSpans_NumericTagFilter(t *testing.T) { - filter := NewNumericAttributeFilter(zap.NewNop(), "example", math.MinInt32, math.MaxInt32) - err := filter.OnLateArrivingSpans(NotSampled, nil) - assert.Nil(t, err) -} - -func newTraceIntAttrs(nodeAttrs map[string]pdata.AttributeValue, spanAttrKey string, spanAttrValue int64) *TraceData { - var traceBatches []pdata.Traces - traces := pdata.NewTraces() - traces.ResourceSpans().Resize(1) - rs := traces.ResourceSpans().At(0) - rs.Resource().InitEmpty() - rs.Resource().Attributes().InitFromMap(nodeAttrs) - rs.InstrumentationLibrarySpans().Resize(1) - ils := rs.InstrumentationLibrarySpans().At(0) - ils.Spans().Resize(1) - span := ils.Spans().At(0) - span.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})) - span.SetSpanID(pdata.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})) - attributes := make(map[string]pdata.AttributeValue) - attributes[spanAttrKey] = pdata.NewAttributeValueInt(spanAttrValue) - span.Attributes().InitFromMap(attributes) - traceBatches = append(traceBatches, traces) - return &TraceData{ - ReceivedBatches: traceBatches, - } -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/sampling/policy.go b/processor/samplingprocessor/tailsamplingprocessor/sampling/policy.go deleted file mode 100644 index fc4b68e3c79..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/sampling/policy.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "sync" - "time" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -// TraceData stores the sampling related trace data. -type TraceData struct { - sync.Mutex - // Decisions gives the current status of the sampling decision for each policy. - Decisions []Decision - // Arrival time the first span for the trace was received. - ArrivalTime time.Time - // Decisiontime time when sampling decision was taken. - DecisionTime time.Time - // SpanCount track the number of spans on the trace. - SpanCount int64 - // ReceivedBatches stores all the batches received for the trace. - ReceivedBatches []pdata.Traces -} - -// Decision gives the status of sampling decision. -type Decision int32 - -const ( - // Unspecified indicates that the status of the decision was not set yet. - Unspecified Decision = iota - // Pending indicates that the policy was not evaluated yet. - Pending - // Sampled is used to indicate that the decision was already taken - // to sample the data. - Sampled - // NotSampled is used to indicate that the decision was already taken - // to not sample the data. - NotSampled - // Dropped is used when data needs to be purged before the sampling policy - // had a chance to evaluate it. - Dropped -) - -// PolicyEvaluator implements a tail-based sampling policy evaluator, -// which makes a sampling decision for a given trace when requested. -type PolicyEvaluator interface { - // OnLateArrivingSpans notifies the evaluator that the given list of spans arrived - // after the sampling decision was already taken for the trace. - // This gives the evaluator a chance to log any message/metrics and/or update any - // related internal state. - OnLateArrivingSpans(earlyDecision Decision, spans []*pdata.Span) error - - // Evaluate looks at the trace data and returns a corresponding SamplingDecision. - Evaluate(traceID pdata.TraceID, trace *TraceData) (Decision, error) - - // OnDroppedSpans is called when the trace needs to be dropped, due to memory - // pressure, before the decision_wait time has been reached. - OnDroppedSpans(traceID pdata.TraceID, trace *TraceData) (Decision, error) -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/sampling/rate_limiting.go b/processor/samplingprocessor/tailsamplingprocessor/sampling/rate_limiting.go deleted file mode 100644 index b4d2483140b..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/sampling/rate_limiting.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "time" - - "go.uber.org/zap" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -type rateLimiting struct { - currentSecond int64 - spansInCurrentSecond int64 - spansPerSecond int64 - logger *zap.Logger -} - -var _ PolicyEvaluator = (*rateLimiting)(nil) - -// NewRateLimiting creates a policy evaluator the samples all traces. -func NewRateLimiting(logger *zap.Logger, spansPerSecond int64) PolicyEvaluator { - return &rateLimiting{ - spansPerSecond: spansPerSecond, - logger: logger, - } -} - -// OnLateArrivingSpans notifies the evaluator that the given list of spans arrived -// after the sampling decision was already taken for the trace. -// This gives the evaluator a chance to log any message/metrics and/or update any -// related internal state. -func (r *rateLimiting) OnLateArrivingSpans(Decision, []*pdata.Span) error { - r.logger.Debug("Triggering action for late arriving spans in rate-limiting filter") - return nil -} - -// Evaluate looks at the trace data and returns a corresponding SamplingDecision. -func (r *rateLimiting) Evaluate(_ pdata.TraceID, trace *TraceData) (Decision, error) { - r.logger.Debug("Evaluating spans in rate-limiting filter") - currSecond := time.Now().Unix() - if r.currentSecond != currSecond { - r.currentSecond = currSecond - r.spansInCurrentSecond = 0 - } - - spansInSecondIfSampled := r.spansInCurrentSecond + trace.SpanCount - if spansInSecondIfSampled < r.spansPerSecond { - r.spansInCurrentSecond = spansInSecondIfSampled - return Sampled, nil - } - - return NotSampled, nil -} - -// OnDroppedSpans is called when the trace needs to be dropped, due to memory -// pressure, before the decision_wait time has been reached. -func (r *rateLimiting) OnDroppedSpans(pdata.TraceID, *TraceData) (Decision, error) { - r.logger.Debug("Triggering action for dropped spans in rate-limiting filter") - return Sampled, nil -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/sampling/rate_limiting_test.go b/processor/samplingprocessor/tailsamplingprocessor/sampling/rate_limiting_test.go deleted file mode 100644 index 85db03c9eef..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/sampling/rate_limiting_test.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "math" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -func TestRateLimiter(t *testing.T) { - var empty = map[string]pdata.AttributeValue{} - - trace := newTraceStringAttrs(empty, "example", "value") - traceID := pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}) - rateLimiter := NewRateLimiting(zap.NewNop(), 3) - - // Trace span count greater than spans per second - trace.SpanCount = 10 - decision, err := rateLimiter.Evaluate(traceID, trace) - assert.Nil(t, err) - assert.Equal(t, decision, NotSampled) - - // Trace span count equal to spans per second - trace.SpanCount = 3 - decision, err = rateLimiter.Evaluate(traceID, trace) - assert.Nil(t, err) - assert.Equal(t, decision, NotSampled) - - // Trace span count less than spans per second - trace.SpanCount = 2 - decision, err = rateLimiter.Evaluate(traceID, trace) - assert.Nil(t, err) - assert.Equal(t, decision, Sampled) - - // Trace span count less than spans per second - trace.SpanCount = 0 - decision, err = rateLimiter.Evaluate(traceID, trace) - assert.Nil(t, err) - assert.Equal(t, decision, Sampled) -} - -func TestOnDroppedSpans_RateLimiter(t *testing.T) { - var empty = map[string]pdata.AttributeValue{} - u, _ := uuid.NewRandom() - rateLimiter := NewRateLimiting(zap.NewNop(), 3) - decision, err := rateLimiter.OnDroppedSpans(pdata.NewTraceID(u), newTraceIntAttrs(empty, "example", math.MaxInt32+1)) - assert.Nil(t, err) - assert.Equal(t, decision, Sampled) -} - -func TestOnLateArrivingSpans_RateLimiter(t *testing.T) { - rateLimiter := NewRateLimiting(zap.NewNop(), 3) - err := rateLimiter.OnLateArrivingSpans(NotSampled, nil) - assert.Nil(t, err) -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/sampling/string_tag_filter.go b/processor/samplingprocessor/tailsamplingprocessor/sampling/string_tag_filter.go deleted file mode 100644 index 27e16e144d7..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/sampling/string_tag_filter.go +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "go.uber.org/zap" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -type stringAttributeFilter struct { - key string - values map[string]struct{} - logger *zap.Logger -} - -var _ PolicyEvaluator = (*stringAttributeFilter)(nil) - -// NewStringAttributeFilter creates a policy evaluator that samples all traces with -// the given attribute in the given numeric range. -func NewStringAttributeFilter(logger *zap.Logger, key string, values []string) PolicyEvaluator { - valuesMap := make(map[string]struct{}) - for _, value := range values { - if value != "" { - valuesMap[value] = struct{}{} - } - } - return &stringAttributeFilter{ - key: key, - values: valuesMap, - logger: logger, - } -} - -// OnLateArrivingSpans notifies the evaluator that the given list of spans arrived -// after the sampling decision was already taken for the trace. -// This gives the evaluator a chance to log any message/metrics and/or update any -// related internal state. -func (saf *stringAttributeFilter) OnLateArrivingSpans(Decision, []*pdata.Span) error { - saf.logger.Debug("Triggering action for late arriving spans in string-tag filter") - return nil -} - -// Evaluate looks at the trace data and returns a corresponding SamplingDecision. -func (saf *stringAttributeFilter) Evaluate(_ pdata.TraceID, trace *TraceData) (Decision, error) { - saf.logger.Debug("Evaluting spans in string-tag filter") - trace.Lock() - batches := trace.ReceivedBatches - trace.Unlock() - for _, batch := range batches { - rspans := batch.ResourceSpans() - - for i := 0; i < rspans.Len(); i++ { - rs := rspans.At(i) - if rs.IsNil() { - continue - } - - resource := rs.Resource() - if !resource.IsNil() { - if v, ok := resource.Attributes().Get(saf.key); ok { - if _, ok := saf.values[v.StringVal()]; ok { - return Sampled, nil - } - } - } - - ilss := rs.InstrumentationLibrarySpans() - for j := 0; j < ilss.Len(); j++ { - ils := ilss.At(j) - if ils.IsNil() { - continue - } - for k := 0; k < ils.Spans().Len(); k++ { - span := ils.Spans().At(k) - if span.IsNil() { - continue - } - if v, ok := span.Attributes().Get(saf.key); ok { - truncableStr := v.StringVal() - if len(truncableStr) > 0 { - if _, ok := saf.values[truncableStr]; ok { - return Sampled, nil - } - } - } - - } - } - } - } - return NotSampled, nil -} - -// OnDroppedSpans is called when the trace needs to be dropped, due to memory -// pressure, before the decision_wait time has been reached. -func (saf *stringAttributeFilter) OnDroppedSpans(pdata.TraceID, *TraceData) (Decision, error) { - saf.logger.Debug("Triggering action for dropped spans in string-tag filter") - return NotSampled, nil -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/sampling/string_tag_filter_test.go b/processor/samplingprocessor/tailsamplingprocessor/sampling/string_tag_filter_test.go deleted file mode 100644 index 3c56b2280e4..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/sampling/string_tag_filter_test.go +++ /dev/null @@ -1,114 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sampling - -import ( - "math" - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/consumer/pdata" -) - -func TestStringTagFilter(t *testing.T) { - - var empty = map[string]pdata.AttributeValue{} - filter := NewStringAttributeFilter(zap.NewNop(), "example", []string{"value"}) - - cases := []struct { - Desc string - Trace *TraceData - Decision Decision - }{ - { - Desc: "nonmatching node attribute key", - Trace: newTraceStringAttrs(map[string]pdata.AttributeValue{"non_matching": pdata.NewAttributeValueString("value")}, "", ""), - Decision: NotSampled, - }, - { - Desc: "nonmatching node attribute value", - Trace: newTraceStringAttrs(map[string]pdata.AttributeValue{"example": pdata.NewAttributeValueString("non_matching")}, "", ""), - Decision: NotSampled, - }, - { - Desc: "matching node attribute", - Trace: newTraceStringAttrs(map[string]pdata.AttributeValue{"example": pdata.NewAttributeValueString("value")}, "", ""), - Decision: Sampled, - }, - { - Desc: "nonmatching span attribute key", - Trace: newTraceStringAttrs(empty, "nonmatching", "value"), - Decision: NotSampled, - }, - { - Desc: "nonmatching span attribute value", - Trace: newTraceStringAttrs(empty, "example", "nonmatching"), - Decision: NotSampled, - }, - { - Desc: "matching span attribute", - Trace: newTraceStringAttrs(empty, "example", "value"), - Decision: Sampled, - }, - } - - for _, c := range cases { - t.Run(c.Desc, func(t *testing.T) { - decision, err := filter.Evaluate(pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}), c.Trace) - assert.NoError(t, err) - assert.Equal(t, decision, c.Decision) - }) - } -} - -func newTraceStringAttrs(nodeAttrs map[string]pdata.AttributeValue, spanAttrKey string, spanAttrValue string) *TraceData { - var traceBatches []pdata.Traces - traces := pdata.NewTraces() - traces.ResourceSpans().Resize(1) - rs := traces.ResourceSpans().At(0) - rs.Resource().InitEmpty() - rs.Resource().Attributes().InitFromMap(nodeAttrs) - rs.InstrumentationLibrarySpans().Resize(1) - ils := rs.InstrumentationLibrarySpans().At(0) - ils.Spans().Resize(1) - span := ils.Spans().At(0) - span.SetTraceID(pdata.NewTraceID([16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16})) - span.SetSpanID(pdata.NewSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})) - attributes := make(map[string]pdata.AttributeValue) - attributes[spanAttrKey] = pdata.NewAttributeValueString(spanAttrValue) - span.Attributes().InitFromMap(attributes) - traceBatches = append(traceBatches, traces) - return &TraceData{ - ReceivedBatches: traceBatches, - } -} - -func TestOnDroppedSpans_StringAttribute(t *testing.T) { - var empty = map[string]pdata.AttributeValue{} - u, _ := uuid.NewRandom() - filter := NewStringAttributeFilter(zap.NewNop(), "example", []string{"value"}) - decision, err := filter.OnDroppedSpans(pdata.NewTraceID(u), newTraceIntAttrs(empty, "example", math.MaxInt32+1)) - assert.Nil(t, err) - assert.Equal(t, decision, NotSampled) -} - -func TestOnLateArrivingSpans_StringAttribute(t *testing.T) { - filter := NewStringAttributeFilter(zap.NewNop(), "example", []string{"value"}) - err := filter.OnLateArrivingSpans(NotSampled, nil) - assert.Nil(t, err) -} diff --git a/processor/samplingprocessor/tailsamplingprocessor/testdata/tail_sampling_config.yaml b/processor/samplingprocessor/tailsamplingprocessor/testdata/tail_sampling_config.yaml deleted file mode 100644 index fa5963b938f..00000000000 --- a/processor/samplingprocessor/tailsamplingprocessor/testdata/tail_sampling_config.yaml +++ /dev/null @@ -1,40 +0,0 @@ -receivers: - examplereceiver: - -exporters: - exampleexporter: - -processors: - tail_sampling: - decision_wait: 10s - num_traces: 100 - expected_new_traces_per_sec: 10 - policies: - [ - { - name: test-policy-1, - type: always_sample - }, - { - name: test-policy-2, - type: numeric_attribute, - numeric_attribute: {key: key1, min_value: 50, max_value: 100} - }, - { - name: test-policy-3, - type: string_attribute, - string_attribute: {key: key2, values: [value1, value2]} - }, - { - name: test-policy-4, - type: rate_limiting, - rate_limiting: {spans_per_second: 35} - }, - ] - -service: - pipelines: - traces: - receivers: [examplereceiver] - processors: [tail_sampling] - exporters: [exampleexporter] diff --git a/service/defaultcomponents/defaults.go b/service/defaultcomponents/defaults.go index a47af8926d3..61ba1204d30 100644 --- a/service/defaultcomponents/defaults.go +++ b/service/defaultcomponents/defaults.go @@ -39,7 +39,6 @@ import ( "go.opentelemetry.io/collector/processor/queuedprocessor" "go.opentelemetry.io/collector/processor/resourceprocessor" "go.opentelemetry.io/collector/processor/samplingprocessor/probabilisticsamplerprocessor" - "go.opentelemetry.io/collector/processor/samplingprocessor/tailsamplingprocessor" "go.opentelemetry.io/collector/processor/spanprocessor" "go.opentelemetry.io/collector/receiver/fluentforwardreceiver" "go.opentelemetry.io/collector/receiver/hostmetricsreceiver" @@ -105,7 +104,6 @@ func Components() ( queuedprocessor.NewFactory(), batchprocessor.NewFactory(), memorylimiter.NewFactory(), - tailsamplingprocessor.NewFactory(), probabilisticsamplerprocessor.NewFactory(), spanprocessor.NewFactory(), filterprocessor.NewFactory(), diff --git a/service/defaultcomponents/defaults_test.go b/service/defaultcomponents/defaults_test.go index 865e677a0fe..8284a07ad5f 100644 --- a/service/defaultcomponents/defaults_test.go +++ b/service/defaultcomponents/defaults_test.go @@ -48,7 +48,6 @@ func TestDefaultComponents(t *testing.T) { "queued_retry", "batch", "memory_limiter", - "tail_sampling", "probabilistic_sampler", "span", "filter", diff --git a/service/telemetry.go b/service/telemetry.go index a073dd803e4..9a03140fabd 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -31,7 +31,6 @@ import ( "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/batchprocessor" "go.opentelemetry.io/collector/processor/queuedprocessor" - "go.opentelemetry.io/collector/processor/samplingprocessor/tailsamplingprocessor" fluentobserv "go.opentelemetry.io/collector/receiver/fluentforwardreceiver/observ" "go.opentelemetry.io/collector/receiver/kafkareceiver" telemetry2 "go.opentelemetry.io/collector/service/internal/telemetry" @@ -73,7 +72,6 @@ func (tel *appTelemetry) init(asyncErrorChannel chan<- error, ballastSizeBytes u views = append(views, processor.MetricViews(level)...) views = append(views, queuedprocessor.MetricViews(level)...) views = append(views, batchprocessor.MetricViews(level)...) - views = append(views, tailsamplingprocessor.SamplingProcessorMetricViews(level)...) views = append(views, kafkareceiver.MetricViews()...) views = append(views, processMetricsViews.Views()...) views = append(views, fluentobserv.Views(level)...)