diff --git a/.chloggen/probabilisticsampler_failclosed.yaml b/.chloggen/probabilisticsampler_failclosed.yaml new file mode 100644 index 000000000000..93304bc9fc5d --- /dev/null +++ b/.chloggen/probabilisticsampler_failclosed.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: probabilisticsamplerprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds the `FailClosed` flag to solidify current behavior when the randomness source is missing. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31918] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/cmd/configschema/go.mod b/cmd/configschema/go.mod index 39722e6aeec8..7de75922e902 100644 --- a/cmd/configschema/go.mod +++ b/cmd/configschema/go.mod @@ -566,6 +566,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.100.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/winperfcounters v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.100.0 // indirect @@ -1226,3 +1227,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/connector/graf replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/sumologicextension => ../../extension/sumologicextension replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkenterprisereceiver => ../../receiver/splunkenterprisereceiver + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling diff --git a/cmd/otelcontribcol/builder-config.yaml b/cmd/otelcontribcol/builder-config.yaml index 8216bb082efc..3da29c41862c 100644 --- a/cmd/otelcontribcol/builder-config.yaml +++ b/cmd/otelcontribcol/builder-config.yaml @@ -472,3 +472,4 @@ replaces: - github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages => ../../extension/opampcustommessages - github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/s3provider => ../../confmap/provider/s3provider -
github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/secretsmanagerprovider => ../../confmap/provider/secretsmanagerprovider + - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling diff --git a/cmd/otelcontribcol/go.mod b/cmd/otelcontribcol/go.mod index 50e3327874ff..d470d50caddc 100644 --- a/cmd/otelcontribcol/go.mod +++ b/cmd/otelcontribcol/go.mod @@ -631,6 +631,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.100.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.100.0 // indirect @@ -1293,3 +1294,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/opam replace github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/s3provider => ../../confmap/provider/s3provider replace github.com/open-telemetry/opentelemetry-collector-contrib/confmap/provider/secretsmanagerprovider => ../../confmap/provider/secretsmanagerprovider + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling diff --git a/connector/datadogconnector/go.mod b/connector/datadogconnector/go.mod index 4e35d31131a3..2a01807f21b9 100644 --- a/connector/datadogconnector/go.mod +++ b/connector/datadogconnector/go.mod @@ -332,3 +332,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/stor replace github.com/openshift/api v3.9.0+incompatible => github.com/openshift/api v0.0.0-20180801171038-322a19404e37 replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ../../processor/transformprocessor + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling diff --git a/exporter/datadogexporter/go.mod b/exporter/datadogexporter/go.mod index edc582019052..554d18300de5 100644 --- a/exporter/datadogexporter/go.mod +++ b/exporter/datadogexporter/go.mod @@ -250,6 +250,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.100.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.100.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect @@ -427,3 +428,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/tail replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage => ../../extension/storage replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ../../processor/transformprocessor + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => 
../../pkg/sampling diff --git a/exporter/datadogexporter/integrationtest/go.mod b/exporter/datadogexporter/integrationtest/go.mod index 7db3ca3c159d..eab8f4cd3020 100644 --- a/exporter/datadogexporter/integrationtest/go.mod +++ b/exporter/datadogexporter/integrationtest/go.mod @@ -341,3 +341,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/prob replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver => ../../../receiver/prometheusreceiver replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor => ../../../processor/transformprocessor + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../../pkg/sampling diff --git a/go.mod b/go.mod index efc66a3b0020..35d4f487e471 100644 --- a/go.mod +++ b/go.mod @@ -578,6 +578,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.100.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.100.0 // indirect @@ -1226,3 +1227,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/enco replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/ackextension => ./extension/ackextension replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkenterprisereceiver => ./receiver/splunkenterprisereceiver + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ./pkg/sampling diff --git a/processor/probabilisticsamplerprocessor/README.md b/processor/probabilisticsamplerprocessor/README.md index ae015aae08b1..758d6ebf518c 100644 --- a/processor/probabilisticsamplerprocessor/README.md +++ b/processor/probabilisticsamplerprocessor/README.md @@ -15,51 +15,159 @@ [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib -The probabilistic sampler supports two types of sampling for traces: - -1. `sampling.priority` [semantic -convention](https://github.com/opentracing/specification/blob/master/semantic_conventions.md#span-tags-table) -as defined by OpenTracing -1. Trace ID hashing - -The `sampling.priority` semantic convention takes priority over trace ID hashing. As the name -implies, trace ID hashing samples based on hash values determined by trace IDs. See [Hashing](#hashing) for more information. +The probabilistic sampler processor supports several modes of sampling +for spans and log records. Sampling is performed on a per-request +basis, considering individual items statelessly. For whole trace +sampling, see +[tailsamplingprocessor](../tailsamplingprocessor/README.md). + +For trace spans, this sampler supports probabilistic sampling based on +a configured sampling percentage applied to the TraceID. In addition, +the sampler recognizes a `sampling.priority` annotation, which can +force the sampler to apply 0% or 100% sampling. 
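+
+For example, the following configuration applies probabilistic
+sampling to spans in a traces pipeline (the percentage is
+illustrative):
+
+```yaml
+processors:
+  probabilistic_sampler:
+    sampling_percentage: 15.3
+```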
+ +For log records, this sampler can be configured to use the embedded +TraceID and follow the same logic as applied to spans. When the +TraceID is not defined, the sampler can be configured to apply hashing +to a selected log record attribute. This sampler also supports +sampling priority. + +## Consistency guarantee + +A consistent probability sampler is a Sampler that supports +independent sampling decisions for each span or log record in a group +(e.g. by TraceID), while maximizing the potential for completeness as +follows. + +Consistent probability sampling requires that for any span in a given +trace, if a Sampler with lesser sampling probability selects the span +for sampling, then the span would also be selected by a Sampler +configured with greater sampling probability. + +## Completeness property + +A trace is complete when all of its members are sampled. A +"sub-trace" is complete when all of its descendants are sampled. + +Ordinarily, Trace and Logging SDKs configure parent-based samplers, +which decide whether to sample based on the Context, because this +leads to completeness. + +When non-root spans or logs make independent sampling decisions +instead of using the parent-based approach (e.g., using the +`TraceIDRatioBased` sampler for a non-root span), incompleteness may +result, and when spans and log records are independently sampled in a +processor, as by this component, the same potential for incompleteness +arises. The consistency guarantee helps minimize this issue. + +Consistent probability samplers can be safely used with a mixture of +probabilities and preserve sub-trace completeness, provided that child +spans and log records are sampled with probability greater than or +equal to that of the parent context. + +Using 1%, 10%, and 50% probabilities, for example, in a consistent +probability scheme the 50% sampler must sample when the 10% sampler +does, and the 10% sampler must sample when the 1% sampler does. A +three-tier system could be configured with 1% sampling in the first +tier, 10% sampling in the second tier, and 50% sampling in the third +tier. In this configuration, 1% of traces will be complete, 10% of +traces will be sub-trace complete at the second tier, and 50% of +traces will be sub-trace complete at the third tier, thanks to the +consistency property. + +These guidelines should be considered when deploying multiple +collectors with different sampling probabilities in a system. For +example, a collector serving frontend servers can be configured with a +smaller sampling probability than a collector serving backend servers, +without breaking sub-trace completeness.
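+
+For example, the three-tier deployment described above could use the
+following per-tier settings (illustrative values; for consistency to
+hold across tiers under the hash seed method described below, the
+tiers must also share one `hash_seed`):
+
+```yaml
+# tier 1 collectors
+processors:
+  probabilistic_sampler:
+    sampling_percentage: 1
+---
+# tier 2 collectors
+processors:
+  probabilistic_sampler:
+    sampling_percentage: 10
+---
+# tier 3 collectors
+processors:
+  probabilistic_sampler:
+    sampling_percentage: 50
+```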
+ +## Sampling randomness + +To achieve consistency, sampling randomness is taken from a +deterministic aspect of the input data. For traces pipelines, the +source of randomness is always the TraceID. For logs pipelines, the +source of randomness can be the TraceID or another log record +attribute, if configured. + +The `attribute_source` and `from_attribute` fields determine the +source of randomness used for log records. When `attribute_source` is +set to `traceID`, the TraceID will be used. When `attribute_source` +is set to `record` or the TraceID field is absent, the value of +`from_attribute` is taken as the source of randomness (if configured). + +## Sampling priority + +The sampling priority mechanism is an override, which takes precedence +over the probabilistic decision in all modes. + +🛑 Compatibility note: Logs and Traces have different behavior. + +In traces pipelines, when the priority attribute has value 0, the +configured probability will be modified to 0% and the item will not +pass the sampler. When the priority attribute is non-zero, the +configured probability will be set to 100%. The sampling priority +attribute is not configurable, and is called `sampling.priority`. + +In logs pipelines, when the priority attribute has value 0, the +configured probability will be modified to 0%, and the item will not +pass the sampler. Otherwise, the logs sampling priority attribute is +interpreted as a percentage, with values >= 100 equal to 100% +sampling. The logs sampling priority attribute is configured via +`sampling_priority`. + +## Sampling algorithm + +### Hash seed + +The hash seed method uses the FNV hash function applied to either a +Trace ID (spans, log records), or to the value of a specified +attribute (only logs). The hashed value, presumed to be random, is +compared against a threshold value that corresponds to the sampling +percentage. + +This mode requires configuring the `hash_seed` field. This mode is +enabled when the `hash_seed` field is not zero, or when log records +are sampled and `attribute_source` is set to `record`. + +In order for hashing to be consistent, all collectors for a given tier +(e.g. behind the same load balancer) must have the same +`hash_seed`. It is also possible to leverage a different `hash_seed` +at different collector tiers to support additional sampling +requirements. + +This mode uses 14 bits of sampling precision.
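+
+For example, every collector in one tier could share the following
+configuration (illustrative values):
+
+```yaml
+processors:
+  probabilistic_sampler:
+    sampling_percentage: 15.3
+    hash_seed: 22
+```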
+ +### Error handling + +This processor considers it an error when the arriving data has no +randomness. This includes conditions where the TraceID field is +invalid (16 zero bytes) and where the log record attribute source has +zero bytes of information. + +By default, when there are errors determining sampling-related +information from an item of telemetry, the data will be refused. This +behavior can be changed by setting the `fail_closed` property to +false, in which case erroneous data will pass through the processor.
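+
+For example, to let items whose randomness cannot be determined pass
+through instead of being refused (illustrative percentage):
+
+```yaml
+processors:
+  probabilistic_sampler:
+    sampling_percentage: 25
+    fail_closed: false
+```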
+ +## Configuration The following configuration options can be modified: -- `hash_seed` (no default): An integer used to compute the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same hash_seed. -- `sampling_percentage` (default = 0): Percentage at which traces are sampled; >= 100 samples all traces -Examples: +- `sampling_percentage` (32-bit floating point, required): Percentage at which items are sampled; >= 100 samples all items, 0 rejects all items. +- `hash_seed` (32-bit unsigned integer, optional, default = 0): An integer used to compute the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same hash_seed. +- `fail_closed` (boolean, optional, default = true): Whether to reject items with sampling-related errors. -```yaml -processors: - probabilistic_sampler: - hash_seed: 22 - sampling_percentage: 15.3 -``` +### Logs-specific configuration -The probabilistic sampler supports sampling logs according to their trace ID, or by a specific log record attribute. - -The probabilistic sampler optionally may use a `hash_seed` to compute the hash of a log record. -This sampler samples based on hash values determined by log records. See [Hashing](#hashing) for more information. - -The following configuration options can be modified: -- `hash_seed` (no default, optional): An integer used to compute the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same hash_seed. -- `sampling_percentage` (required): Percentage at which logs are sampled; >= 100 samples all logs, 0 rejects all logs. -- `attribute_source` (default = traceID, optional): defines where to look for the attribute in from_attribute. The allowed values are `traceID` or `record`. -- `from_attribute` (default = null, optional): The optional name of a log record attribute used for sampling purposes, such as a unique log record ID. The value of the attribute is only used if the trace ID is absent or if `attribute_source` is set to `record`. -- `sampling_priority` (default = null, optional): The optional name of a log record attribute used to set a different sampling priority from the `sampling_percentage` setting. 0 means to never sample the log record, and >= 100 means to always sample the log record. - -## Hashing - -In order for hashing to work, all collectors for a given tier (e.g. behind the same load balancer) -must have the same `hash_seed`. It is also possible to leverage a different `hash_seed` at -different collector tiers to support additional sampling requirements. Please refer to -[config.go](./config.go) for the config spec. +- `attribute_source` (string, optional, default = "traceID"): defines where to look for the attribute in from_attribute. The allowed values are `traceID` or `record`. +- `from_attribute` (string, optional, default = ""): The name of a log record attribute used for sampling purposes, such as a unique log record ID. The value of the attribute is only used if the trace ID is absent or if `attribute_source` is set to `record`. +- `sampling_priority` (string, optional, default = ""): The name of a log record attribute used to set a different sampling priority from the `sampling_percentage` setting. 0 means to never sample the log record, and >= 100 means to always sample the log record. Examples: -Sample 15% of the logs: +Sample 15% of log records according to their trace ID: + ```yaml processors: probabilistic_sampler: @@ -76,7 +184,8 @@ processors: from_attribute: logID # value is required if the source is not traceID ``` -Sample logs according to the attribute `priority`: +Give sampling priority to log records according to the attribute named +`priority`: ```yaml processors: @@ -85,6 +194,7 @@ processors: sampling_priority: priority ``` +## Detailed examples -Refer to [config.yaml](./testdata/config.yaml) for detailed -examples on using the processor. +Refer to [config.yaml](./testdata/config.yaml) for detailed examples +on using the processor. diff --git a/processor/probabilisticsamplerprocessor/config.go b/processor/probabilisticsamplerprocessor/config.go index 3502fa8bc429..c4bc83eb6b11 100644 --- a/processor/probabilisticsamplerprocessor/config.go +++ b/processor/probabilisticsamplerprocessor/config.go @@ -35,6 +35,16 @@ type Config struct { // different sampling rates, configuring different seeds avoids that. HashSeed uint32 `mapstructure:"hash_seed"` + // FailClosed indicates to not sample data (the processor will + // fail "closed") in case of error, such as failure to parse + // the tracestate field or missing the randomness attribute. + // + // By default, failure cases are not sampled (the processor + // fails "closed"). Sampling priority-based decisions are made after + // FailClosed is processed, making it possible to sample + // despite errors using priority. + FailClosed bool `mapstructure:"fail_closed"` + // AttributeSource (logs only) defines where to look for the attribute in from_attribute. The allowed values are // `traceID` or `record`. Default is `traceID`. AttributeSource `mapstructure:"attribute_source"` diff --git a/processor/probabilisticsamplerprocessor/config_test.go b/processor/probabilisticsamplerprocessor/config_test.go index 90711d343552..d236c519c53d 100644 --- a/processor/probabilisticsamplerprocessor/config_test.go +++ b/processor/probabilisticsamplerprocessor/config_test.go @@ -26,8 +26,8 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, ""), expected: &Config{ SamplingPercentage: 15.3, - HashSeed: 22, AttributeSource: "traceID", + FailClosed: true, }, }, { @@ -38,6 +38,7 @@ func TestLoadConfig(t *testing.T) { AttributeSource: "record", FromAttribute: "foo", SamplingPriority: "bar", + FailClosed: true, }, }, } @@ -63,12 +64,21 @@ func TestLoadConfig(t *testing.T) { } func TestLoadInvalidConfig(t *testing.T) { - factories, err := otelcoltest.NopFactories() - require.NoError(t, err) + for _, test := range []struct { + file string + contains string + }{ + {"invalid_negative.yaml", "negative sampling rate"}, + } { + t.Run(test.file, func(t *testing.T) { + factories, err := otelcoltest.NopFactories() + require.NoError(t, err) - factory := NewFactory() - factories.Processors[metadata.Type] = factory + factory := NewFactory() + factories.Processors[metadata.Type] = factory - _, err = otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", "invalid.yaml"), factories) - require.ErrorContains(t, err, "negative sampling rate: -15.30") + _, err = otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", test.file), factories) + require.ErrorContains(t, err, test.contains) + }) + } } diff --git a/processor/probabilisticsamplerprocessor/factory.go b/processor/probabilisticsamplerprocessor/factory.go index 8302e7471840..481f37a600e0 100644 --- a/processor/probabilisticsamplerprocessor/factory.go +++ b/processor/probabilisticsamplerprocessor/factory.go @@ -20,6 +20,10 @@ import ( var onceMetrics sync.Once +// The default precision is 4 hex digits, slightly more than the original +// component logic's 14 bits of precision. +const defaultPrecision = 4 + // NewFactory returns a new factory for the Probabilistic sampler processor.
func NewFactory() processor.Factory { onceMetrics.Do(func() { @@ -37,6 +41,7 @@ func NewFactory() processor.Factory { func createDefaultConfig() component.Config { return &Config{ AttributeSource: defaultAttributeSource, + FailClosed: true, } } diff --git a/processor/probabilisticsamplerprocessor/factory_test.go b/processor/probabilisticsamplerprocessor/factory_test.go index cd4a6246c649..8818f49eb72d 100644 --- a/processor/probabilisticsamplerprocessor/factory_test.go +++ b/processor/probabilisticsamplerprocessor/factory_test.go @@ -15,22 +15,22 @@ import ( func TestCreateDefaultConfig(t *testing.T) { cfg := createDefaultConfig() - assert.NotNil(t, cfg, "failed to create default config") assert.NoError(t, componenttest.CheckConfigStruct(cfg)) + assert.NotNil(t, cfg, "failed to create default config") } func TestCreateProcessor(t *testing.T) { cfg := createDefaultConfig() set := processortest.NewNopCreateSettings() tp, err := createTracesProcessor(context.Background(), set, cfg, consumertest.NewNop()) - assert.NotNil(t, tp) assert.NoError(t, err, "cannot create trace processor") + assert.NotNil(t, tp) } func TestCreateProcessorLogs(t *testing.T) { cfg := createDefaultConfig() set := processortest.NewNopCreateSettings() tp, err := createLogsProcessor(context.Background(), set, cfg, consumertest.NewNop()) - assert.NotNil(t, tp) assert.NoError(t, err, "cannot create logs processor") + assert.NotNil(t, tp) } diff --git a/processor/probabilisticsamplerprocessor/go.mod b/processor/probabilisticsamplerprocessor/go.mod index 3321681c9752..93346afb9561 100644 --- a/processor/probabilisticsamplerprocessor/go.mod +++ b/processor/probabilisticsamplerprocessor/go.mod @@ -4,6 +4,7 @@ go 1.21.0 require ( github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.100.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.100.0 github.com/stretchr/testify v1.9.0 go.opencensus.io v0.24.0 go.opentelemetry.io/collector/component v0.100.0 @@ -112,3 +113,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling => ../../pkg/sampling diff --git a/processor/probabilisticsamplerprocessor/logsprocessor.go b/processor/probabilisticsamplerprocessor/logsprocessor.go index d5f2ef3a75f8..d10ad88fa586 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor.go @@ -5,38 +5,79 @@ package probabilisticsamplerprocessor // import "github.com/open-telemetry/opent import ( "context" - "strconv" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) -type logSamplerProcessor struct { - scaledSamplingRate uint32 - hashSeed uint32 - traceIDEnabled bool - samplingSource string - samplingPriority string - logger *zap.Logger +type logsProcessor struct { + sampler dataSampler + + samplingPriority string + failClosed bool + logger *zap.Logger +} + +type recordCarrier struct { + record 
plog.LogRecord +} + +var _ samplingCarrier = &recordCarrier{} + +func newLogRecordCarrier(l plog.LogRecord) samplingCarrier { + return &recordCarrier{ + record: l, + } +} + +func (*neverSampler) randomnessFromLogRecord(_ plog.LogRecord) (randomnessNamer, samplingCarrier, error) { + // We return a fake randomness value, since it will not be used. + // This avoids a consistency check error for missing randomness. + return newSamplingPriorityMethod(sampling.AllProbabilitiesRandomness), nil, nil +} + +// randomnessFromLogRecord (hashingSampler) uses a hash function over +// the TraceID +func (th *hashingSampler) randomnessFromLogRecord(logRec plog.LogRecord) (randomnessNamer, samplingCarrier, error) { + rnd := newMissingRandomnessMethod() + lrc := newLogRecordCarrier(logRec) + + if th.logsTraceIDEnabled { + value := logRec.TraceID() + if !value.IsEmpty() { + rnd = newTraceIDHashingMethod(randomnessFromBytes(value[:], th.hashSeed)) + } + } + + if isMissing(rnd) && th.logsRandomnessSourceAttribute != "" { + if value, ok := logRec.Attributes().Get(th.logsRandomnessSourceAttribute); ok { + by := getBytesFromValue(value) + if len(by) > 0 { + rnd = newAttributeHashingMethod( + th.logsRandomnessSourceAttribute, + randomnessFromBytes(by, th.hashSeed), + ) + } + } + } + + return rnd, lrc, nil } // newLogsProcessor returns a processor.LogsProcessor that will perform head sampling according to the given // configuration. func newLogsProcessor(ctx context.Context, set processor.CreateSettings, nextConsumer consumer.Logs, cfg *Config) (processor.Logs, error) { - - lsp := &logSamplerProcessor{ - scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor), - hashSeed: cfg.HashSeed, - traceIDEnabled: cfg.AttributeSource == traceIDAttributeSource, - samplingPriority: cfg.SamplingPriority, - samplingSource: cfg.FromAttribute, - logger: set.Logger, + lsp := &logsProcessor{ + sampler: makeSampler(cfg), + samplingPriority: cfg.SamplingPriority, + failClosed: cfg.FailClosed, + logger: set.Logger, } return processorhelper.NewLogsProcessor( @@ -48,48 +89,20 @@ func newLogsProcessor(ctx context.Context, set processor.CreateSettings, nextCon processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true})) } -func (lsp *logSamplerProcessor) processLogs(ctx context.Context, ld plog.Logs) (plog.Logs, error) { - ld.ResourceLogs().RemoveIf(func(rl plog.ResourceLogs) bool { +func (lsp *logsProcessor) processLogs(ctx context.Context, logsData plog.Logs) (plog.Logs, error) { + logsData.ResourceLogs().RemoveIf(func(rl plog.ResourceLogs) bool { rl.ScopeLogs().RemoveIf(func(ill plog.ScopeLogs) bool { ill.LogRecords().RemoveIf(func(l plog.LogRecord) bool { - - tagPolicyValue := "always_sampling" - // pick the sampling source. 
- var lidBytes []byte - if lsp.traceIDEnabled && !l.TraceID().IsEmpty() { - value := l.TraceID() - tagPolicyValue = "trace_id_hash" - lidBytes = value[:] - } - if lidBytes == nil && lsp.samplingSource != "" { - if value, ok := l.Attributes().Get(lsp.samplingSource); ok { - tagPolicyValue = lsp.samplingSource - lidBytes = getBytesFromValue(value) - } - } - priority := lsp.scaledSamplingRate - if lsp.samplingPriority != "" { - if localPriority, ok := l.Attributes().Get(lsp.samplingPriority); ok { - switch localPriority.Type() { - case pcommon.ValueTypeDouble: - priority = uint32(localPriority.Double() * percentageScaleFactor) - case pcommon.ValueTypeInt: - priority = uint32(float64(localPriority.Int()) * percentageScaleFactor) - } - } - } - - sampled := computeHash(lidBytes, lsp.hashSeed)&bitMaskHashBuckets < priority - var err error = stats.RecordWithTags( + return !commonShouldSampleLogic( ctx, - []tag.Mutator{tag.Upsert(tagPolicyKey, tagPolicyValue), tag.Upsert(tagSampledKey, strconv.FormatBool(sampled))}, - statCountLogsSampled.M(int64(1)), + l, + lsp.sampler, + lsp.failClosed, + lsp.sampler.randomnessFromLogRecord, + lsp.priorityFunc, + "logs sampler", + lsp.logger, ) - if err != nil { - lsp.logger.Error(err.Error()) - } - - return !sampled }) // Filter out empty ScopeLogs return ill.LogRecords().Len() == 0 @@ -97,15 +110,46 @@ func (lsp *logSamplerProcessor) processLogs(ctx context.Context, ld plog.Logs) ( // Filter out empty ResourceLogs return rl.ScopeLogs().Len() == 0 }) - if ld.ResourceLogs().Len() == 0 { - return ld, processorhelper.ErrSkipProcessingData + if logsData.ResourceLogs().Len() == 0 { + return logsData, processorhelper.ErrSkipProcessingData + } + return logsData, nil +} + +func (lsp *logsProcessor) priorityFunc(logRec plog.LogRecord, rnd randomnessNamer, threshold sampling.Threshold) (randomnessNamer, sampling.Threshold) { + // Note: in logs, unlike traces, the sampling priority + // attribute is interpreted as a request to be sampled. 
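+	// For example (illustrative values): with `sampling_percentage` 10 and a
+	// record priority attribute of 50, the effective sampling probability for
+	// that record is raised to 50%; a priority of 0 means the record is never
+	// sampled.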
+ if lsp.samplingPriority != "" { + priorityThreshold := lsp.logRecordToPriorityThreshold(logRec) + + if priorityThreshold == sampling.NeverSampleThreshold { + threshold = priorityThreshold + rnd = newSamplingPriorityMethod(rnd.randomness()) // override policy name + } else if sampling.ThresholdLessThan(priorityThreshold, threshold) { + threshold = priorityThreshold + rnd = newSamplingPriorityMethod(rnd.randomness()) // override policy name + } } - return ld, nil + return rnd, threshold } -func getBytesFromValue(value pcommon.Value) []byte { - if value.Type() == pcommon.ValueTypeBytes { - return value.Bytes().AsRaw() +func (lsp *logsProcessor) logRecordToPriorityThreshold(logRec plog.LogRecord) sampling.Threshold { + if localPriority, ok := logRec.Attributes().Get(lsp.samplingPriority); ok { + // Potentially raise the sampling probability to minProb + minProb := 0.0 + switch localPriority.Type() { + case pcommon.ValueTypeDouble: + minProb = localPriority.Double() / 100.0 + case pcommon.ValueTypeInt: + minProb = float64(localPriority.Int()) / 100.0 + } + if minProb != 0 { + if th, err := sampling.ProbabilityToThresholdWithPrecision(minProb, defaultPrecision); err == nil { + // The record has supplied a valid alternative sampling probability + return th + } + + } } - return []byte(value.AsString()) + return sampling.NeverSampleThreshold } diff --git a/processor/probabilisticsamplerprocessor/logsprocessor_test.go b/processor/probabilisticsamplerprocessor/logsprocessor_test.go index 2e084ac38809..e80ab3c344fd 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor_test.go @@ -5,6 +5,7 @@ package probabilisticsamplerprocessor import ( "context" + "fmt" "testing" "time" @@ -15,6 +16,8 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/processor/processortest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" ) func TestNewLogsProcessor(t *testing.T) { @@ -55,6 +58,10 @@ func TestNewLogsProcessor(t *testing.T) { } func TestLogsSampling(t *testing.T) { + // Note: in the tests below, &Config{} objects are created w/o + // use of factory-supplied defaults. Therefore, we explicitly + // set FailClosed: true in cases where the legacy test + // included coverage of data with missing randomness. tests := []struct { name string cfg *Config @@ -79,7 +86,12 @@ func TestLogsSampling(t *testing.T) { cfg: &Config{ SamplingPercentage: 50, AttributeSource: traceIDAttributeSource, + FailClosed: true, }, + // Note: This count excludes one empty TraceID + // that fails closed. If this test had been + // written for 63% or greater, it would have been + // counted. received: 45, }, { @@ -89,6 +101,7 @@ func TestLogsSampling(t *testing.T) { AttributeSource: recordAttributeSource, FromAttribute: "foo", }, + received: 0, }, { @@ -106,6 +119,7 @@ func TestLogsSampling(t *testing.T) { SamplingPercentage: 50, AttributeSource: recordAttributeSource, FromAttribute: "foo", + FailClosed: true, }, received: 23, }, @@ -115,6 +129,7 @@ func TestLogsSampling(t *testing.T) { SamplingPercentage: 50, AttributeSource: recordAttributeSource, FromAttribute: "bar", + FailClosed: true, }, received: 29, // probabilistic... 
doesn't yield the same results as foo }, @@ -149,6 +164,8 @@ func TestLogsSampling(t *testing.T) { record.SetTimestamp(pcommon.Timestamp(time.Unix(1649400860, 0).Unix())) record.SetSeverityNumber(plog.SeverityNumberDebug) ib := byte(i) + // Note this TraceID is invalid when i==0. Since this test + // encodes historical behavior, we leave it as-is. traceID := [16]byte{0, 0, 0, 0, 0, 0, 0, 0, ib, ib, ib, ib, ib, ib, ib, ib} record.SetTraceID(traceID) // set half of records with a foo (bytes) and a bar (string) attribute @@ -173,3 +190,77 @@ func TestLogsSampling(t *testing.T) { }) } } + +func TestLogsMissingRandomness(t *testing.T) { + type test struct { + pct float32 + source AttributeSource + failClosed bool + sampled bool + } + + for _, tt := range []test{ + {0, recordAttributeSource, true, false}, + {50, recordAttributeSource, true, false}, + {100, recordAttributeSource, true, false}, + + {0, recordAttributeSource, false, false}, + {50, recordAttributeSource, false, true}, + {100, recordAttributeSource, false, true}, + + {0, traceIDAttributeSource, true, false}, + {50, traceIDAttributeSource, true, false}, + {100, traceIDAttributeSource, true, false}, + + {0, traceIDAttributeSource, false, false}, + {50, traceIDAttributeSource, false, true}, + {100, traceIDAttributeSource, false, true}, + } { + t.Run(fmt.Sprint(tt.pct, "_", tt.source, "_", tt.failClosed), func(t *testing.T) { + + ctx := context.Background() + logs := plog.NewLogs() + record := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + record.SetTraceID(pcommon.TraceID{}) // invalid TraceID + + cfg := &Config{ + SamplingPercentage: tt.pct, + HashSeed: defaultHashSeed, + FailClosed: tt.failClosed, + AttributeSource: tt.source, + FromAttribute: "unused", + } + + sink := new(consumertest.LogsSink) + set := processortest.NewNopCreateSettings() + // Note: there is a debug-level log we are expecting when FailClosed + // causes a drop. 
+ logger, observed := observer.New(zap.DebugLevel) + set.Logger = zap.New(logger) + + lp, err := newLogsProcessor(ctx, set, sink, cfg) + require.NoError(t, err) + + err = lp.ConsumeLogs(ctx, logs) + require.NoError(t, err) + + sampledData := sink.AllLogs() + if tt.sampled { + require.Equal(t, 1, len(sampledData)) + assert.Equal(t, 1, sink.LogRecordCount()) + } else { + require.Equal(t, 0, len(sampledData)) + assert.Equal(t, 0, sink.LogRecordCount()) + } + + if tt.pct != 0 { + // pct==0 bypasses the randomness check + require.Equal(t, 1, len(observed.All()), "should have one log: %v", observed.All()) + require.Contains(t, observed.All()[0].Message, "logs sampler") + require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), "missing randomness") + } else { + require.Equal(t, 0, len(observed.All()), "should have no logs: %v", observed.All()) + } + }) + } +} diff --git a/processor/probabilisticsamplerprocessor/sampler_mode.go b/processor/probabilisticsamplerprocessor/sampler_mode.go new file mode 100644 index 000000000000..6bf09caa271f --- /dev/null +++ b/processor/probabilisticsamplerprocessor/sampler_mode.go @@ -0,0 +1,322 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package probabilisticsamplerprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor" + +import ( + "context" + "errors" + "fmt" + "strconv" + + "go.opencensus.io/stats" + "go.opencensus.io/tag" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" +) + +const ( + // Hashing method: The constants below help translate user-friendly percentages + // to numbers directly used in sampling. + numHashBucketsLg2 = 14 + numHashBuckets = 0x4000 // Using a power of 2 to avoid division. + bitMaskHashBuckets = numHashBuckets - 1 + percentageScaleFactor = numHashBuckets / 100.0 +) + +// SamplerMode controls the logic used in making a sampling decision. +// The HashSeed mode is the only mode, presently, and it is also the +// default mode. +// +// TODO: In the future, when OTEP 235 is introduced, there will be two +// new modes. +type SamplerMode string + +const ( + HashSeed SamplerMode = "hash_seed" + DefaultMode SamplerMode = HashSeed + modeUnset SamplerMode = "" +) + +// ErrMissingRandomness indicates no randomness source was found.
+var ErrMissingRandomness = errors.New("missing randomness") + +type randomnessNamer interface { + randomness() sampling.Randomness + policyName() string +} + +type randomnessMethod sampling.Randomness + +func (rm randomnessMethod) randomness() sampling.Randomness { + return sampling.Randomness(rm) +} + +type traceIDHashingMethod struct{ randomnessMethod } +type samplingPriorityMethod struct{ randomnessMethod } + +type missingRandomnessMethod struct{} + +func (rm missingRandomnessMethod) randomness() sampling.Randomness { + return sampling.AllProbabilitiesRandomness +} + +func (missingRandomnessMethod) policyName() string { + return "missing_randomness" +} + +type attributeHashingMethod struct { + randomnessMethod + attribute string +} + +func (am attributeHashingMethod) policyName() string { + return am.attribute +} + +func (traceIDHashingMethod) policyName() string { + return "trace_id_hash" +} + +func (samplingPriorityMethod) policyName() string { + return "sampling_priority" +} + +var _ randomnessNamer = missingRandomnessMethod{} +var _ randomnessNamer = traceIDHashingMethod{} +var _ randomnessNamer = samplingPriorityMethod{} + +func newMissingRandomnessMethod() randomnessNamer { + return missingRandomnessMethod{} +} + +func isMissing(rnd randomnessNamer) bool { + _, ok := rnd.(missingRandomnessMethod) + return ok +} + +func newTraceIDHashingMethod(rnd sampling.Randomness) randomnessNamer { + return traceIDHashingMethod{randomnessMethod(rnd)} +} + +func newSamplingPriorityMethod(rnd sampling.Randomness) randomnessNamer { + return samplingPriorityMethod{randomnessMethod(rnd)} +} + +func newAttributeHashingMethod(attribute string, rnd sampling.Randomness) randomnessNamer { + return attributeHashingMethod{ + randomnessMethod: randomnessMethod(rnd), + attribute: attribute, + } +} + +// TODO: Placeholder interface, see #31894 for its future contents, +// will become a non-empty interface. (Linter forces us to write "any".) +type samplingCarrier any + +type dataSampler interface { + // decide reports the result based on a probabilistic decision. + decide(carrier samplingCarrier) sampling.Threshold + + // randomnessFromSpan extracts randomness and returns a carrier specific to traces data. + randomnessFromSpan(s ptrace.Span) (randomness randomnessNamer, carrier samplingCarrier, err error) + + // randomnessFromLogRecord extracts randomness and returns a carrier specific to logs data. + randomnessFromLogRecord(s plog.LogRecord) (randomness randomnessNamer, carrier samplingCarrier, err error) +} + +var AllModes = []SamplerMode{HashSeed} + +func (sm *SamplerMode) UnmarshalText(in []byte) error { + switch mode := SamplerMode(in); mode { + case HashSeed, + modeUnset: + *sm = mode + return nil + default: + return fmt.Errorf("unsupported sampler mode %q", mode) + } +} + +// hashingSampler is the original hash-based calculation. It is an +// equalizing sampler with randomness calculation that matches the +// original implementation. This hash-based implementation is limited +// to 14 bits of precision. +type hashingSampler struct { + hashSeed uint32 + tvalueThreshold sampling.Threshold + + // Logs only: name of attribute to obtain randomness + logsRandomnessSourceAttribute string + + // Logs only: name of attribute to obtain randomness + logsTraceIDEnabled bool +} + +func (th *hashingSampler) decide(_ samplingCarrier) sampling.Threshold { + return th.tvalueThreshold +} + +// neverSampler always decides false. 
+type neverSampler struct { +} + +func (*neverSampler) decide(_ samplingCarrier) sampling.Threshold { + return sampling.NeverSampleThreshold +} + +func getBytesFromValue(value pcommon.Value) []byte { + if value.Type() == pcommon.ValueTypeBytes { + return value.Bytes().AsRaw() + } + return []byte(value.AsString()) +} + +func randomnessFromBytes(b []byte, hashSeed uint32) sampling.Randomness { + hashed32 := computeHash(b, hashSeed) + hashed := uint64(hashed32 & bitMaskHashBuckets) + + // Ordinarily, hashed is compared against an acceptance + // threshold, i.e., sampled when hashed < scaledSampleRate, + // which has the form R < T with T in [1, 2^14] and + // R in [0, 2^14-1]. + // + // Here, modify R to R' and T to T', so that the sampling + // equation has identical form to the specification, i.e., T' + // <= R', using: + // + // T' = numHashBuckets-T + // R' = numHashBuckets-1-R + // + // As a result, R' has the correct most-significant 14 bits to + // use in an R-value. + rprime14 := numHashBuckets - 1 - hashed + + // There are 18 unused bits from the FNV hash function. + unused18 := uint64(hashed32 >> (32 - numHashBucketsLg2)) + mixed28 := unused18 ^ (unused18 << 10) + + // The 56 bit quantity here consists of, most- to least-significant: + // - 14 bits: R' = numHashBuckets - 1 - hashed + // - 28 bits: mixture of unused 18 bits + // - 14 bits: original `hashed`. + rnd56 := (rprime14 << 42) | (mixed28 << 14) | hashed + + // Note: by construction: + // - OTel samplers make the same probabilistic decision with this r-value, + // - only 14 out of 56 bits are used in the sampling decision, + // - there are only 32 actual random bits. + rnd, _ := sampling.UnsignedToRandomness(rnd56) + return rnd +} + +// consistencyCheck checks for certain inconsistent inputs. +// +// If the randomness is missing, returns ErrMissingRandomness. +func consistencyCheck(rnd randomnessNamer, _ samplingCarrier) error { + if isMissing(rnd) { + return ErrMissingRandomness + } + return nil +} + +// makeSampler constructs a sampler. There are no errors, as the only +// potential error, out-of-range probability, is corrected automatically +// according to the README, which allows percents >100 to equal 100%. +// +// Extending this logic, we round very small probabilities up to the +// minimum supported value(s), which vary according to sampler mode. +func makeSampler(cfg *Config) dataSampler { + // README allows percents >100 to equal 100%. + pct := cfg.SamplingPercentage + if pct > 100 { + pct = 100 + } + + never := &neverSampler{} + + if pct == 0 { + return never + } + + // Note: the original hash function used in this code + // is preserved to ensure consistency across updates. + // + // uint32(pct * percentageScaleFactor) + // + // (a) carried out the multiplication in 32-bit precision + // (b) rounded to zero instead of nearest. + scaledSampleRate := uint32(pct * percentageScaleFactor) + + if scaledSampleRate == 0 { + return never + } + + // Convert the accept threshold to a reject threshold, + // then shift it into a 56-bit value. + reject := numHashBuckets - scaledSampleRate + reject56 := uint64(reject) << 42 + + threshold, _ := sampling.UnsignedToThreshold(reject56) + + return &hashingSampler{ + tvalueThreshold: threshold, + hashSeed: cfg.HashSeed, + + // Logs specific: + logsTraceIDEnabled: cfg.AttributeSource == traceIDAttributeSource, + logsRandomnessSourceAttribute: cfg.FromAttribute, + } +}
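+ +// For example, with SamplingPercentage 50: scaledSampleRate is 8192 of 16384 hash buckets, reject is 8192, and reject56 is 8192<<42 == 2^55, the midpoint of the 56-bit R-value space, so exactly half of all R-values pass the threshold.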
+ +// randFunc returns randomness (w/ named policy), a carrier, and an error. +type randFunc[T any] func(T) (randomnessNamer, samplingCarrier, error) + +// priorityFunc makes changes resulting from sampling priority. +type priorityFunc[T any] func(T, randomnessNamer, sampling.Threshold) (randomnessNamer, sampling.Threshold) + +// commonShouldSampleLogic implements sampling on a per-item basis, +// independent of the signal type, as embodied in the functional +// parameters. +func commonShouldSampleLogic[T any]( + ctx context.Context, + item T, + sampler dataSampler, + failClosed bool, + randFunc randFunc[T], + priorityFunc priorityFunc[T], + description string, + logger *zap.Logger, ) bool { + rnd, carrier, err := randFunc(item) + if err == nil { + err = consistencyCheck(rnd, carrier) + } + var threshold sampling.Threshold + if err != nil { + logger.Debug(description, zap.Error(err)) + if failClosed { + threshold = sampling.NeverSampleThreshold + } else { + threshold = sampling.AlwaysSampleThreshold + } + } else { + threshold = sampler.decide(carrier) + } + + rnd, threshold = priorityFunc(item, rnd, threshold) + + sampled := threshold.ShouldSample(rnd.randomness()) + + _ = stats.RecordWithTags( + ctx, + []tag.Mutator{tag.Upsert(tagPolicyKey, rnd.policyName()), tag.Upsert(tagSampledKey, strconv.FormatBool(sampled))}, + statCountTracesSampled.M(int64(1)), + ) + + return sampled +} diff --git a/processor/probabilisticsamplerprocessor/sampler_mode_test.go b/processor/probabilisticsamplerprocessor/sampler_mode_test.go new file mode 100644 index 000000000000..170da3ed6d44 --- /dev/null +++ b/processor/probabilisticsamplerprocessor/sampler_mode_test.go @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package probabilisticsamplerprocessor + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestUnmarshalText(t *testing.T) { + tests := []struct { + samplerMode string + shouldError bool + }{ + { + samplerMode: "hash_seed", + }, + { + samplerMode: "", + }, + { + samplerMode: "dunno", + shouldError: true, + }, + } + for _, tt := range tests { + t.Run(tt.samplerMode, func(t *testing.T) { + temp := modeUnset + err := temp.UnmarshalText([]byte(tt.samplerMode)) + if tt.shouldError { + assert.Error(t, err) + return + } + require.NoError(t, err) + assert.Equal(t, temp, SamplerMode(tt.samplerMode)) + }) + } +} diff --git a/processor/probabilisticsamplerprocessor/testdata/config.yaml b/processor/probabilisticsamplerprocessor/testdata/config.yaml index a834def5d98c..0e853f77cbe3 100644 --- a/processor/probabilisticsamplerprocessor/testdata/config.yaml +++ b/processor/probabilisticsamplerprocessor/testdata/config.yaml @@ -11,15 +11,6 @@ processors: # zero, i.e.: no sample. Values greater or equal 100 are treated as # "sample all traces". sampling_percentage: 15.3 - # hash_seed allows one to configure the hashing seed. This is important in - # scenarios where multiple layers of collectors are used to achieve the - # desired sampling rate, eg.: 10% on first layer and 10% on the - # second, resulting in an overall sampling rate of 1% (10% x 10%). - # If all layers use the same seed, all data passing one layer will also pass - # the next one, independent of the configured sampling rate. Having different - # seeds at different layers ensures that sampling rate in each layer work as - # intended. - hash_seed: 22 probabilistic_sampler/logs: # the percentage rate at which logs are going to be sampled.
Defaults to # zero, i.e.: no sample. Values greater or equal 100 are treated as # "sample all logs". sampling_percentage: 15.3 @@ -34,6 +25,8 @@ processors: # the next one, independent of the configured sampling rate. Having different # seeds at different layers ensures that sampling rate in each layer work as # intended. + # + # setting hash_seed to a non-zero value causes the hash_seed mode to be selected by default hash_seed: 22 # attribute_source defines where to look for the attribute in from_attribute. The allowed values are `traceID` or `record`. attribute_source: "record" diff --git a/processor/probabilisticsamplerprocessor/testdata/invalid.yaml b/processor/probabilisticsamplerprocessor/testdata/invalid_negative.yaml similarity index 59% rename from processor/probabilisticsamplerprocessor/testdata/invalid.yaml rename to processor/probabilisticsamplerprocessor/testdata/invalid_negative.yaml index ffd9b1e07d16..13f36d8da540 100644 --- a/processor/probabilisticsamplerprocessor/testdata/invalid.yaml +++ b/processor/probabilisticsamplerprocessor/testdata/invalid_negative.yaml @@ -3,16 +3,15 @@ receivers: processors: - probabilistic_sampler/logs: + probabilistic_sampler/traces: sampling_percentage: -15.3 - hash_seed: 22 exporters: nop: service: pipelines: - logs: + traces: receivers: [ nop ] - processors: [ probabilistic_sampler/logs ] + processors: [ probabilistic_sampler/traces ] exporters: [ nop ] diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor.go b/processor/probabilisticsamplerprocessor/tracesprocessor.go index 96224e6f5704..cafa25589d20 100644 --- a/processor/probabilisticsamplerprocessor/tracesprocessor.go +++ b/processor/probabilisticsamplerprocessor/tracesprocessor.go @@ -7,14 +7,14 @@ import ( "context" "strconv" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) // samplingPriority has the semantic result of parsing the "sampling.priority" @@ -34,74 +34,76 @@ const ( // equal zero and it is NOT going to be sampled, ie.: it won't be forwarded // by the collector. doNotSampleSpan - - // The constants help translate user friendly percentages to numbers direct used in sampling. - numHashBuckets = 0x4000 // Using a power of 2 to avoid division. - bitMaskHashBuckets = numHashBuckets - 1 - percentageScaleFactor = numHashBuckets / 100.0 ) -type traceSamplerProcessor struct { - scaledSamplingRate uint32 - hashSeed uint32 - logger *zap.Logger +type traceProcessor struct { + sampler dataSampler + failClosed bool + logger *zap.Logger +} + +// tracestateCarrier conveys information about sampled spans between +// the call to parse incoming randomness/threshold and the call to +// decide. +type tracestateCarrier struct { + span ptrace.Span +} + +var _ samplingCarrier = &tracestateCarrier{} + +func newTracestateCarrier(s ptrace.Span) samplingCarrier { + return &tracestateCarrier{ + span: s, + } } -// newTracesProcessor returns a processor.TracesProcessor that will perform head sampling according to the given +// newTracesProcessor returns a processor.TracesProcessor that will +// perform intermediate span sampling according to the given // configuration.
func newTracesProcessor(ctx context.Context, set processor.CreateSettings, cfg *Config, nextConsumer consumer.Traces) (processor.Traces, error) { - tsp := &traceSamplerProcessor{ - // Adjust sampling percentage on private so recalculations are avoided. - scaledSamplingRate: uint32(cfg.SamplingPercentage * percentageScaleFactor), - hashSeed: cfg.HashSeed, - logger: set.Logger, + tp := &traceProcessor{ + sampler: makeSampler(cfg), + failClosed: cfg.FailClosed, + logger: set.Logger, } - return processorhelper.NewTracesProcessor( ctx, set, cfg, nextConsumer, - tsp.processTraces, + tp.processTraces, processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true})) } -func (tsp *traceSamplerProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { +func (th *neverSampler) randomnessFromSpan(_ ptrace.Span) (randomnessNamer, samplingCarrier, error) { + // We return a fake randomness value, since it will not be used. + // This avoids a consistency check error for missing randomness. + return newSamplingPriorityMethod(sampling.AllProbabilitiesRandomness), nil, nil +} + +func (th *hashingSampler) randomnessFromSpan(s ptrace.Span) (randomnessNamer, samplingCarrier, error) { + tid := s.TraceID() + tsc := newTracestateCarrier(s) + rnd := newMissingRandomnessMethod() + if !tid.IsEmpty() { + rnd = newTraceIDHashingMethod(randomnessFromBytes(tid[:], th.hashSeed)) + } + return rnd, tsc, nil +} +func (tp *traceProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { td.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool { rs.ScopeSpans().RemoveIf(func(ils ptrace.ScopeSpans) bool { ils.Spans().RemoveIf(func(s ptrace.Span) bool { - sp := parseSpanSamplingPriority(s) - if sp == doNotSampleSpan { - // The OpenTelemetry mentions this as a "hint" we take a stronger - // approach and do not sample the span since some may use it to - // remove specific spans from traces. - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(tagPolicyKey, "sampling_priority"), tag.Upsert(tagSampledKey, "false")}, - statCountTracesSampled.M(int64(1)), - ) - return true - } - - _ = stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(tagPolicyKey, "sampling_priority"), tag.Upsert(tagSampledKey, "true")}, - statCountTracesSampled.M(int64(1)), - ) - - // If one assumes random trace ids hashing may seems avoidable, however, traces can be coming from sources - // with various different criteria to generate trace id and perhaps were already sampled without hashing. - // Hashing here prevents bias due to such systems. - tidBytes := s.TraceID() - sampled := sp == mustSampleSpan || - computeHash(tidBytes[:], tsp.hashSeed)&bitMaskHashBuckets < tsp.scaledSamplingRate - - _ = stats.RecordWithTags( + return !commonShouldSampleLogic( ctx, - []tag.Mutator{tag.Upsert(tagPolicyKey, "trace_id_hash"), tag.Upsert(tagSampledKey, strconv.FormatBool(sampled))}, - statCountTracesSampled.M(int64(1)), + s, + tp.sampler, + tp.failClosed, + tp.sampler.randomnessFromSpan, + tp.priorityFunc, + "traces sampler", + tp.logger, ) - return !sampled }) // Filter out empty ScopeMetrics return ils.Spans().Len() == 0 @@ -115,6 +117,25 @@ func (tsp *traceSamplerProcessor) processTraces(ctx context.Context, td ptrace.T return td, nil } +func (tp *traceProcessor) priorityFunc(s ptrace.Span, rnd randomnessNamer, threshold sampling.Threshold) (randomnessNamer, sampling.Threshold) { + switch parseSpanSamplingPriority(s) { + case doNotSampleSpan: + // OpenTracing mentions this as a "hint". 
We take a stronger + // approach and do not sample the span since some may use it to + // remove specific spans from traces. + threshold = sampling.NeverSampleThreshold + rnd = newSamplingPriorityMethod(rnd.randomness()) // override policy name + case mustSampleSpan: + threshold = sampling.AlwaysSampleThreshold + rnd = newSamplingPriorityMethod(rnd.randomness()) // override policy name + case deferDecision: + // Note that the logs processor has very different logic here; + // in tracing, the priority can only force to never or + // always. + } + return rnd, threshold +} + // parseSpanSamplingPriority checks if the span has the "sampling.priority" tag to // decide if the span should be sampled or not. The usage of the tag follows the // OpenTracing semantic tags: diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go index 2799f55360b7..4166e347e477 100644 --- a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go @@ -6,6 +6,7 @@ package probabilisticsamplerprocessor import ( "context" "encoding/hex" + "fmt" "math" "math/rand" "testing" @@ -18,10 +19,35 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor/processortest" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/idutils" ) +// defaultHashSeed is used throughout to ensure that the HashSeed is real +// and does not fall back to proportional-mode sampling due to HashSeed == 0. +const defaultHashSeed = 4312 + +func TestHashBucketsLog2(t *testing.T) { + require.Equal(t, numHashBuckets, 1<<numHashBucketsLg2) +} @@ -176,6 +206,8 @@ func Test_tracesamplerprocessor_SamplingPercentageRange_MultipleResourceSpans(t const testSvcName = "test-svc" for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + tt.cfg.HashSeed = defaultHashSeed + sink := new(consumertest.TracesSink) tsp, err := newTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), tt.cfg, sink) if err != nil { @@ -193,6 +225,74 @@ func Test_tracesamplerprocessor_SamplingPercentageRange_MultipleResourceSpans(t } } +func Test_tracessamplerprocessor_MissingRandomness(t *testing.T) { + type test struct { + pct float32 + failClosed bool + sampled bool + } + + for _, tt := range []test{ + // When the TraceID is empty and failClosed==true, the span is not sampled. + {0, true, false}, + {62, true, false}, + {100, true, false}, + + // When the TraceID is empty and failClosed==false, the span is sampled when pct != 0. + {0, false, false}, + {62, false, true}, + {100, false, true}, + } { + t.Run(fmt.Sprint(tt.pct, "_", tt.failClosed), func(t *testing.T) { + + ctx := context.Background() + traces := ptrace.NewTraces() + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetTraceID(pcommon.TraceID{}) // invalid TraceID + span.SetSpanID(pcommon.SpanID{1, 2, 3, 4, 5, 6, 7, 8}) // valid SpanID + span.SetName("testing") + + cfg := &Config{ + SamplingPercentage: tt.pct, + HashSeed: defaultHashSeed, + FailClosed: tt.failClosed, + } + + sink := new(consumertest.TracesSink) + + set := processortest.NewNopCreateSettings() + // Note: there is a debug-level log we are expecting when FailClosed + // causes a drop.
@@ -285,8 +385,12 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+
 			sink := new(consumertest.TracesSink)
-			tsp, err := newTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), tt.cfg, sink)
+			cfg := *tt.cfg
+
+			cfg.HashSeed = defaultHashSeed
+			tsp, err := newTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), &cfg, sink)
 			require.NoError(t, err)

 			err = tsp.ConsumeTraces(context.Background(), tt.td)
@@ -394,6 +498,11 @@ func getSpanWithAttributes(key string, value pcommon.Value) ptrace.Span {
 func initSpanWithAttribute(key string, value pcommon.Value, dest ptrace.Span) {
 	dest.SetName("spanName")
 	value.CopyTo(dest.Attributes().PutEmpty(key))
+
+	// Ensure a non-empty trace ID with a deterministic value, one that has
+	// all-zero bits in the W3C randomness portion. Per the OTel
+	// specification, this value has R-value 0 and is sampled only at 100%.
+	dest.SetTraceID(pcommon.TraceID{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})
 }

 // genRandomTestData generates a slice of ptrace.Traces with the numBatches elements which one with
@@ -428,10 +537,40 @@ func genRandomTestData(numBatches, numTracesPerBatch int, serviceName string, re
 	return traceBatches
 }

-// assertSampledData checks for no repeated traceIDs and counts the number of spans on the sampled data for
+// assertTraces is a consumer.Traces that asserts on the data it receives.
+type assertTraces struct {
+	*testing.T
+	testName  string
+	traceIDs  map[[16]byte]bool
+	spanCount int
+}
+
+var _ consumer.Traces = &assertTraces{}
+
+func (a *assertTraces) Capabilities() consumer.Capabilities {
+	return consumer.Capabilities{}
+}
+
+func (a *assertTraces) ConsumeTraces(_ context.Context, data ptrace.Traces) error {
+	tt := [1]ptrace.Traces{
+		data,
+	}
+	a.onSampledData(tt[:])
+	return nil
+}
+
+func newAssertTraces(t *testing.T, name string) *assertTraces {
+	return &assertTraces{
+		T:         t,
+		testName:  name,
+		traceIDs:  map[[16]byte]bool{},
+		spanCount: 0,
+	}
+}
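The `assertTraces` helper above turns the old post-hoc `assertSampledData` check into a streaming consumer, so duplicate-trace-ID detection happens as data arrives. A hypothetical in-package usage sketch follows; it assumes `genRandomTestData`'s trailing parameter is the per-batch resource-span count and that 100% sampling passes every span through, neither of which this excerpt confirms:

```go
// Hypothetical test: wire assertTraces in as the downstream consumer
// in place of a plain consumertest.TracesSink. It fails the test if a
// trace ID is ever delivered twice, and tallies spans for the service.
func Test_assertTracesUsage(t *testing.T) {
	sink := newAssertTraces(t, "test-svc")
	cfg := &Config{SamplingPercentage: 100, HashSeed: defaultHashSeed}
	tsp, err := newTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, sink)
	require.NoError(t, err)
	for _, td := range genRandomTestData(1, 10, "test-svc", 1) {
		require.NoError(t, tsp.ConsumeTraces(context.Background(), td))
	}
	assert.Equal(t, 10, sink.spanCount) // all spans kept at 100%
}
```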
+
+// onSampledData checks for no repeated traceIDs and counts the number of spans on the sampled data for
 // the given service.
-func assertSampledData(t *testing.T, sampled []ptrace.Traces, serviceName string) (traceIDs map[[16]byte]bool, spanCount int) {
-	traceIDs = make(map[[16]byte]bool)
+func (a *assertTraces) onSampledData(sampled []ptrace.Traces) {
 	for _, td := range sampled {
 		rspans := td.ResourceSpans()
 		for i := 0; i < rspans.Len(); i++ {
@@ -439,23 +578,22 @@ func assertSampledData(t *testing.T, sampled []ptrace.Traces, serviceName string
 			ilss := rspan.ScopeSpans()
 			for j := 0; j < ilss.Len(); j++ {
 				ils := ilss.At(j)
-				if svcNameAttr, _ := rspan.Resource().Attributes().Get("service.name"); svcNameAttr.Str() != serviceName {
+				if svcNameAttr, _ := rspan.Resource().Attributes().Get("service.name"); svcNameAttr.Str() != a.testName {
 					continue
 				}
 				for k := 0; k < ils.Spans().Len(); k++ {
-					spanCount++
+					a.spanCount++
 					span := ils.Spans().At(k)
 					key := span.TraceID()
-					if traceIDs[key] {
-						t.Errorf("same traceID used more than once %q", key)
+					if a.traceIDs[key] {
+						a.Errorf("same traceID used more than once %q", key)
 						return
 					}
-					traceIDs[key] = true
+					a.traceIDs[key] = true
 				}
 			}
 		}
 	}
-	return
 }

 // mustParseTID generates TraceIDs from their hex encoding, for